iptv/M3U8/scrapers/utils/webwork.py
doms9 00000d9079 e
misc. edits
2025-12-18 03:04:11 -05:00

285 lines
7.8 KiB
Python

import asyncio
import logging
import random
import re
from collections.abc import Awaitable, Callable
from functools import partial
from typing import TypeVar
from urllib.parse import urlencode, urljoin
import httpx
from playwright.async_api import Browser, BrowserContext, Playwright, Request
from .logger import get_logger
T = TypeVar("T")
class Network:
proxy_base = "https://stream.nvrmind.xyz"
UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
)
def __init__(self) -> None:
self.client = httpx.AsyncClient(
timeout=5,
follow_redirects=True,
headers={"User-Agent": Network.UA},
http2=True,
)
self._logger = get_logger("network")
@staticmethod
def build_proxy_url(
tag: str,
path: str,
query: dict | None = None,
) -> str:
base = network.proxy_base
tag = tag.lower()
return (
f"{urljoin(base, f'{tag}/{path}')}?{urlencode(query)}"
if query
else urljoin(base, f"{tag}/{path}")
)
async def request(
self,
url: str,
log: logging.Logger | None = None,
**kwargs,
) -> httpx.Response | None:
log = log or self._logger
try:
r = await self.client.get(url, **kwargs)
r.raise_for_status()
except Exception as e:
log.error(f'Failed to fetch "{url}": {e}\n{kwargs = }')
return ""
return r
async def get_base(self, mirrors: list[str]) -> str | None:
random.shuffle(mirrors)
for mirror in mirrors:
if not (r := await self.request(mirror)):
continue
elif r.status_code != 200:
continue
return mirror
@staticmethod
async def safe_process(
fn: Callable[[], Awaitable[T]],
url_num: int,
timeout: int | float = 15,
log: logging.Logger | None = None,
) -> T | None:
log = log or get_logger("network")
task = asyncio.create_task(fn())
try:
return await asyncio.wait_for(task, timeout=timeout)
except asyncio.TimeoutError:
log.warning(f"URL {url_num}) Timed out after {timeout}s, skipping event")
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
log.debug(f"URL {url_num}) Ignore exception after timeout: {e}")
return None
except Exception as e:
log.error(f"URL {url_num}) Unexpected error: {e}")
return None
@staticmethod
def capture_req(
req: Request,
captured: list[str],
got_one: asyncio.Event,
) -> None:
invalids = ["amazonaws", "knitcdn"]
escaped = [re.escape(i) for i in invalids]
pattern = re.compile(
rf"^(?!.*({'|'.join(escaped)})).*\.m3u8",
re.IGNORECASE,
)
if pattern.search(req.url):
captured.append(req.url)
got_one.set()
async def process_event(
self,
url: str,
url_num: int,
context: BrowserContext,
timeout: int | float = 10,
log: logging.Logger | None = None,
) -> str | None:
log = log or self._logger
page = await context.new_page()
captured: list[str] = []
got_one = asyncio.Event()
handler = partial(
self.capture_req,
captured=captured,
got_one=got_one,
)
page.on("request", handler)
try:
await page.goto(
url,
wait_until="domcontentloaded",
timeout=15_000,
)
wait_task = asyncio.create_task(got_one.wait())
try:
await asyncio.wait_for(wait_task, timeout=timeout)
except asyncio.TimeoutError:
log.warning(f"URL {url_num}) Timed out waiting for M3U8.")
return
finally:
if not wait_task.done():
wait_task.cancel()
try:
await wait_task
except asyncio.CancelledError:
pass
if captured:
log.info(f"URL {url_num}) Captured M3U8")
return captured[0]
log.warning(f"URL {url_num}) No M3U8 captured after waiting.")
return
except Exception as e:
log.warning(f"URL {url_num}) Exception while processing: {e}")
return
finally:
page.remove_listener("request", handler)
await page.close()
@staticmethod
async def browser(
playwright: Playwright,
browser: str = "firefox",
ignore_https_errors: bool = False,
) -> tuple[Browser, BrowserContext]:
if browser == "brave":
brwsr = await playwright.chromium.connect_over_cdp("http://localhost:9222")
context = brwsr.contexts[0]
else:
brwsr = await playwright.firefox.launch(headless=True)
context = await brwsr.new_context(
user_agent=Network.UA,
ignore_https_errors=ignore_https_errors,
viewport={"width": 1366, "height": 768},
device_scale_factor=1,
locale="en-US",
timezone_id="America/New_York",
color_scheme="dark",
permissions=["geolocation"],
extra_http_headers={
"Accept-Language": "en-US,en;q=0.9",
"Upgrade-Insecure-Requests": "1",
},
)
await context.add_init_script(
"""
Object.defineProperty(navigator, "webdriver", { get: () => undefined });
Object.defineProperty(navigator, "languages", {
get: () => ["en-US", "en"],
});
Object.defineProperty(navigator, "plugins", {
get: () => [1, 2, 3, 4],
});
const elementDescriptor = Object.getOwnPropertyDescriptor(
HTMLElement.prototype,
"offsetHeight"
);
Object.defineProperty(HTMLDivElement.prototype, "offsetHeight", {
...elementDescriptor,
get: function () {
if (this.id === "modernizr") {
return 24;
}
return elementDescriptor.get.apply(this);
},
});
Object.defineProperty(window.screen, "width", { get: () => 1366 });
Object.defineProperty(window.screen, "height", { get: () => 768 });
const getParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function (param) {
if (param === 37445) return "Intel Inc."; // UNMASKED_VENDOR_WEBGL
if (param === 37446) return "Intel Iris OpenGL Engine"; // UNMASKED_RENDERER_WEBGL
return getParameter.apply(this, [param]);
};
const observer = new MutationObserver((mutations) => {
mutations.forEach((mutation) => {
mutation.addedNodes.forEach((node) => {
if (node.tagName === "IFRAME" && node.hasAttribute("sandbox")) {
node.removeAttribute("sandbox");
}
});
});
});
observer.observe(document.documentElement, { childList: true, subtree: true });
"""
)
return brwsr, context
network = Network()
__all__ = ["network"]