#!/usr/bin/env python3
import asyncio
import io
import json
import ssl
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

import httpx
from playwright.async_api import Request, async_playwright

from .logger import get_logger
from .tvpass import TZ, logos

log = get_logger(__name__)

urls: dict[str, dict[str, str | float]] = {}
tvp_sports = set(logos.keys())

BASE_URL = "https://cdn.livetv861.me/rss/upcoming_en.xml"
CERT_BUNDLE_URLS = [
    "https://curl.se/ca/cacert.pem",
    "https://ssl.com/repo/certs/Cloudflare-TLS-I-E1.pem",
    "https://ssl.com/repo/certs/SSL.com-TLS-T-ECC-R2.pem",
    "https://ssl.com/repo/certs/Sectigo-AAA-Root.pem",
]
CERT_FILE = Path(__file__).parent / "cached-ca.pem"
CACHE_FILE = Path(__file__).parent / "livetvsx.json"


async def safe_process_event(fn, url_num: int, timeout=20) -> Any | None:
    # Run an event-processing coroutine factory with a hard timeout.
    try:
        return await asyncio.wait_for(fn(), timeout=timeout)
    except asyncio.TimeoutError:
        log.warning(f"URL {url_num}) Timed out after {timeout}s, skipping event")
        return None


async def write_to_cert(client: httpx.AsyncClient, url: str, cert: Path) -> None:
    # Append one CA bundle to the cached certificate file.
    try:
        r = await client.get(url)
        r.raise_for_status()
    except Exception as e:
        log.error(f"Failed to fetch cert bundle {url}: {e}")
        return
    with cert.open("a", encoding="utf-8") as f:
        f.write(f"{r.text}\n")


async def refresh_cert_cache(client: httpx.AsyncClient) -> None:
    CERT_FILE.unlink(missing_ok=True)
    tasks = [write_to_cert(client, url, CERT_FILE) for url in CERT_BUNDLE_URLS]
    await asyncio.gather(*tasks)


async def get_cert(client: httpx.AsyncClient) -> ssl.SSLContext:
    # Reuse the cached CA bundle if it is younger than 30 days, otherwise rebuild it.
    if CERT_FILE.is_file():
        mtime = datetime.fromtimestamp(CERT_FILE.stat().st_mtime)
        if datetime.now() - mtime < timedelta(days=30):
            return ssl.create_default_context(cafile=CERT_FILE)
    log.info("Refreshing cached certificate")
    await refresh_cert_cache(client)
    return ssl.create_default_context(cafile=CERT_FILE)


def load_cache() -> dict[str, dict[str, str | float]]:
    # Load previously captured events, dropping entries older than four hours.
    try:
        data = json.loads(CACHE_FILE.read_text(encoding="utf-8"))
        now = datetime.now().timestamp()
        return {
            k: v
            for k, v in data.items()
            if now - v.get("timestamp", 0) < timedelta(hours=4).total_seconds()
        }
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


async def fetch_xml_stream(url: str, ssl_ctx: ssl.SSLContext) -> io.BytesIO:
    # Stream the RSS feed into memory; return an empty buffer on failure.
    buffer = io.BytesIO()
    try:
        async with httpx.AsyncClient(timeout=10, verify=ssl_ctx) as client:
            async with client.stream("GET", url) as r:
                r.raise_for_status()
                async for chunk in r.aiter_bytes(8192):
                    buffer.write(chunk)
        buffer.seek(0)
        return buffer
    except Exception as e:
        log.error(f"Failed to fetch {url}: {e}")
        return io.BytesIO(b"")


async def parse_feed(
    url: str,
    ssl_ctx: ssl.SSLContext,
    cached_keys: set[str],
) -> list[dict[str, str]]:
    # Parse the upcoming-events feed, keeping items that start within the time window.
    events: list[dict[str, str]] = []
    pub_date_format = "%a, %d %b %Y %H:%M:%S %z"
    now = datetime.now(TZ)
    window_start, window_end = now - timedelta(hours=3), now + timedelta(hours=1)
    buffer = await fetch_xml_stream(url, ssl_ctx)
    for _, elem in ET.iterparse(buffer, events=("end",)):
        if elem.tag == "item":
            title = elem.findtext("title")
            desc = elem.findtext("description")
            pub_date = elem.findtext("pubDate")
            link = elem.findtext("link")
            try:
                dt = datetime.strptime(pub_date, pub_date_format)
                dt = dt.astimezone(TZ)
            except Exception:
                elem.clear()
                continue
            if window_start <= dt <= window_end:
                # The description looks like "Sport. Event details." so split on
                # the first period to separate the two parts.
                sport, event = (
                    (
                        desc.split(".")[0].strip(),
                        " ".join(p.strip() for p in desc.split(".")[1:]),
                    )
                    if desc
                    else ("", "")
                )
                key = f"[{sport}: {event}] {title}"
                if key in cached_keys:
                    elem.clear()
                    continue
                # Only collect events whose sport or event name isn't already in
                # the tvpass sport set.
                elif not tvp_sports & {sport, event}:
                    events.append(
                        {
                            "sport": sport,
                            "event": event,
                            "title": title,
                            "link": link,
                        }
                    )
            elem.clear()
    return events


async def process_event(url: str, url_num: int, max_wait_ms=15_000) -> str | None:
    # Open the event page in a headless browser and sniff network requests for an
    # .m3u8 stream URL, either from inline playback or from the popup player.
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        context = await browser.new_context(
            ignore_https_errors=True  # website doesn't send valid certs
        )
        ev_page = await context.new_page()
        captured: list[str] = []
        got_one = asyncio.Event()

        def capture_req(req: Request) -> None:
            # Keep the first playlist URL that isn't served from a filtered CDN host.
            if (
                ".m3u8" in req.url
                and "amazonaws" not in req.url
                and "knitcdn" not in req.url
                and not captured
            ):
                captured.append(req.url)
                got_one.set()

        popup = None
        try:
            await ev_page.goto(
                url,
                wait_until="domcontentloaded",
                timeout=30_000,
            )
            # Switch to the "Browser Links" tab so the clickable stream links render.
            btn = await ev_page.query_selector(".lnkhdr > tbody > tr > td:nth-child(2)")
            if btn:
                try:
                    await btn.click()
                    await ev_page.wait_for_timeout(500)
                except Exception as e:
                    log.debug(f"URL {url_num}) Failed to click Browser Links tab: {e}")
                    return None
            else:
                log.warning(f"URL {url_num}) Browser Links tab not found")

            link_img = await ev_page.query_selector(
                "tr:nth-child(2) > td:nth-child(1) td:nth-child(6) img"
            )
            if not link_img:
                log.warning(f"URL {url_num}) No browser link to click.")
                return None

            ev_page.on("request", capture_req)
            try:
                # Clicking usually opens the player in a popup; capture its requests too.
                async with ev_page.expect_popup(timeout=5_000) as popup_info:
                    try:
                        await link_img.click()
                    except Exception as e:
                        log.debug(
                            f"URL {url_num}) Click failed (popup might have already been opened): {e}"
                        )
                popup = await popup_info.value
                popup.on("request", capture_req)
            except Exception:
                # No popup appeared; retry the click for inline playback.
                try:
                    await link_img.click()
                except Exception as e:
                    log.debug(f"URL {url_num}) Fallback click failed: {e}")
                    return None

            # Wait until capture_req sees a matching .m3u8 request or the deadline passes.
            wait_task = asyncio.create_task(got_one.wait())
            try:
                await asyncio.wait_for(wait_task, timeout=max_wait_ms / 1000)
            except asyncio.TimeoutError:
                log.warning(f"URL {url_num}) Timed out waiting for m3u8.")
                return None
            finally:
                if not wait_task.done():
                    wait_task.cancel()
                    try:
                        await wait_task
                    except asyncio.CancelledError:
                        pass
                ev_page.remove_listener("request", capture_req)
                if popup:
                    popup.remove_listener("request", capture_req)
                    await popup.close()
                await ev_page.close()

            if captured:
                log.info(f"URL {url_num}) Captured M3U8")
                return captured[-1]
            log.warning(f"URL {url_num}) No m3u8 captured in popup or inline playback.")
            return None
        except Exception as e:
            log.error(f"URL {url_num}) Unexpected error: {e}")
            try:
                ev_page.remove_listener("request", capture_req)
                if popup:
                    popup.remove_listener("request", capture_req)
                    await popup.close()
                await ev_page.close()
            except Exception:
                pass
        finally:
            await browser.close()


async def main(client: httpx.AsyncClient) -> None:
    log.info(f'Scraping from "{BASE_URL}"')
    cert = await get_cert(client)
    cached_urls = load_cache()
    cached_keys = set(cached_urls.keys())
    cached_count = len(cached_urls)
    events = await parse_feed(BASE_URL, cert, cached_keys)
    log.info(f"Processing {len(events)} URLs")
    now_ts = datetime.now().timestamp()
    for num, ev in enumerate(events, start=1):
        sport = ev["sport"]
        event = ev["event"]
        title = ev["title"]
        link = ev["link"]
        key = f"[{sport}: {event}] {title}"
        url = await safe_process_event(
            lambda: process_event(link, url_num=num), url_num=num
        )
        if url:
            entry = {
                "url": url,
                "logo": logos.get(
                    sport,
                    "https://i.gyazo.com/ec27417a9644ae517196494afa72d2b9.png",
                ),
                "timestamp": now_ts,
            }
            urls[key] = cached_urls[key] = entry
    CACHE_FILE.write_text(json.dumps(cached_urls, indent=2), encoding="utf-8")
    new_count = len(cached_urls) - cached_count
    log.info(f"Cached {cached_count} event(s)")
    log.info(f"Collected {new_count} new event(s)")
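

if __name__ == "__main__":
    # Minimal standalone entry point: a sketch, not part of the original module,
    # which appears to be driven by a parent scraper that supplies the shared
    # httpx.AsyncClient. The timeout value and module invocation are assumptions;
    # because of the relative imports above, run it as a module, e.g.
    # `python -m <your_package>.livetvsx`.
    async def _run() -> None:
        async with httpx.AsyncClient(timeout=30) as client:
            await main(client)

    asyncio.run(_run())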