"""Scrape upcoming events from the LiveTV RSS feed and capture their m3u8 stream URLs."""

import asyncio
import io
import json
import ssl
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path

import httpx
from playwright.async_api import Request, async_playwright

from .utils import LOGOS, TZ, get_logger, now, safe_process_event

log = get_logger(__name__)

urls: dict[str, dict[str, str | float]] = {}

BASE_URL = "https://cdn.livetv861.me/rss/upcoming_en.xml"
CERT_BUNDLE_URLS = [
    "https://curl.se/ca/cacert.pem",
    "https://ssl.com/repo/certs/Cloudflare-TLS-I-E1.pem",
    "https://ssl.com/repo/certs/SSL.com-TLS-T-ECC-R2.pem",
    "https://ssl.com/repo/certs/Sectigo-AAA-Root.pem",
]
CERT_FILE = Path(__file__).parent / "utils" / "cached-ca.pem"
CACHE_FILE = Path(__file__).parent / "caches" / "livetvsx.json"

# Sports for which a logo is known.
KNOWN_SPORTS = set(LOGOS.keys())


async def write_to_cert(client: httpx.AsyncClient, url: str, cert: Path) -> None:
    """Append one CA bundle to the cached PEM file."""
    try:
        r = await client.get(url)
        r.raise_for_status()
    except Exception as e:
        log.error(f"Failed to fetch cert bundle {url}: {e}")
        return
    with cert.open("a", encoding="utf-8") as f:
        f.write(f"{r.text}\n")


async def refresh_cert_cache(client: httpx.AsyncClient) -> None:
    CERT_FILE.unlink(missing_ok=True)
    tasks = [write_to_cert(client, url, CERT_FILE) for url in CERT_BUNDLE_URLS]
    await asyncio.gather(*tasks)


async def get_cert(client: httpx.AsyncClient) -> ssl.SSLContext:
    """Return an SSL context backed by the cached CA bundle, refreshing it monthly."""
    if CERT_FILE.is_file():
        # Make mtime timezone-aware so it can be compared with the aware `now`.
        mtime = datetime.fromtimestamp(CERT_FILE.stat().st_mtime, tz=TZ)
        if now - mtime < timedelta(days=30):
            return ssl.create_default_context(cafile=CERT_FILE)
    log.info("Refreshing cached certificate")
    await refresh_cert_cache(client)
    return ssl.create_default_context(cafile=CERT_FILE)


def load_cache() -> dict[str, dict[str, str | float]]:
    """Load cached event entries, dropping any older than 4 hours."""
    try:
        data: dict = json.loads(CACHE_FILE.read_text(encoding="utf-8"))
        return {
            k: v
            for k, v in data.items()
            if now.timestamp() - v.get("timestamp", 0) < 14400  # 4 hours
        }
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


async def fetch_xml_stream(url: str, ssl_ctx: ssl.SSLContext) -> io.BytesIO:
    buffer = io.BytesIO()
    try:
        async with httpx.AsyncClient(timeout=10, verify=ssl_ctx) as client:
            async with client.stream("GET", url) as r:
                r.raise_for_status()
                async for chunk in r.aiter_bytes(8192):
                    buffer.write(chunk)
        buffer.seek(0)
        return buffer
    except Exception as e:
        log.error(f"Failed to fetch {url}: {e}")
        return io.BytesIO(b"")


