"""Shared scraper helpers.

Provides a colorized logger factory, a JSON cache loader with optional
expiry filtering, a timeout wrapper for coroutines, mirror reachability
checks, and a request callback that collects ``.m3u8`` URLs.
"""

import asyncio
import json
import logging
import re
from collections.abc import Callable, Coroutine
from datetime import datetime
from pathlib import Path
from typing import Any

import httpx
import pytz
from playwright.async_api import Request

TZ = pytz.timezone("America/New_York")

# NOTE: evaluated once, when this module is first imported. Everything that
# reads ``now`` (including ``load_cache``) therefore sees the import-time
# instant, not the current wall-clock time.
now = datetime.now(TZ)

# League name -> logo image URL.
LOGOS = {
    "MLB": "https://i.gyazo.com/0fe7865ef2f06c9507791b24f04dbca8.png",
    "NBA": "https://i.gyazo.com/773c23570f095a5d549c23b9401d83f4.png",
    "NCAAF": "https://i.gyazo.com/ca63b40c86e757436de9d34d369b24f8.png",
    "NCAAB": "https://i.gyazo.com/ca63b40c86e757436de9d34d369b24f8.png",
    "NFL": "https://i.gyazo.com/fb4956d7a2fe54a1bac54cd81e1b3f11.png",
    "NHL": "https://i.gyazo.com/526607d4e886d5ed1fecca4bff3115e2.png",
    "WNBA": "https://i.gyazo.com/02d665a5704118d195dbcd5fa20d5462.png",
}

LOG_FMT = (
    "[%(asctime)s] "
    "%(levelname)-8s "
    "[%(name)s] "
    "%(message)-70s "
    "(%(filename)s:%(lineno)d)"
)

# ANSI escape sequences keyed by level name; "reset" ends the colored span.
COLORS = {
    "DEBUG": "\033[37m",
    "INFO": "\033[32m",
    "WARNING": "\033[33m",
    "ERROR": "\033[31m",
    "CRITICAL": "\033[41m",
    "reset": "\033[0m",
}

# Matches URLs that contain ".m3u8" and do not contain "amazonaws" or
# "knitcdn" anywhere. Compiled once instead of on every captured request.
VALID_M3U8 = re.compile(r"^(?!.*(amazonaws|knitcdn)).*\.m3u8")


class ColorFormatter(logging.Formatter):
    """Formatter that wraps the record's level name in an ANSI color."""

    def format(self, record: logging.LogRecord) -> str:
        """Return the formatted record with a colorized, padded level name.

        The level name is padded to 8 columns *before* the escape codes are
        added: the ``%(levelname)-8s`` width in ``LOG_FMT`` counts the escape
        bytes as characters, so it cannot align a colored name on its own.
        """
        plain = record.levelname
        color = COLORS.get(plain, "")
        record.levelname = f"{color}{plain:<8}{COLORS['reset']}"
        try:
            return super().format(record)
        finally:
            # The record is shared with other handlers; always restore it,
            # even when formatting raises.
            record.levelname = plain


def get_logger(name: str | None = None) -> logging.Logger:
    """Return a logger, attaching a colored stream handler when none exists.

    Args:
        name: Logger name. Falls back to this file's stem when empty or None.

    Returns:
        The ``logging.Logger`` registered under ``name``.
    """
    if not name:
        name = Path(__file__).stem

    logger = logging.getLogger(name)

    # hasHandlers() also looks at ancestor loggers, so nothing is added (and
    # the level is left untouched) when a parent is already configured.
    if not logger.hasHandlers():
        handler = logging.StreamHandler()
        formatter = ColorFormatter(LOG_FMT, datefmt="%Y-%m-%d | %H:%M:%S")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)

    return logger


def load_cache(
    file: Path,
    exp: int | float | None = None,
) -> dict[str, dict[str, str | float]]:
    """Load a JSON cache file, optionally dropping expired entries.

    Args:
        file: Path of the JSON file to read (decoded as UTF-8).
        exp: Maximum entry age in seconds, measured against the module-level
            ``now``. ``None`` (the default) disables expiry filtering.

    Returns:
        The cached mapping, or ``{}`` when the file is missing or is not
        valid JSON. With ``exp`` set, only entries whose ``"timestamp"``
        (treated as 0 when absent) is less than ``exp`` seconds old are kept.
    """
    try:
        data: dict[str, dict[str, str | float]] = json.loads(
            file.read_text(encoding="utf-8")
        )
    except (FileNotFoundError, json.JSONDecodeError):
        return {}

    # Compare against None explicitly: exp=0 is a valid "everything expired".
    # Previously the default None reached the `<` comparison and raised
    # TypeError for any non-empty cache.
    if exp is None:
        return data

    reference = now.timestamp()

    return {
        key: entry
        for key, entry in data.items()
        if reference - entry.get("timestamp", 0) < exp
    }


async def safe_process_event(
    fn: Callable[[], Coroutine[Any, Any, Any]],
    url_num: int,
    timeout: int | float = 20,
    log: logging.Logger | None = None,
) -> Any | None:
    """Run ``fn()`` as a task and give up after ``timeout`` seconds.

    Args:
        fn: Zero-argument callable returning the coroutine to run.
        url_num: Identifier used only in the log messages.
        timeout: Seconds to wait before abandoning the task.
        log: Logger to report to; defaults to this module's logger.

    Returns:
        The coroutine's result, or ``None`` when it timed out. Exceptions
        raised by the coroutine before the timeout propagate to the caller.
    """
    if not log:
        log = logging.getLogger(__name__)

    task = asyncio.create_task(fn())

    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        log.warning(f"URL {url_num}) Timed out after {timeout}s, skipping event")

        # wait_for() has already requested cancellation; cancel again and
        # await so the task is fully finished before returning.
        task.cancel()

        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            log.debug(f"URL {url_num}) Ignore exception after timeout: {e}")

    return None


async def check_status(client: httpx.AsyncClient, url: str) -> bool:
    """Return True only when a GET of ``url`` answers with status 200.

    Any exception (connection failure, error status raised by
    ``raise_for_status``, ...) is deliberately treated as "not reachable".
    """
    try:
        r = await client.get(url)
        r.raise_for_status()
    except Exception:
        return False

    return r.status_code == 200


async def get_base(client: httpx.AsyncClient, mirrors: list[str]) -> str:
    """Return the first mirror, in list order, that passes ``check_status``.

    All mirrors are probed concurrently.

    Raises:
        IndexError: When no mirror is reachable (or ``mirrors`` is empty).
    """
    tasks = [check_status(client, link) for link in mirrors]
    results = await asyncio.gather(*tasks)

    for url, ok in zip(mirrors, results):
        if ok:
            return url

    # Same exception type the bare `[...][0]` used to raise, with a message
    # that says what actually went wrong.
    raise IndexError(f"no reachable mirror among {len(mirrors)} candidate(s)")


def capture_req(
    req: Request,
    captured: list[str],
    got_one: asyncio.Event,
) -> None:
    """Record ``req.url`` and set ``got_one`` when it is an accepted stream.

    Args:
        req: The intercepted request.
        captured: List that accepted URLs are appended to.
        got_one: Event set once an accepted URL has been recorded.
    """
    if VALID_M3U8.search(req.url):
        captured.append(req.url)
        got_one.set()