iptv/EPG/fetch.py
2025-10-30 15:38:34 -04:00

193 lines
5.3 KiB
Python

#!/usr/bin/env python3
import asyncio
import gzip
import re
from pathlib import Path
from xml.etree import ElementTree as ET
import httpx
epg_file = Path(__file__).parent / "TV.xml"
epg_urls = [
"https://epgshare01.online/epgshare01/epg_ripper_CA2.xml.gz",
"https://epgshare01.online/epgshare01/epg_ripper_DUMMY_CHANNELS.xml.gz",
"https://epgshare01.online/epgshare01/epg_ripper_FANDUEL1.xml.gz",
"https://epgshare01.online/epgshare01/epg_ripper_MY1.xml.gz",
"https://epgshare01.online/epgshare01/epg_ripper_PLEX1.xml.gz",
"https://epgshare01.online/epgshare01/epg_ripper_US2.xml.gz",
"https://epgshare01.online/epgshare01/epg_ripper_US_LOCALS1.xml.gz",
"https://i.mjh.nz/Roku/all.xml.gz",
]
client = httpx.AsyncClient(
timeout=5,
follow_redirects=True,
http2=True,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
},
)
live_img = "https://i.gyazo.com/978f2eb4a199ca5b56b447aded0cb9e3.png"
dummies = {
"Basketball.Dummy.us": live_img,
"Golf.Dummy.us": live_img,
"Live.Event.us": live_img,
"MLB.Baseball.Dummy.us": None,
"NBA.Basketball.Dummy.us": None,
"NFL.Dummy.us": None,
"NHL.Hockey.Dummy.us": None,
"PPV.EVENTS.Dummy.us": live_img,
"Racing.Dummy.us": live_img,
"Soccer.Dummy.us": live_img,
"Tennis.Dummy.us": live_img,
"WNBA.dummy.us": None,
}
replace_ids = {
"NCAA Sports": {"old": "Sports.Dummy.us", "new": "NCAA.Sports.Dummy.us"},
"UFC": {"old": "UFC.247.Dummy.us", "new": "UFC.Dummy.us"},
}
def get_tvg_ids() -> dict[str, str]:
base_m3u8 = (
(Path(__file__).parent.parent / "M3U8" / "base.m3u8")
.read_text(encoding="utf-8")
.splitlines()
)
tvg = {}
for line in base_m3u8:
if line.startswith("#EXTINF"):
tvg_id = re.search(r'tvg-id="([^"]*)"', line)[1]
tvg_logo = re.search(r'tvg-logo="([^"]*)"', line)[1]
tvg[tvg_id] = tvg_logo
return tvg
async def fetch_xml(url: str) -> ET.Element | None:
try:
r = await client.get(url)
r.raise_for_status()
except Exception as e:
print(f'Failed to fetch "{url}": {e}')
return
try:
decompressed_data = gzip.decompress(r.content)
return ET.fromstring(decompressed_data)
except Exception as e:
print(f'Failed to decompress and parse XML from "{url}": {e}')
def hijack_id(
old: str,
new: str,
text: str,
root: ET.Element,
) -> None:
og_channel = root.find(f"./channel[@id='{old}']")
if og_channel is not None:
new_channel = ET.Element(og_channel.tag, {**og_channel.attrib, "id": new})
display_name = og_channel.find("display-name")
if display_name is not None:
new_channel.append(ET.Element("display-name", display_name.attrib))
new_channel[-1].text = text
for child in og_channel:
if child.tag == "display-name":
continue
new_child = ET.Element(child.tag, child.attrib)
new_child.text = child.text
root.remove(og_channel)
root.append(new_channel)
for program in root.findall(f"./programme[@channel='{old}']"):
new_program = ET.Element(program.tag, {**program.attrib, "channel": new})
for child in program:
new_child = ET.Element(child.tag, child.attrib)
new_child.text = child.text
new_program.append(new_child)
for tag_name in ["title", "desc", "sub-title"]:
tag = new_program.find(tag_name)
if tag is not None:
tag.text = text
root.remove(program)
root.append(new_program)
async def main() -> None:
tvg_ids = get_tvg_ids()
tvg_ids |= dummies | {v["old"]: live_img for v in replace_ids.values()}
root = ET.Element("tv")
tasks = [fetch_xml(url) for url in epg_urls]
results = await asyncio.gather(*tasks)
for epg_data in results:
if epg_data is None:
continue
for channel in epg_data.findall("channel"):
if (channel_id := channel.get("id")) in tvg_ids:
for icon_tag in channel.findall("icon"):
if logo := tvg_ids.get(channel_id):
icon_tag.set("src", logo)
if (url_tag := channel.find("url")) is not None:
channel.remove(url_tag)
root.append(channel)
for program in epg_data.findall("programme"):
if program.get("channel") in tvg_ids:
title_text = program.find("title").text
subtitle = program.find("sub-title")
if (
title_text in ["NHL Hockey", "Live: NFL Football"]
and subtitle is not None
):
program.find("title").text = f"{title_text} {subtitle.text}"
root.append(program)
for k, v in replace_ids.items():
hijack_id(**v, text=k, root=root)
tree = ET.ElementTree(root)
tree.write(epg_file, encoding="utf-8", xml_declaration=True)
print(f"EPG saved to {epg_file.resolve()}")
if __name__ == "__main__":
asyncio.run(main())
try:
asyncio.run(client.aclose())
except Exception:
pass