import asyncio import json import logging import re from collections.abc import Callable from datetime import datetime from pathlib import Path from typing import Any import httpx import pytz from playwright.async_api import Browser, BrowserContext, Playwright, Request TZ = pytz.timezone("America/New_York") now = datetime.now(TZ) UA = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0" ) LOGOS = { "default": "https://i.gyazo.com/ec27417a9644ae517196494afa72d2b9.png", "MLB": "https://i.gyazo.com/0fe7865ef2f06c9507791b24f04dbca8.png", "NBA": "https://i.gyazo.com/773c23570f095a5d549c23b9401d83f4.png", "NCAA": "https://i.gyazo.com/ca63b40c86e757436de9d34d369b24f8.png", "NFL": "https://i.gyazo.com/fb4956d7a2fe54a1bac54cd81e1b3f11.png", "NHL": "https://i.gyazo.com/526607d4e886d5ed1fecca4bff3115e2.png", "WNBA": "https://i.gyazo.com/02d665a5704118d195dbcd5fa20d5462.png", "La Liga": "https://i.gyazo.com/3ea07074f7faab98c00493f07f4c6661.png", "Premier League": "https://i.gyazo.com/5cf939a9669647ec49c5ca61ab34789d.png", "Serie A": "https://i.gyazo.com/38fd8ea613b0f02780d2314fd49f7595.png", "Bundesliga": "https://i.gyazo.com/d608cd1fe95c288aba9e03a9b2f2b688.png", "Ligue 1": "https://i.gyazo.com/e5cd3f3960ea0fc7a10f831b6c79d31d.png", "Primeira Liga": "https://i.gyazo.com/0b9ff26408609ccb90bf45d60aa13500.png", "MLS": "https://i.gyazo.com/014b639a369d2bd8a4b97d00a239f330.png", } alias_map = { "NCAA": ["NCAAF", "NCAAB", "CBB", "CFB"], "Premier League": ["EPL"], } for base, aliases in alias_map.items(): for alias in aliases: LOGOS[alias] = LOGOS[base] LOG_FMT = ( "[%(asctime)s] " "%(levelname)-8s " "[%(name)s] " "%(message)-70s " "(%(filename)s:%(lineno)d)" ) COLORS = { "DEBUG": "\033[37m", "INFO": "\033[32m", "WARNING": "\033[33m", "ERROR": "\033[31m", "CRITICAL": "\033[41m", "reset": "\033[0m", } class ColorFormatter(logging.Formatter): def format(self, record) -> str: color = COLORS.get(record.levelname, "") levelname = record.levelname record.levelname = f"{color}{levelname}{COLORS['reset']}" formatted = super().format(record) record.levelname = levelname return formatted def get_logger(name: str | None = None) -> logging.Logger: if not name: name = Path(__file__).stem logger = logging.getLogger(name) if not logger.hasHandlers(): handler = logging.StreamHandler() formatter = ColorFormatter(LOG_FMT, datefmt="%Y-%m-%d | %H:%M:%S") handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger def near_hr(dt: datetime) -> float: return dt.replace(minute=0, second=0, microsecond=0).timestamp() def is_fresh( entry: dict, nearest_hr: bool, exp: int, ) -> bool: ts = entry.get("timestamp", 31496400) if nearest_hr: ts = near_hr(datetime.fromtimestamp(ts)) return now.timestamp() - ts < exp def load_cache( file: Path, exp: int | float, nearest_hr: bool = False, per_entry: bool = True, ) -> dict[str, dict[str, str | float]]: try: data: dict = json.loads(file.read_text(encoding="utf-8")) except (FileNotFoundError, json.JSONDecodeError): return {} if per_entry: return {k: v for k, v in data.items() if is_fresh(v, nearest_hr, exp)} ts = data.get("timestamp", 31496400) if nearest_hr: ts = near_hr(datetime.fromtimestamp(ts)) return data if now.timestamp() - ts < exp else {} def write_cache(file: Path, data: dict) -> None: file.write_text(json.dumps(data, indent=2), encoding="utf-8") async def safe_process_event( fn: Callable, url_num: int, timeout: int | float = 20, log: logging.Logger | None = None, ) -> Any | None: if not log: log = logging.getLogger(__name__) task = asyncio.create_task(fn()) try: return await asyncio.wait_for(task, timeout=timeout) except asyncio.TimeoutError: log.warning(f"URL {url_num}) Timed out after {timeout}s, skipping event") task.cancel() try: await task except asyncio.CancelledError: pass except Exception as e: log.debug(f"URL {url_num}) Ignore exception after timeout: {e}") async def check_status(client: httpx.AsyncClient, url: str) -> bool: try: r = await client.get(url) r.raise_for_status() except Exception: return False return r.status_code == 200 async def get_base(client: httpx.AsyncClient, mirrors: list[str]) -> str | None: tasks = [check_status(client, link) for link in mirrors] results = await asyncio.gather(*tasks) try: return [url for url, ok in zip(mirrors, results) if ok][0] except IndexError: return def capture_req( req: Request, captured: list[str], got_one: asyncio.Event, ) -> None: valid_m3u8 = re.compile(r"^(?!.*(amazonaws|knitcdn)).*\.m3u8") if valid_m3u8.search(req.url): captured.append(req.url) got_one.set() async def firefox( playwright: Playwright, ignore_https_errors: bool = False ) -> tuple[Browser, BrowserContext]: browser = await playwright.firefox.launch(headless=True) context = await browser.new_context( user_agent=UA, viewport={"width": 1366, "height": 768}, device_scale_factor=1, locale="en-US", timezone_id="America/New_York", color_scheme="dark", permissions=["geolocation"], extra_http_headers={ "Accept-Language": "en-US,en;q=0.9", "Upgrade-Insecure-Requests": "1", }, ignore_https_errors=ignore_https_errors, ) await context.add_init_script( """ Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4] }); const elementDescriptor = Object.getOwnPropertyDescriptor(HTMLElement.prototype, 'offsetHeight'); Object.defineProperty(HTMLDivElement.prototype, 'offsetHeight', { ...elementDescriptor, get: function() { if (this.id === 'modernizr') { return 24; } return elementDescriptor.get.apply(this); } }); Object.defineProperty(window.screen, 'width', { get: () => 1366 }); Object.defineProperty(window.screen, 'height', { get: () => 768 }); const getParameter = WebGLRenderingContext.prototype. getParameter; WebGLRenderingContext.prototype.getParameter = function (param) { if (param === 37445) return "Intel Inc."; // UNMASKED_VENDOR_WEBGL if (param === 37446) return "Intel Iris OpenGL Engine"; // UNMASKED_RENDERER_WEBGL return getParameter.apply(this, [param]); }; """ ) return browser, context