# iptv/M3U8/scrapers/utils/webwork.py
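"""Shared networking helpers for the M3U8 scrapers.

Wraps a single httpx.AsyncClient for mirror probing and proxy-URL building,
and provides Playwright helpers for capturing .m3u8 requests from stream pages.
"""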
import asyncio
import logging
import random
import re
from collections.abc import Awaitable, Callable
from functools import partial
from typing import TypeVar
from urllib.parse import urlencode, urljoin

import httpx
from playwright.async_api import Browser, BrowserContext, Playwright, Request

from .logger import get_logger

T = TypeVar("T")


class Network:
proxy_base = "https://stream.nvrmind.xyz"
UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
)
def __init__(self) -> None:
self.client = httpx.AsyncClient(
timeout=5,
follow_redirects=True,
headers={"User-Agent": Network.UA},
http2=True,
)
self._logger = get_logger("network")
@staticmethod
def build_proxy_url(
tag: str,
path: str,
query: dict | None = None,
) -> str:
base = network.proxy_base
tag = tag.lower()
return (
f"{urljoin(base, f'{tag}/{path}')}?{urlencode(query)}"
if query
else urljoin(base, f"{tag}/{path}")
)
async def check_status(self, url: str) -> bool:
        try:
            r = await self.client.get(url, timeout=5)
            r.raise_for_status()
            return r.status_code == 200
        except (httpx.HTTPError, httpx.TimeoutException) as e:
            self._logger.debug(f"Status check failed for {url}: {e}")
            return False

    async def get_base(self, mirrors: list[str]) -> str | None:
        """Return a working mirror chosen from `mirrors` (shuffled), or None if none respond."""
        random.shuffle(mirrors)
        tasks = [self.check_status(link) for link in mirrors]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        working_mirrors = [
            # check_status returns a bool; gather may also hand back an exception
            # object (return_exceptions=True), which must not count as a success.
            mirror for mirror, success in zip(mirrors, results) if success is True
        ]
        return working_mirrors[0] if working_mirrors else None

    @staticmethod
    async def safe_process(
        fn: Callable[[], Awaitable[T]],
        url_num: int,
        timeout: int | float = 15,
        log: logging.Logger | None = None,
    ) -> T | None:
        """Await `fn()` with a hard timeout; log and return None instead of raising."""
        if not log:
            log = logging.getLogger(__name__)
        task = asyncio.create_task(fn())
        try:
            return await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            log.warning(f"URL {url_num}) Timed out after {timeout}s, skipping event")
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                log.debug(f"URL {url_num}) Ignoring exception after timeout: {e}")
            return None
        except Exception as e:
            log.error(f"URL {url_num}) Unexpected error: {e}")
            return None

    @staticmethod
    def capture_req(
        req: Request,
        captured: list[str],
        got_one: asyncio.Event,
    ) -> None:
        """Record `req.url` if it is an .m3u8 request not served from a blocked host."""
        invalids = ["amazonaws", "knitcdn"]
        escaped = [re.escape(i) for i in invalids]
        pattern = re.compile(
            rf"^(?!.*({'|'.join(escaped)})).*\.m3u8",
            re.IGNORECASE,
        )
        if pattern.search(req.url):
            captured.append(req.url)
            got_one.set()

    async def process_event(
        self,
        url: str,
        url_num: int,
        context: BrowserContext,
        timeout: int | float = 10,
        log: logging.Logger | None = None,
    ) -> str | None:
        """Load `url` in a new page and return the first captured .m3u8 URL, or None."""
        if not log:
            log = self._logger
        page = await context.new_page()
        captured: list[str] = []
        got_one = asyncio.Event()
        handler = partial(
            self.capture_req,
            captured=captured,
            got_one=got_one,
        )
        page.on("request", handler)
        try:
            await page.goto(
                url,
                wait_until="domcontentloaded",
                timeout=15_000,
            )
            wait_task = asyncio.create_task(got_one.wait())
            try:
                await asyncio.wait_for(wait_task, timeout=timeout)
            except asyncio.TimeoutError:
                log.warning(f"URL {url_num}) Timed out waiting for M3U8.")
                return None
            finally:
                if not wait_task.done():
                    wait_task.cancel()
                    try:
                        await wait_task
                    except asyncio.CancelledError:
                        pass
            if captured:
                log.info(f"URL {url_num}) Captured M3U8")
                return captured[0]
            log.warning(f"URL {url_num}) No M3U8 captured after waiting.")
            return None
        except Exception as e:
            log.warning(f"URL {url_num}) Exception while processing: {e}")
            return None
        finally:
            page.remove_listener("request", handler)
            await page.close()

    @staticmethod
    async def browser(
        playwright: Playwright,
        browser: str = "firefox",
        ignore_https_errors: bool = False,
    ) -> tuple[Browser, BrowserContext]:
        """Attach to Brave over CDP or launch headless Firefox; return (browser, context)."""
        if browser == "brave":
            # Attach to an already running Brave/Chromium instance exposing CDP.
            brwsr = await playwright.chromium.connect_over_cdp("http://localhost:9222")
            context = brwsr.contexts[0]
        else:
            brwsr = await playwright.firefox.launch(headless=True)
            context = await brwsr.new_context(
                user_agent=Network.UA,
                ignore_https_errors=ignore_https_errors,
                viewport={"width": 1366, "height": 768},
                device_scale_factor=1,
                locale="en-US",
                timezone_id="America/New_York",
                color_scheme="dark",
                permissions=["geolocation"],
                extra_http_headers={
                    "Accept-Language": "en-US,en;q=0.9",
                    "Upgrade-Insecure-Requests": "1",
                },
            )

        # Basic anti-automation hardening applied to every new page in the context.
        await context.add_init_script(
"""
Object.defineProperty(navigator, "webdriver", { get: () => undefined });
Object.defineProperty(navigator, "languages", {
get: () => ["en-US", "en"],
});
Object.defineProperty(navigator, "plugins", {
get: () => [1, 2, 3, 4],
});
const elementDescriptor = Object.getOwnPropertyDescriptor(
HTMLElement.prototype,
"offsetHeight"
);
Object.defineProperty(HTMLDivElement.prototype, "offsetHeight", {
...elementDescriptor,
get: function () {
if (this.id === "modernizr") {
return 24;
}
return elementDescriptor.get.apply(this);
},
});
Object.defineProperty(window.screen, "width", { get: () => 1366 });
Object.defineProperty(window.screen, "height", { get: () => 768 });
const getParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function (param) {
if (param === 37445) return "Intel Inc."; // UNMASKED_VENDOR_WEBGL
if (param === 37446) return "Intel Iris OpenGL Engine"; // UNMASKED_RENDERER_WEBGL
return getParameter.apply(this, [param]);
};
const observer = new MutationObserver((mutations) => {
mutations.forEach((mutation) => {
mutation.addedNodes.forEach((node) => {
if (node.tagName === "IFRAME" && node.hasAttribute("sandbox")) {
node.removeAttribute("sandbox");
}
});
});
});
observer.observe(document.documentElement, { childList: true, subtree: true });
"""
)
return brwsr, context
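
# Shared module-level instance: scrapers import `network` (see __all__) so they
# reuse one httpx client and its connection pool instead of creating their own.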
network = Network()
__all__ = ["network"]
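
# Usage sketch: how a scraper might pick a working mirror, build a proxied
# playlist URL, and capture an .m3u8 with Playwright. The mirror hosts, the
# "ESPN" tag, and the event page path below are hypothetical placeholders.
# Run as a module (e.g. `python -m iptv.M3U8.scrapers.utils.webwork`) so the
# relative import of .logger resolves.
if __name__ == "__main__":
    from playwright.async_api import async_playwright

    async def _demo() -> None:
        base = await network.get_base(
            [
                "https://mirror-one.example",  # hypothetical mirror
                "https://mirror-two.example",  # hypothetical mirror
            ]
        )
        if base is None:
            print("No reachable mirror")
            return

        # Proxied playlist URL under the (hypothetical) "espn" tag.
        print(Network.build_proxy_url("ESPN", "index.m3u8", {"src": base}))

        # Capture the first .m3u8 request fired by a (hypothetical) event page.
        async with async_playwright() as p:
            brwsr, context = await Network.browser(p)
            m3u8 = await network.process_event(f"{base}/event/1", 1, context)
            print(m3u8 or "No M3U8 captured")
            await brwsr.close()

        await network.client.aclose()

    asyncio.run(_demo())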