mirror of
https://github.com/doms9/iptv.git
synced 2026-03-07 11:18:25 +01:00
206 lines
5.5 KiB
Python
206 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
|
import asyncio
|
|
import gzip
|
|
import re
|
|
from pathlib import Path
|
|
from xml.etree import ElementTree as ET
|
|
|
|
from scrapers.utils import get_logger, network
|
|
|
|
# Module-level logger obtained from the project's logging helper.
log = get_logger(__name__)

# Playlist stored next to this script; get_tvg_ids() scans its "#EXTINF" lines.
BASE_M3U8 = Path(__file__).parent / "base.m3u8"

# Destination of the merged guide that main() writes, next to this script.
EPG_FILE = Path(__file__).parent / "TV.xml"

# Icon URL used as the logo for several placeholder channels in DUMMIES and
# for the "old" ids listed in REPLACE_IDs.
LIVE_IMG = "https://i.gyazo.com/978f2eb4a199ca5b56b447aded0cb9e3.png"
|
|
|
|
# Gzip-compressed XML guide feeds; main() passes each one to fetch_xml().
# NOTE(review): this is a set literal, so iteration order (and therefore the
# order in which feeds are merged into the output) may differ between
# interpreter runs - confirm that is acceptable or switch to a tuple.
EPG_URLS = {
    "https://epgshare01.online/epgshare01/epg_ripper_CA2.xml.gz",
    "https://epgshare01.online/epgshare01/epg_ripper_DUMMY_CHANNELS.xml.gz",
    "https://epgshare01.online/epgshare01/epg_ripper_FANDUEL1.xml.gz",
    "https://epgshare01.online/epgshare01/epg_ripper_PLEX1.xml.gz",
    "https://epgshare01.online/epgshare01/epg_ripper_UK1.xml.gz",
    "https://epgshare01.online/epgshare01/epg_ripper_US2.xml.gz",
    "https://epgshare01.online/epgshare01/epg_ripper_US_LOCALS1.xml.gz",
    "https://i.mjh.nz/Roku/all.xml.gz",
}
|
|
|
|
# Placeholder channel ids that get_tvg_ids() merges on top of the ids read
# from BASE_M3U8. The value is the icon URL main() writes into the channel's
# <icon src="...">; None is falsy there, so the feed's own icon is kept.
DUMMIES = {
    "Basketball.Dummy.us": LIVE_IMG,
    "Golf.Dummy.us": LIVE_IMG,
    "Live.Event.us": LIVE_IMG,
    "MLB.Baseball.Dummy.us": None,
    "NBA.Basketball.Dummy.us": None,
    "NFL.Dummy.us": None,
    "NHL.Hockey.Dummy.us": None,
    "PPV.EVENTS.Dummy.us": LIVE_IMG,
    "Racing.Dummy.us": LIVE_IMG,
    "Soccer.Dummy.us": LIVE_IMG,
    "Tennis.Dummy.us": LIVE_IMG,
    "WNBA.dummy.us": None,
}
|
|
|
|
# Channel-id renames that main() applies through hijack_id() after all feeds
# have been merged.
#   key   -> text hijack_id() writes into the renamed channel's display-name
#            and into the title/desc/sub-title of every programme it moves
#   "old" -> channel id to look for in the merged tree (get_tvg_ids() also
#            adds it to the wanted ids with LIVE_IMG as its logo)
#   "new" -> channel id it is rewritten to
REPLACE_IDs = {
    "NCAA Sports": {"old": "Sports.Dummy.us", "new": "NCAA.Sports.Dummy.us"},
    "UFC": {"old": "UFC.247.Dummy.us", "new": "UFC.Dummy.us"},
}
|
|
|
|
|
|
def get_tvg_ids() -> dict[str, str]:
    """Collect the channel ids to keep, mapped to their logo override.

    Every ``#EXTINF`` line of ``BASE_M3U8`` that carries a ``tvg-id``
    attribute contributes one entry; the value is the line's ``tvg-logo``
    attribute, or ``None`` when the line has none.  The ``DUMMIES`` table and
    the ``"old"`` ids of ``REPLACE_IDs`` (with ``LIVE_IMG`` as logo) are then
    layered on top, in that order, so they win over playlist entries.

    Returns:
        Mapping of channel id to logo URL.  Values may be ``None`` despite
        the ``str`` annotation.
    """
    id_pattern = re.compile(r'tvg-id="([^"]*)"')
    logo_pattern = re.compile(r'tvg-logo="([^"]*)"')

    wanted: dict[str, str] = {}

    playlist = BASE_M3U8.read_text(encoding="utf-8")

    for entry in playlist.splitlines():
        if entry.startswith("#EXTINF"):
            id_match = id_pattern.search(entry)

            if id_match is None:
                continue

            logo_match = logo_pattern.search(entry)

            wanted[id_match.group(1)] = (
                logo_match.group(1) if logo_match is not None else None
            )

    # Later updates overwrite earlier keys: placeholders first, then the ids
    # that are about to be renamed.
    wanted.update(DUMMIES)
    wanted.update({ids["old"]: LIVE_IMG for ids in REPLACE_IDs.values()})

    return wanted
|
|
|
|
|
|
async def fetch_xml(url: str) -> ET.Element | None:
    """Download one gzip-compressed XML feed and return its parsed root.

    Args:
        url: Address of the ``.xml.gz`` feed.

    Returns:
        The root element of the decompressed document, or ``None`` when the
        request yielded a falsy result or decompressing/parsing raised (the
        failure is logged, not propagated).
    """
    response = await network.request(url, log=log)

    if not response:
        return None

    try:
        log.info(f'Parsing XML from "{url}"')

        raw_xml = gzip.decompress(response.content)

        parsed_root = ET.fromstring(raw_xml)
    except Exception as err:
        # Best effort: one broken feed must not abort the whole merge.
        log.error(f'Failed to parse from "{url}": {err}')

        return None

    return parsed_root