async def process_event(url: str, url_num: int) -> str | None:
    """Open an event page in Playwright and return the last captured m3u8 request URL."""
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        context = await browser.new_context(
            ignore_https_errors=True  # website doesn't send valid certs
        )
        ev_page = await context.new_page()
        captured: list[str] = []
        got_one = asyncio.Event()

        def capture_req(req: Request) -> None:
            if (
                ".m3u8" in req.url
                and "amazonaws" not in req.url
                and "knitcdn" not in req.url
            ):
                captured.append(req.url)
                got_one.set()

        popup = None
        try:
            await ev_page.goto(
                url,
                wait_until="domcontentloaded",
                timeout=10_000,
            )
            btn = await ev_page.query_selector(".lnkhdr > tbody > tr > td:nth-child(2)")
            if btn:
                try:
                    await btn.click()
                    await ev_page.wait_for_timeout(500)
                except Exception as e:
                    log.debug(f"URL {url_num}) Failed to click Browser Links tab: {e}")
                    return
            else:
                log.warning(f"URL {url_num}) Browser Links tab not found")
            link_img = await ev_page.query_selector(
                "tr:nth-child(2) > td:nth-child(1) td:nth-child(6) img"
            )
            if not link_img:
                log.warning(f"URL {url_num}) No browser link to click.")
                return
            ev_page.on("request", capture_req)
            try:
                async with ev_page.expect_popup(timeout=5_000) as popup_info:
                    try:
                        await link_img.click()
                    except Exception as e:
                        log.debug(
                            f"URL {url_num}) Click failed (popup might have already been opened): {e}"
                        )
                popup = await popup_info.value
                popup.on("request", capture_req)
            except Exception:
                # No popup appeared; retry the click for inline playback.
                try:
                    await link_img.click()
                except Exception as e:
                    log.debug(f"URL {url_num}) Fallback click failed: {e}")
                    return

            # Wait for the request listener to capture an m3u8 URL.
            wait_task = asyncio.create_task(got_one.wait())
            try:
                await asyncio.wait_for(wait_task, timeout=15)
            except asyncio.TimeoutError:
                log.warning(f"URL {url_num}) Timed out waiting for m3u8.")
                return
            finally:
                if not wait_task.done():
                    wait_task.cancel()
                    try:
                        await wait_task
                    except asyncio.CancelledError:
                        pass
                ev_page.remove_listener("request", capture_req)
                if popup:
                    popup.remove_listener("request", capture_req)
                    await popup.close()
                await ev_page.close()

            if captured:
                log.info(f"URL {url_num}) Captured M3U8")
                return captured[-1]
            log.warning(f"URL {url_num}) No m3u8 captured in popup or inline playback.")
            return
        except Exception as e:
            log.debug(f"URL {url_num}) Unexpected error: {e}")
            try:
                ev_page.remove_listener("request", capture_req)
                if popup:
                    popup.remove_listener("request", capture_req)
                    await popup.close()
                await ev_page.close()
            except Exception:
                pass
        finally:
            # Always release the browser, even when returning early.
            await browser.close()


async def get_events(
    url: str,
    ssl_ctx: ssl.SSLContext,
    cached_keys: set[str],
) -> list[dict[str, str]]:
    """Parse the RSS feed and return events starting between 3 hours ago and 1 hour from now."""
    events: list[dict[str, str]] = []
    pub_date_format = "%a, %d %b %Y %H:%M:%S %z"
    window_start, window_end = now - timedelta(hours=3), now + timedelta(hours=1)
    buffer = await fetch_xml_stream(url, ssl_ctx)
    for _, elem in ET.iterparse(buffer, events=("end",)):
        if elem.tag == "item":
            title = elem.findtext("title")
            desc = elem.findtext("description")
            pub_date = elem.findtext("pubDate")
            link = elem.findtext("link")
            try:
                dt = datetime.strptime(pub_date, pub_date_format)
                dt = dt.astimezone(TZ)
            except Exception:
                elem.clear()
                continue
            if window_start <= dt <= window_end:
                # The description is formatted as "<sport>. <event>".
                sport, event = (
                    (
                        desc.split(".")[0].strip(),
                        " ".join(p.strip() for p in desc.split(".")[1:]),
                    )
                    if desc
                    else ("", "")
                )
                key = f"[{sport}: {event}] {title}"
                if key in cached_keys:
                    elem.clear()
                    continue
                if not KNOWN_SPORTS & {sport, event}:
                    # Skip events that don't match any known sport.
                    elem.clear()
                    continue
                events.append(
                    {
                        "sport": sport,
                        "event": event,
                        "title": title,
                        "link": link,
                    }
                )
        elem.clear()
    return events


async def main(client: httpx.AsyncClient) -> None:
    log.info(f'Scraping from "{BASE_URL}"')
    cert = await get_cert(client)
    cached_urls = load_cache()
    cached_count = len(cached_urls)
    events = await get_events(BASE_URL, cert, set(cached_urls.keys()))
    log.info(f"Processing {len(events)} URLs")
    for i, ev in enumerate(events, start=1):
        sport = ev["sport"]
        event = ev["event"]
        title = ev["title"]
        link = ev["link"]
        key = f"[{sport}: {event}] {title}"
        url = await safe_process_event(
            lambda: process_event(link, url_num=i),
            url_num=i,
            log=log,
        )
        if url:
            entry = {
                "url": url,
                "logo": LOGOS.get(
                    sport,
                    "https://i.gyazo.com/ec27417a9644ae517196494afa72d2b9.png",
                ),
                "timestamp": now.timestamp(),
            }
            urls[key] = cached_urls[key] = entry
    CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
    CACHE_FILE.write_text(json.dumps(cached_urls, indent=2), encoding="utf-8")
    log.info(f"Collected {len(cached_urls) - cached_count} event(s)")