iptv/M3U8/scrapers/utils/config.py
2025-09-15 09:26:20 -04:00

249 lines
7.1 KiB
Python

import asyncio
import json
import logging
import re
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any
import httpx
import pytz
from playwright.async_api import Browser, BrowserContext, Playwright, Request
# Timezone used for the module's reference clock (`now`).
TZ = pytz.timezone("America/New_York")
# Snapshot taken once, when this module is imported. `is_fresh()` and
# `load_cache()` compare stored timestamps against this fixed value rather than
# against the live clock.
now = datetime.now(TZ)
# User-agent string (Edge 134 / Chromium on 64-bit Windows 10) that `firefox()`
# passes as `user_agent` when it creates the Playwright browser context.
UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
)
# League / competition name -> logo image URL.
# "default" is presumably the fallback for names without their own entry.
LOGOS = {
    "default": "https://i.gyazo.com/ec27417a9644ae517196494afa72d2b9.png",
    "MLB": "https://i.gyazo.com/0fe7865ef2f06c9507791b24f04dbca8.png",
    "NBA": "https://i.gyazo.com/773c23570f095a5d549c23b9401d83f4.png",
    "NCAA": "https://i.gyazo.com/ca63b40c86e757436de9d34d369b24f8.png",
    "NFL": "https://i.gyazo.com/fb4956d7a2fe54a1bac54cd81e1b3f11.png",
    "NHL": "https://i.gyazo.com/526607d4e886d5ed1fecca4bff3115e2.png",
    "WNBA": "https://i.gyazo.com/02d665a5704118d195dbcd5fa20d5462.png",
    "La Liga": "https://i.gyazo.com/3ea07074f7faab98c00493f07f4c6661.png",
    "Premier League": "https://i.gyazo.com/5cf939a9669647ec49c5ca61ab34789d.png",
    "Serie A": "https://i.gyazo.com/38fd8ea613b0f02780d2314fd49f7595.png",
    "Bundesliga": "https://i.gyazo.com/d608cd1fe95c288aba9e03a9b2f2b688.png",
    "Ligue 1": "https://i.gyazo.com/e5cd3f3960ea0fc7a10f831b6c79d31d.png",
    "Primeira Liga": "https://i.gyazo.com/0b9ff26408609ccb90bf45d60aa13500.png",
    "MLS": "https://i.gyazo.com/014b639a369d2bd8a4b97d00a239f330.png",
}

# Canonical LOGOS key -> alternate names that should resolve to the same logo.
alias_map = {
    "NCAA": ["NCAAF", "NCAAB", "CBB", "CFB"],
    "Premier League": ["EPL"],
}

# Register every alias in LOGOS under its canonical entry's URL.
for base, aliases in alias_map.items():
    LOGOS.update(dict.fromkeys(aliases, LOGOS[base]))
# Log line layout handed to ColorFormatter in get_logger(): timestamp, level
# name (left-justified, min width 8), logger name, message (left-justified,
# min width 70), then source file and line number.
# NOTE(review): ColorFormatter wraps the level name in ANSI escapes before the
# record is formatted, so the -8s width is measured on the escaped string.
LOG_FMT = (
    "[%(asctime)s] "
    "%(levelname)-8s "
    "[%(name)s] "
    "%(message)-70s "
    "(%(filename)s:%(lineno)d)"
)
# ANSI escape sequences keyed by logging level name. "reset" is appended after
# the level name to switch the colour off again.
COLORS = {
    "DEBUG": "\033[37m",
    "INFO": "\033[32m",
    "WARNING": "\033[33m",
    "ERROR": "\033[31m",
    "CRITICAL": "\033[41m",
    "reset": "\033[0m",
}


