iptv/M3U8/scrape/utils/config.py
2025-09-05 10:37:22 -04:00

145 lines
3.6 KiB
Python

import asyncio
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any
import httpx
import pytz
from playwright.async_api import Request
# IANA timezone in which `now` is expressed.
TZ = pytz.timezone("America/New_York")
# Timezone-aware timestamp evaluated once, when this module is imported;
# it is NOT refreshed on later reads.
now = datetime.now(TZ)
# League abbreviation -> logo image URL.
# NCAAF and NCAAB intentionally point at the same image.
LOGOS = {
    "MLB": "https://i.gyazo.com/0fe7865ef2f06c9507791b24f04dbca8.png",
    "NBA": "https://i.gyazo.com/773c23570f095a5d549c23b9401d83f4.png",
    "NCAAF": "https://i.gyazo.com/ca63b40c86e757436de9d34d369b24f8.png",
    "NCAAB": "https://i.gyazo.com/ca63b40c86e757436de9d34d369b24f8.png",
    "NFL": "https://i.gyazo.com/fb4956d7a2fe54a1bac54cd81e1b3f11.png",
    "NHL": "https://i.gyazo.com/526607d4e886d5ed1fecca4bff3115e2.png",
    "WNBA": "https://i.gyazo.com/02d665a5704118d195dbcd5fa20d5462.png",
}
# %-style logging format: timestamp, level padded to 8 columns, logger name,
# message padded to 70 columns, then the "file:line" of the call site.
LOG_FMT = " ".join(
    (
        "[%(asctime)s]",
        "%(levelname)-8s",
        "[%(name)s]",
        "%(message)-70s",
        "(%(filename)s:%(lineno)d)",
    )
)
# ANSI escape sequence per logging level name, plus the "reset" sequence
# that ends any colouring.
COLORS = {
    "DEBUG": "\033[37m",
    "INFO": "\033[32m",
    "WARNING": "\033[33m",
    "ERROR": "\033[31m",
    "CRITICAL": "\033[41m",
    "reset": "\033[0m",
}


class ColorFormatter(logging.Formatter):
    """Formatter that wraps the record's level name in an ANSI colour code."""

    # Visible width the level name is padded to *inside* the colour codes.
    # The escape bytes count toward a "%(levelname)-8s" field width, so the
    # padding requested by LOG_FMT would otherwise never be applied and the
    # columns after the level name would not line up.
    LEVEL_WIDTH = 8

    def format(self, record) -> str:
        """Return the formatted record with a coloured, padded level name.

        Level names without an entry in COLORS are formatted unchanged.
        The record's ``levelname`` is always restored before returning, even
        when formatting raises, so other handlers see the plain name.
        """
        color = COLORS.get(record.levelname, "")
        if not color:
            # No colour known for this level: emit it plain, with no stray
            # reset sequence.
            return super().format(record)

        plain = record.levelname
        record.levelname = f"{color}{plain:<{self.LEVEL_WIDTH}}{COLORS['reset']}"
        try:
            return super().format(record)
        finally:
            record.levelname = plain
def get_logger(name: str | None = None) -> logging.Logger:
    """Return a logger, attaching a coloured stream handler on first use.

    Args:
        name: Logger name. When empty or None, the stem of this file's name
            is used instead.

    Returns:
        The named logger. If neither it nor any ancestor it propagates to
        already has a handler, a ``StreamHandler`` using ``ColorFormatter``
        with ``LOG_FMT`` is added and the level is set to INFO; otherwise the
        logger is returned as-is.
    """
    logger = logging.getLogger(name or Path(__file__).stem)

    # hasHandlers() also looks at ancestor loggers, so an already-configured
    # parent (e.g. root) means nothing is attached here.
    if logger.hasHandlers():
        return logger

    stream = logging.StreamHandler()
    stream.setFormatter(ColorFormatter(LOG_FMT, datefmt="%Y-%m-%d | %H:%M:%S"))
    logger.addHandler(stream)
    logger.setLevel(logging.INFO)
    return logger
def load_cache(
file: Path,
exp: int | float = None,
hour: int = None,
) -> dict[str, dict[str, str | float]]:
try:
data = json.loads(file.read_text(encoding="utf-8"))
if exp:
return {
k: v
for k, v in data.items()
if now.timestamp() - v.get("timestamp", 0) < exp
}
elif hour:
return {} if now.hour <= hour else data
except (FileNotFoundError, json.JSONDecodeError):
return {}
async def safe_process_event(
fn,
url_num: int,
timeout=20,
log: logging.Logger | None = None,
) -> Any | None:
if not log:
log = logging.getLogger(__name__)
task = asyncio.create_task(fn())
try:
return await asyncio.wait_for(task, timeout=timeout)
except asyncio.TimeoutError:
log.warning(f"URL {url_num}) Timed out after {timeout}s, skipping event")
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
log.debug(f"URL {url_num}) Ignore exception after timeout: {e}")
async def check_status(client: httpx.AsyncClient, url: str) -> bool:
    """Report whether a GET request to ``url`` succeeds with status 200.

    Any exception raised while issuing the request or by
    ``raise_for_status()`` is swallowed and reported as False.
    """
    try:
        response = await client.get(url)
        response.raise_for_status()
        return response.status_code == 200
    except Exception:
        return False
async def get_base(client: httpx.AsyncClient, mirrors: list[str]) -> str:
    """Return the first mirror in ``mirrors`` that passes ``check_status``.

    All mirrors are probed concurrently; the winner is chosen by position in
    ``mirrors``, not by which one answered first.

    Args:
        client: HTTP client used for the probes.
        mirrors: Candidate base URLs, in order of preference.

    Returns:
        The first URL whose probe succeeded.

    Raises:
        IndexError: If ``mirrors`` is empty or no mirror is reachable.
    """
    results = await asyncio.gather(
        *(check_status(client, link) for link in mirrors)
    )

    for url, ok in zip(mirrors, results):
        if ok:
            return url

    # Same exception type the former `[...][0]` raised, but with context.
    raise IndexError(f"no reachable mirror among {len(mirrors)} candidate(s)")
# Matches a URL containing ".m3u8", unless "amazonaws" or "knitcdn" appears
# anywhere in it. Compiled once here rather than on every request callback.
_VALID_M3U8 = re.compile(r"^(?!.*(amazonaws|knitcdn)).*\.m3u8")


def capture_req(
    req: Request,
    captured: list[str],
    got_one: asyncio.Event,
) -> None:
    """Record a request's URL if it looks like a wanted ``.m3u8`` stream.

    Args:
        req: The request to inspect; only its ``url`` is read.
        captured: List that matching URLs are appended to (mutated in place).
        got_one: Event that is set once a matching URL has been captured.
    """
    if _VALID_M3U8.search(req.url):
        captured.append(req.url)
        got_one.set()