From 00000d947f8ed7c9ec83652adc6f490cadbb43e8 Mon Sep 17 00:00:00 2001 From: doms9 <96013514+doms9@users.noreply.github.com> Date: Tue, 27 Jan 2026 19:21:28 -0500 Subject: [PATCH] e add more logging to epg-fetch.py and health.sh --- M3U8/epg-fetch.py | 112 +++++++++++++++++++++++++++------------------- health.sh | 4 +- 2 files changed, 68 insertions(+), 48 deletions(-) diff --git a/M3U8/epg-fetch.py b/M3U8/epg-fetch.py index 5890b3d..8fe728a 100644 --- a/M3U8/epg-fetch.py +++ b/M3U8/epg-fetch.py @@ -13,7 +13,9 @@ BASE_M3U8 = Path(__file__).parent / "base.m3u8" EPG_FILE = Path(__file__).parent / "TV.xml" -EPG_URLS = [ +LIVE_IMG = "https://i.gyazo.com/978f2eb4a199ca5b56b447aded0cb9e3.png" + +EPG_URLS = { "https://epgshare01.online/epgshare01/epg_ripper_CA2.xml.gz", "https://epgshare01.online/epgshare01/epg_ripper_DUMMY_CHANNELS.xml.gz", "https://epgshare01.online/epgshare01/epg_ripper_FANDUEL1.xml.gz", @@ -22,9 +24,7 @@ EPG_URLS = [ "https://epgshare01.online/epgshare01/epg_ripper_US2.xml.gz", "https://epgshare01.online/epgshare01/epg_ripper_US_LOCALS1.xml.gz", "https://i.mjh.nz/Roku/all.xml.gz", -] - -LIVE_IMG = "https://i.gyazo.com/978f2eb4a199ca5b56b447aded0cb9e3.png" +} DUMMIES = { "Basketball.Dummy.us": LIVE_IMG, @@ -48,39 +48,47 @@ REPLACE_IDs = { def get_tvg_ids() -> dict[str, str]: - base_m3u8 = BASE_M3U8.read_text(encoding="utf-8").splitlines() + tvg: dict[str, str] = {} - tvg = {} + for line in BASE_M3U8.read_text(encoding="utf-8").splitlines(): + if not line.startswith("#EXTINF"): + continue - for line in base_m3u8: - if line.startswith("#EXTINF"): - tvg_id = re.search(r'tvg-id="([^"]*)"', line)[1] - tvg_logo = re.search(r'tvg-logo="([^"]*)"', line)[1] + tvg_id = re.search(r'tvg-id="([^"]*)"', line) + tvg_logo = re.search(r'tvg-logo="([^"]*)"', line) - tvg[tvg_id] = tvg_logo + if tvg_id: + tvg[tvg_id[1]] = tvg_logo[1] if tvg_logo else None + + tvg |= DUMMIES + + tvg |= {v["old"]: LIVE_IMG for v in REPLACE_IDs.values()} return tvg async def fetch_xml(url: str) -> ET.Element | None: - if not (html_data := await network.request(url, log=log)): + if not (xml_data := await network.request(url, log=log)): return try: - decompressed_data = gzip.decompress(html_data.content) + log.info(f'Parsing XML from "{url}"') - return ET.fromstring(decompressed_data) + data = gzip.decompress(xml_data.content) + + return ET.fromstring(data) except Exception as e: - log.error(f'Failed to decompress and parse XML from "{url}": {e}') + log.error(f'Failed to parse from "{url}": {e}') return def hijack_id( + root: ET.Element, + *, old: str, new: str, text: str, - root: ET.Element, ) -> None: og_channel = root.find(f"./channel[@id='{old}']") @@ -90,7 +98,7 @@ def hijack_id( display_name = og_channel.find("display-name") - if display_name is not None: + if (display_name := og_channel.find("display-name")) is not None: new_channel.append(ET.Element("display-name", display_name.attrib)) new_channel[-1].text = text @@ -114,9 +122,7 @@ def hijack_id( new_program.append(new_child) for tag_name in ["title", "desc", "sub-title"]: - tag = new_program.find(tag_name) - - if tag is not None: + if (tag := new_program.find(tag_name)) is not None: tag.text = text root.remove(program) @@ -129,48 +135,60 @@ async def main() -> None: tvg_ids = get_tvg_ids() - tvg_ids |= DUMMIES | {v["old"]: LIVE_IMG for v in REPLACE_IDs.values()} + parsed_tvg_ids: set[str] = set() root = ET.Element("tv") - tasks = [fetch_xml(url) for url in EPG_URLS] - - results = await asyncio.gather(*tasks) - - for epg_data in results: - if epg_data is None: - continue + epgs = await asyncio.gather(*(fetch_xml(url) for url in EPG_URLS)) + for epg_data in (epg for epg in epgs if epg is not None): for channel in epg_data.findall("channel"): - if (channel_id := channel.get("id")) in tvg_ids: - for icon_tag in channel.findall("icon"): - if logo := tvg_ids.get(channel_id): - icon_tag.set("src", logo) + if (channel_id := channel.get("id")) not in tvg_ids: + continue - if (url_tag := channel.find("url")) is not None: - channel.remove(url_tag) + parsed_tvg_ids.add(channel_id) - root.append(channel) + for icon_tag in channel.findall("icon"): + if logo := tvg_ids.get(channel_id): + icon_tag.set("src", logo) + + if (url_tag := channel.find("url")) is not None: + channel.remove(url_tag) + + root.append(channel) for program in epg_data.findall("programme"): - if program.get("channel") in tvg_ids: - title_text = program.find("title").text - subtitle = program.find("sub-title") + if program.get("channel") not in tvg_ids: + continue - if ( - title_text in ["NHL Hockey", "Live: NFL Football"] - and subtitle is not None - ): - program.find("title").text = f"{title_text} {subtitle.text}" + title_text = program.find("title").text - root.append(program) + subtitle = program.find("sub-title") - for k, v in REPLACE_IDs.items(): - hijack_id(**v, text=k, root=root) + if ( + title_text in ["NHL Hockey", "Live: NFL Football"] + and subtitle is not None + ): + program.find("title").text = f"{title_text} {subtitle.text}" + + root.append(program) + + for title, ids in REPLACE_IDs.items(): + hijack_id(root, **ids, text=title) + + if missing_ids := set(tvg_ids) - parsed_tvg_ids: + log.warning(f"Missed {len(missing_ids)} TVG ID(s)") + + for channel_id in missing_ids: + log.warning(f"Missing: {channel_id}") tree = ET.ElementTree(root) - tree.write(EPG_FILE, encoding="utf-8", xml_declaration=True) + tree.write( + EPG_FILE, + encoding="utf-8", + xml_declaration=True, + ) log.info(f"EPG saved to {EPG_FILE.resolve()}") diff --git a/health.sh b/health.sh index 2361aa5..f02c884 100644 --- a/health.sh +++ b/health.sh @@ -14,6 +14,8 @@ get_status() { [[ "$url" != http* ]] && return for attempt in $(seq 1 "$RETRY_COUNT"); do + echo "Checking '$url'" + response=$( curl -skL \ -A "$UA" \ @@ -22,7 +24,7 @@ get_status() { -H "Accept-Encoding: gzip, deflate, br" \ -H "Connection: keep-alive" \ -o /dev/null \ - --max-time 15 \ + --max-time 30 \ -w "%{http_code}" \ "$url" 2>&1 )