"""Scraper for livetv.sx: reads the upcoming-events RSS feed and captures M3U8 stream URLs with Playwright."""

import asyncio
import io
import ssl
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path

import httpx
from playwright.async_api import async_playwright

from .utils import (
    TZ,
    capture_req,
    get_logger,
    league_info,
    load_cache,
    new_browser,
    now,
    safe_process_event,
    write_cache,
)

log = get_logger(__name__)

urls: dict[str, dict[str, str | float]] = {}

BASE_URL = "https://cdn.livetv861.me/rss/upcoming_en.xml"
CERT_BUNDLE_URLS = [
    "https://curl.se/ca/cacert.pem",
    "https://ssl.com/repo/certs/Cloudflare-TLS-I-E1.pem",
    "https://ssl.com/repo/certs/SSL.com-TLS-T-ECC-R2.pem",
    "https://ssl.com/repo/certs/Sectigo-AAA-Root.pem",
]
CERT_FILE = Path(__file__).parent / "caches" / "cached-cert.pem"
CACHE_FILE = Path(__file__).parent / "caches" / "livetvsx.json"


async def write_to_cert(
    client: httpx.AsyncClient,
    url: str,
    cert: Path,
) -> None:
    """Fetch a single CA bundle and append it to the cached PEM file."""
    try:
        r = await client.get(url)
        r.raise_for_status()
    except Exception as e:
        log.error(f"Failed to fetch certificate bundle {url}: {e}")
        return
    with cert.open("a", encoding="utf-8") as f:
        f.write(f"{r.text}\n")


async def refresh_cert_cache(client: httpx.AsyncClient) -> None:
    """Rebuild the cached PEM file from all configured CA bundle URLs."""
    CERT_FILE.unlink(missing_ok=True)
    tasks = [write_to_cert(client, url, CERT_FILE) for url in CERT_BUNDLE_URLS]
    await asyncio.gather(*tasks)


async def get_cert(client: httpx.AsyncClient) -> ssl.SSLContext:
    """Return an SSL context backed by the cached PEM file, refreshing it when older than 30 days."""
    if CERT_FILE.is_file():
        mtime = datetime.fromtimestamp(CERT_FILE.stat().st_mtime, TZ)
        if now - mtime < timedelta(days=30):
            return ssl.create_default_context(cafile=CERT_FILE)
    log.info("Refreshing cached certificate")
    await refresh_cert_cache(client)
    return ssl.create_default_context(cafile=CERT_FILE)


async def fetch_xml_stream(url: str, ssl_ctx: ssl.SSLContext) -> io.BytesIO | None:
    """Stream the RSS feed into an in-memory buffer; return None on failure."""
    buffer = io.BytesIO()
    try:
        async with httpx.AsyncClient(
            timeout=10,
            verify=ssl_ctx,
            follow_redirects=True,
        ) as client:
            async with client.stream("GET", url) as r:
                r.raise_for_status()
                async for chunk in r.aiter_bytes(8192):
                    buffer.write(chunk)
        buffer.seek(0)
        return buffer
    except Exception as e:
        log.error(f"Failed to fetch {url}: {e}")
        return None