|
|
|
|
|
|
def hijack_id(
    root: ET.Element,
    *,
    old: str,
    new: str,
    text: str,
) -> None:
    """Rename channel *old* to *new* inside *root* and relabel it with *text*.

    The first ``<channel id=old>`` directly under *root* is replaced by a
    rebuilt copy whose ``id`` is *new* and whose ``display-name`` text is
    *text*.  Every ``<programme channel=old>`` directly under *root* is
    replaced by a rebuilt copy pointing at *new* whose ``title``, ``desc``
    and ``sub-title`` texts are set to *text*.  Rebuilt elements are appended
    to the end of *root*.

    Copies are shallow: each direct child keeps its tag, attributes and text,
    but not its own children or tail.

    Args:
        root: Tree root holding ``channel`` / ``programme`` children;
            modified in place.
        old: Channel id to look for.
        new: Channel id to write instead.
        text: Replacement label for the channel and its programmes.
    """
    og_channel = root.find(f"./channel[@id='{old}']")

    if og_channel is not None:
        new_channel = ET.Element(og_channel.tag, {**og_channel.attrib, "id": new})

        # Only the first display-name is carried over, with its text replaced.
        if (display_name := og_channel.find("display-name")) is not None:
            new_channel.append(ET.Element("display-name", display_name.attrib))
            new_channel[-1].text = text

        for child in og_channel:
            if child.tag == "display-name":
                continue

            new_child = ET.Element(child.tag, child.attrib)
            new_child.text = child.text
            # Attach the copy; without this the renamed channel would lose
            # its icon and every other non display-name child.
            new_channel.append(new_child)

        root.remove(og_channel)

        root.append(new_channel)

    # findall() returns a list, so removing/appending while looping is safe.
    for program in root.findall(f"./programme[@channel='{old}']"):
        new_program = ET.Element(program.tag, {**program.attrib, "channel": new})

        for child in program:
            new_child = ET.Element(child.tag, child.attrib)
            new_child.text = child.text
            new_program.append(new_child)

        for tag_name in ["title", "desc", "sub-title"]:
            if (tag := new_program.find(tag_name)) is not None:
                tag.text = text

        root.remove(program)

        root.append(new_program)
|
|
|
|
|
|
async def main() -> None:
    """Download every EPG feed, merge the wanted channels and write ``EPG_FILE``.

    Steps:
      1. Collect the wanted channel ids and logo overrides via ``get_tvg_ids``.
      2. Fetch all ``EPG_URLS`` concurrently; feeds that returned ``None``
         are skipped.
      3. Move matching ``<channel>`` and ``<programme>`` elements into a new
         ``<tv>`` root, overriding icon sources and dropping the first
         ``<url>`` child of each channel.
      4. Apply the ``REPLACE_IDs`` renames with ``hijack_id``.
      5. Log wanted ids that no feed provided, then write the tree to disk.
    """
    log.info(f"{'=' * 10} Fetching EPG {'=' * 10}")

    tvg_ids = get_tvg_ids()

    parsed_tvg_ids: set[str] = set()

    root = ET.Element("tv")

    epgs = await asyncio.gather(*(fetch_xml(url) for url in EPG_URLS))

    for epg_data in (epg for epg in epgs if epg is not None):
        for channel in epg_data.findall("channel"):
            if (channel_id := channel.get("id")) not in tvg_ids:
                continue

            parsed_tvg_ids.add(channel_id)

            # The override is the same for every icon of this channel, so it
            # is looked up once; a None/empty logo keeps the feed's icons.
            if logo := tvg_ids.get(channel_id):
                for icon_tag in channel.findall("icon"):
                    icon_tag.set("src", logo)

            if (url_tag := channel.find("url")) is not None:
                channel.remove(url_tag)

            root.append(channel)

        for program in epg_data.findall("programme"):
            if program.get("channel") not in tvg_ids:
                continue

            title_tag = program.find("title")

            subtitle = program.find("sub-title")

            # For these generic titles the sub-title is folded into the
            # title. A programme without <title>, or with an empty
            # <sub-title>, is passed through unchanged instead of crashing
            # or producing a title ending in "None".
            if (
                title_tag is not None
                and title_tag.text in ["NHL Hockey", "Live: NFL Football"]
                and subtitle is not None
                and subtitle.text
            ):
                title_tag.text = f"{title_tag.text} {subtitle.text}"

            root.append(program)

    for title, ids in REPLACE_IDs.items():
        hijack_id(root, **ids, text=title)

    if missing_ids := set(tvg_ids) - parsed_tvg_ids:
        log.warning(f"Missed {len(missing_ids)} TVG ID(s)")

        for channel_id in missing_ids:
            log.warning(f"Missing: {channel_id}")

    tree = ET.ElementTree(root)

    tree.write(
        EPG_FILE,
        encoding="utf-8",
        xml_declaration=True,
    )

    log.info(f"EPG saved to {EPG_FILE.resolve()}")
|
|
|
|
|
|
if __name__ == "__main__":
    asyncio.run(main())

    # Flush each log handler, then end its output with a newline.
    # NOTE(review): assumes every handler exposes a ``stream`` attribute - confirm.
    for handler in log.handlers:
        handler.flush()
        handler.stream.write("\n")

    # Best-effort shutdown of the shared network client; any failure here is
    # deliberately ignored because the work is already done.
    try:
        asyncio.run(network.client.aclose())
    except Exception:
        pass
|