class ColorFormatter(logging.Formatter):
    """Formatter that wraps the record's level name in an ANSI colour code."""

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* formatted with a coloured level name.

        The level name is looked up in ``COLORS``; unknown levels get no
        colour prefix but still receive the reset suffix. ``record.levelname``
        is swapped only for the duration of the call.

        Args:
            record: The log record to format.

        Returns:
            The formatted log line.
        """
        plain = record.levelname
        color = COLORS.get(plain, "")
        record.levelname = f"{color}{plain}{COLORS['reset']}"
        try:
            return super().format(record)
        finally:
            # The same record object is passed to every handler, so restore
            # the plain name even when formatting raises (e.g. bad %-args).
            record.levelname = plain
def get_logger(name: str | None = None) -> logging.Logger:
    """Return a logger, attaching a coloured stream handler on first use.

    Args:
        name: Logger name. When empty or ``None``, this file's stem is used.

    Returns:
        The ``logging.Logger`` registered under the resolved name.
    """
    logger = logging.getLogger(name or Path(__file__).stem)

    # hasHandlers() also looks at ancestor loggers, so a logger whose parent
    # is already configured is returned as-is.
    if logger.hasHandlers():
        return logger

    stream = logging.StreamHandler()
    stream.setFormatter(ColorFormatter(LOG_FMT, datefmt="%Y-%m-%d | %H:%M:%S"))
    logger.addHandler(stream)
    logger.setLevel(logging.INFO)
    return logger
def near_hr(dt: datetime) -> float:
    """Return the POSIX timestamp of *dt* truncated to the start of its hour.

    Despite the name this always rounds down: minutes, seconds and
    microseconds are zeroed.
    """
    top_of_hour = dt.replace(minute=0, second=0, microsecond=0)
    return top_of_hour.timestamp()
def is_fresh(
    entry: dict,
    nearest_hr: bool,
    exp: int,
) -> bool:
    """Tell whether a cache entry is younger than *exp* seconds.

    Args:
        entry: Mapping with a numeric ``"timestamp"`` (epoch seconds). When
            the key is missing, a fixed 1970 epoch value is used, so the
            entry reads as decades old.
        nearest_hr: Truncate the stored timestamp to the start of its hour
            (via ``near_hr``) before comparing.
        exp: Maximum allowed age in seconds.

    Returns:
        ``True`` when the age, measured from the module-level ``now``
        snapshot, is strictly below *exp*.
    """
    stamp = entry.get("timestamp", 31496400)

    if nearest_hr:
        stamp = near_hr(datetime.fromtimestamp(stamp))

    age = now.timestamp() - stamp
    return age < exp
def load_cache(
file: Path,
exp: int | float,
nearest_hr: bool = False,
per_entry: bool = True,
) -> dict[str, dict[str, str | float]]:
try:
data: dict = json.loads(file.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
return {}
if per_entry:
return {k: v for k, v in data.items() if is_fresh(v, nearest_hr, exp)}
ts = data.get("timestamp", 31496400)
if nearest_hr:
ts = near_hr(datetime.fromtimestamp(ts))
return data if now.timestamp() - ts < exp else {}
def write_cache(file: Path, data: dict) -> None:
    """Overwrite *file* with *data* serialised as 2-space-indented JSON (UTF-8)."""
    payload = json.dumps(data, indent=2)
    file.write_text(payload, encoding="utf-8")
async def safe_process_event(
fn: Callable,
url_num: int,
timeout: int | float = 20,
log: logging.Logger | None = None,
) -> Any | None:
if not log:
log = logging.getLogger(__name__)
task = asyncio.create_task(fn())
try:
return await asyncio.wait_for(task, timeout=timeout)
except asyncio.TimeoutError:
log.warning(f"URL {url_num}) Timed out after {timeout}s, skipping event")
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
log.debug(f"URL {url_num}) Ignore exception after timeout: {e}")
async def check_status(client: httpx.AsyncClient, url: str) -> bool:
    """Return ``True`` only when a GET to *url* succeeds with status 200.

    Any exception, whether raised by the request itself or by
    ``raise_for_status()``, is swallowed and reported as ``False``.
    """
    try:
        response = await client.get(url)
        response.raise_for_status()
    except Exception:
        return False
    else:
        return response.status_code == 200
async def get_base(client: httpx.AsyncClient, mirrors: list[str]) -> str | None:
    """Pick the first mirror, in list order, that ``check_status`` accepts.

    All mirrors are probed concurrently and every probe is awaited before
    the choice is made.

    Returns:
        The first reachable mirror URL, or ``None`` when none responded
        (or *mirrors* is empty).
    """
    probes = (check_status(client, link) for link in mirrors)
    outcomes = await asyncio.gather(*probes)

    reachable = (url for url, ok in zip(mirrors, outcomes) if ok)
    return next(reachable, None)
# Compiled once at import instead of on every request callback. Matches URLs
# containing ".m3u8", excluding any that mention "amazonaws" or "knitcdn".
_VALID_M3U8 = re.compile(r"^(?!.*(amazonaws|knitcdn)).*\.m3u8")


def capture_req(
    req: Request,
    captured: list[str],
    got_one: asyncio.Event,
) -> None:
    """Record *req* if its URL looks like a wanted ``.m3u8`` playlist.

    Args:
        req: The request to inspect; only ``req.url`` is read.
        captured: List that matching URLs are appended to.
        got_one: Event that is set as soon as a URL has been captured.
    """
    if _VALID_M3U8.search(req.url):
        captured.append(req.url)
        got_one.set()
# Script injected into every page of the context before the page's own code
# runs. It overrides navigator.webdriver / languages / plugins, the
# offsetHeight of a div with id "modernizr", the reported screen size, and the
# WebGL vendor/renderer strings.
_STEALTH_INIT_SCRIPT = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
});
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4]
});
const elementDescriptor = Object.getOwnPropertyDescriptor(HTMLElement.prototype, 'offsetHeight');
Object.defineProperty(HTMLDivElement.prototype, 'offsetHeight', {
...elementDescriptor,
get: function() {
if (this.id === 'modernizr') { return 24; }
return elementDescriptor.get.apply(this);
}
});
Object.defineProperty(window.screen, 'width', { get: () => 1366 });
Object.defineProperty(window.screen, 'height', { get: () => 768 });
const getParameter = WebGLRenderingContext.prototype. getParameter;
WebGLRenderingContext.prototype.getParameter = function (param) {
if (param === 37445) return "Intel Inc."; // UNMASKED_VENDOR_WEBGL
if (param === 37446) return "Intel Iris OpenGL Engine"; // UNMASKED_RENDERER_WEBGL
return getParameter.apply(this, [param]);
};
"""