async def process_event(url: str, url_num: int) -> str | None:
    """Open an event page, click through to its browser links, and capture an M3U8 request URL."""
    async with async_playwright() as p:
        browser, context = await new_browser(p, ignore_https_errors=True)
        page = await context.new_page()
        captured: list[str] = []
        got_one = asyncio.Event()
        handler = partial(capture_req, captured=captured, got_one=got_one)
        popup = None
        try:
            await page.goto(
                url,
                wait_until="domcontentloaded",
                timeout=10_000,
            )
            btn = await page.query_selector(".lnkhdr > tbody > tr > td:nth-child(2)")
            if btn:
                try:
                    await btn.click()
                    await page.wait_for_timeout(500)
                except Exception as e:
                    log.debug(f"URL {url_num}) Failed to click Browser Links tab: {e}")
                    return
            else:
                log.warning(f"URL {url_num}) Browser Links tab not found")
                return
            link_img = await page.query_selector(
                "tr:nth-child(2) > td:nth-child(1) td:nth-child(6) img"
            )
            if not link_img:
                log.warning(f"URL {url_num}) No browser link to click.")
                return
            page.on("request", handler)
            try:
                async with page.expect_popup(timeout=5_000) as popup_info:
                    try:
                        await link_img.click()
                    except Exception as e:
                        log.debug(
                            f"URL {url_num}) Click failed (popup might have already been opened): {e}"
                        )
                popup = await popup_info.value
                popup.on("request", handler)
            except Exception:
                # No popup appeared; retry the click in case the stream plays inline.
                try:
                    await link_img.click()
                except Exception as e:
                    log.debug(f"URL {url_num}) Fallback click failed: {e}")
            wait_task = asyncio.create_task(got_one.wait())
            try:
                await asyncio.wait_for(wait_task, timeout=15)
            except asyncio.TimeoutError:
                log.warning(f"URL {url_num}) Timed out waiting for M3U8.")
                return
            finally:
                if not wait_task.done():
                    wait_task.cancel()
                    try:
                        await wait_task
                    except asyncio.CancelledError:
                        pass
                page.remove_listener("request", handler)
                if popup:
                    popup.remove_listener("request", handler)
                    await popup.close()
                await page.close()
            if captured:
                log.info(f"URL {url_num}) Captured M3U8")
                return captured[-1]
            log.warning(f"URL {url_num}) No M3U8 captured in popup or inline playback.")
            return
        except Exception:
            try:
                page.remove_listener("request", handler)
                if popup:
                    popup.remove_listener("request", handler)
                    await popup.close()
                await page.close()
            except Exception:
                pass
        finally:
            # Close the browser on every exit path, not only after an error.
            await browser.close()


async def get_events(
    url: str,
    ssl_ctx: ssl.SSLContext,
    cached_keys: set[str],
) -> list[dict[str, str]]:
    """Parse the RSS feed and return uncached items whose pubDate is within 30 minutes of now."""
    events: list[dict[str, str]] = []
    start_dt = now - timedelta(minutes=30)
    end_dt = now + timedelta(minutes=30)
    if buffer := await fetch_xml_stream(url, ssl_ctx):
        pub_date_format = "%a, %d %b %Y %H:%M:%S %z"
        for _, elem in ET.iterparse(buffer, events=("end",)):
            if elem.tag == "item":
                title = elem.findtext("title")
                desc = elem.findtext("description")
                pub_date = elem.findtext("pubDate")
                link = elem.findtext("link")
                try:
                    dt = datetime.strptime(pub_date, pub_date_format)
                    dt = dt.astimezone(TZ)
                except Exception:
                    elem.clear()
                    continue
                if not start_dt <= dt <= end_dt:
                    elem.clear()
                    continue
                # Split the description into sport (before the first ".") and event (the rest).
                sport, event = (
                    (
                        desc.split(".")[0].strip(),
                        " ".join(p.strip() for p in desc.split(".")[1:]),
                    )
                    if desc
                    else ("", "")
                )
                key = f"[{sport}: {event}] {title} (LTVSX)"
                if key in cached_keys:
                    elem.clear()
                    continue
                events.append(
                    {
                        "sport": sport,
                        "event": event,
                        "title": title,
                        "link": link,
                    }
                )
            elem.clear()
    return events


async def scrape(client: httpx.AsyncClient) -> None:
    """Collect upcoming events, capture their stream URLs, and update the cache."""
    cached_urls = load_cache(CACHE_FILE, exp=10_800)
    cached_count = len(cached_urls)
    urls.update(cached_urls)
    log.info(f"Collected {cached_count} event(s) from cache")
    log.info(f'Scraping from "{BASE_URL}"')
    cert = await get_cert(client)
    events = await get_events(
        BASE_URL,
        cert,
        set(cached_urls.keys()),
    )
    log.info(f"Processing {len(events)} new URL(s)")
    for i, ev in enumerate(events, start=1):
        link = ev["link"]
        url = await safe_process_event(
            lambda: process_event(link, url_num=i),
            url_num=i,
            log=log,
        )
        if url:
            sport = ev["sport"]
            event = ev["event"]
            title = ev["title"]
            key = f"[{sport}: {event}] {title} (LTVSX)"
            tvg_id, logo = league_info(sport)
            if not tvg_id:
                tvg_id, logo = league_info(event)
            entry = {
                "url": url,
                "logo": logo,
                "id": tvg_id or "Live.Event.us",
                "base": "https://livetv.sx/enx/",
                "timestamp": now.timestamp(),
            }
            urls[key] = cached_urls[key] = entry
    if new_count := len(cached_urls) - cached_count:
        log.info(f"Collected and cached {new_count} new event(s)")
    else:
        log.info("No new events found")
    write_cache(CACHE_FILE, cached_urls)