async def firefox(
    playwright: Playwright, ignore_https_errors: bool = False
) -> tuple[Browser, BrowserContext]:
    """Launch headless Firefox and build a context with a fixed desktop profile.

    The context uses the module's ``UA`` string, a 1366x768 viewport, en-US
    locale, the America/New_York timezone, a dark colour scheme and the
    geolocation permission, and has ``_STEALTH_INIT_SCRIPT`` installed.

    Args:
        playwright: A started Playwright instance.
        ignore_https_errors: Passed through to ``browser.new_context``.

    Returns:
        ``(browser, context)``. Neither is closed here on success.

    Raises:
        Whatever Playwright raises while launching or configuring. If the
        failure happens after the launch, the browser is closed first.
    """
    browser = await playwright.firefox.launch(headless=True)

    try:
        context = await browser.new_context(
            user_agent=UA,
            viewport={"width": 1366, "height": 768},
            device_scale_factor=1,
            locale="en-US",
            timezone_id="America/New_York",
            color_scheme="dark",
            permissions=["geolocation"],
            extra_http_headers={
                "Accept-Language": "en-US,en;q=0.9",
                "Upgrade-Insecure-Requests": "1",
            },
            ignore_https_errors=ignore_https_errors,
        )
        await context.add_init_script(_STEALTH_INIT_SCRIPT)
    except BaseException:
        # The caller never receives `browser` on this path, so it must be
        # closed here or the Firefox process is leaked. BaseException also
        # covers task cancellation; the error is re-raised unchanged.
        await browser.close()
        raise

    return browser, context