iptv/M3U8/scrapers/utils/webwork.py
doms9 00000d90e4 e
- add adblocking
- edit roxie.py scraping method
- edit tvapp.py scraping method
- modify sports to scrape
- misc edits
2026-02-19 18:16:27 -05:00

295 lines
7.3 KiB
Python

import asyncio
import logging
import random
import re
from collections.abc import Awaitable, Callable
from contextlib import asynccontextmanager
from functools import cache, partial
from pathlib import Path
from typing import AsyncGenerator, TypeVar
from urllib.parse import urlparse
import httpx
from playwright.async_api import (
Browser,
BrowserContext,
Page,
Playwright,
Request,
Route,
)
from .logger import get_logger
# Module-level fallback logger; Network methods use it when no `log` is passed.
logger = get_logger(__name__)
# Generic result type of the coroutine wrapped by Network.safe_process.
T = TypeVar("T")
class Network:
    """HTTP client and Playwright helpers shared through one instance.

    Bundles an ``httpx.AsyncClient`` for plain GET requests with static
    helpers that create Playwright browsers/contexts/pages, block requests
    to listed domains, and capture ``.m3u8`` request URLs from a page.
    """

    # User-Agent sent by the httpx client and by "stealth" browser contexts.
    UA = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
    )

    # Not referenced inside this class; presumably passed by callers as the
    # `semaphore` argument of safe_process (HTTP vs. Playwright work) --
    # TODO confirm against the scrapers.
    HTTP_S = asyncio.Semaphore(10)
    PW_S = asyncio.Semaphore(3)

    # Only these Playwright resource types are checked against the blocklist;
    # every other request is continued untouched.
    _ADBLOCK_RESOURCE_TYPES = frozenset({"script", "image", "media", "xhr"})

    # URLs containing any of these substrings are never captured.
    _M3U8_EXCLUDED_SUBSTRINGS = ("amazonaws", "knitcdn", "jwpltx")

    # Compiled once instead of on every request event: matches a URL that
    # contains ".m3u8" and none of the excluded substrings.
    _M3U8_RE = re.compile(
        rf"^(?!.*({'|'.join(map(re.escape, _M3U8_EXCLUDED_SUBSTRINGS))})).*\.m3u8",
        re.IGNORECASE,
    )

    def __init__(self) -> None:
        """Create the shared HTTP/2 client (5 s timeout, follows redirects)."""
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(5.0),
            follow_redirects=True,
            headers={"User-Agent": Network.UA},
            http2=True,
        )

    async def request(
        self,
        url: str,
        log: logging.Logger | None = None,
        **kwargs,
    ) -> httpx.Response | None:
        """GET *url* and return the response, or ``None`` on failure.

        Args:
            url: Address to fetch.
            log: Logger for the failure message; defaults to the module logger.
            **kwargs: Forwarded unchanged to ``httpx.AsyncClient.get``.

        Returns:
            The response when the request succeeded with a non-error status,
            otherwise ``None`` (the failure is logged, not raised).
        """
        log = log or logger

        try:
            response = await self.client.get(url, **kwargs)
            response.raise_for_status()
        # httpx.TimeoutException is a subclass of httpx.HTTPError, so this
        # single clause also covers timeouts.
        except httpx.HTTPError as e:
            log.error(f'Failed to fetch "{url}": {e}')
            # Return None (not "") so the result matches the annotation and
            # callers may safely test `is None`.
            return None

        return response

    async def get_base(self, mirrors: list[str]) -> str | None:
        """Return the first mirror, tried in random order, that answers 200.

        The caller's list is left untouched; a shuffled copy is iterated.

        Args:
            mirrors: Candidate base URLs.

        Returns:
            A reachable mirror URL, or ``None`` when none responded with 200.
        """
        for mirror in random.sample(mirrors, k=len(mirrors)):
            response = await self.request(mirror)

            if response is not None and response.status_code == 200:
                return mirror

        return None

    @staticmethod
    async def safe_process(
        fn: Callable[[], Awaitable[T]],
        url_num: int,
        semaphore: asyncio.Semaphore,
        timeout: int | float = 30,
        log: logging.Logger | None = None,
    ) -> T | None:
        """Run ``fn()`` under *semaphore* with a time limit, never raising.

        Args:
            fn: Zero-argument callable returning the coroutine to run.
            url_num: Number used only to label log messages.
            semaphore: Held for the whole duration of the call.
            timeout: Seconds to wait before giving up.
            log: Logger to use; defaults to the module logger.

        Returns:
            The result of ``fn()``, or ``None`` if it timed out or raised an
            ``Exception`` (both cases are logged).
        """
        log = log or logger

        async with semaphore:
            task = asyncio.create_task(fn())

            try:
                return await asyncio.wait_for(task, timeout=timeout)

            except asyncio.TimeoutError:
                log.warning(
                    f"URL {url_num}) Timed out after {timeout}s, skipping event"
                )

                # Make sure the task has really finished before the semaphore
                # is released, and surface anything it raised while unwinding.
                task.cancel()

                try:
                    await task
                except asyncio.CancelledError:
                    pass
                except Exception as e:
                    log.warning(f"URL {url_num}) Ignore exception after timeout: {e}")

                return None

            except Exception as e:
                log.error(f"URL {url_num}) Unexpected error: {e}")

                return None

    @staticmethod
    @cache
    def blocked_domains() -> list[str]:
        """Return the lines of ``easylist.txt`` located next to this module.

        The file is read once; later calls return the cached list. Lines are
        returned verbatim (no stripping or filtering).
        """
        return (
            (Path(__file__).parent / "easylist.txt")
            .read_text(encoding="utf-8")
            .splitlines()
        )

    @staticmethod
    @cache
    def _blocked_domain_set() -> frozenset[str]:
        """Return the blocklist as a cached set of normalized domains.

        Entries are stripped and lowercased (hostnames are compared in
        lowercase); blank lines are dropped so an empty entry can never match
        a request that has no hostname.
        """
        return frozenset(
            domain
            for line in Network.blocked_domains()
            if (domain := line.strip().lower())
        )

    @staticmethod
    def to_block(request: Request) -> bool:
        """Tell whether *request* targets a blocklisted domain or a subdomain.

        Args:
            request: The Playwright request being routed.

        Returns:
            ``True`` when the request hostname equals a blocklist entry or
            ends with ``"." + entry``; ``False`` otherwise, including when the
            URL has no hostname or cannot be parsed.
        """
        try:
            hostname = (urlparse(request.url).hostname or "").lower()
        except ValueError:
            # A malformed URL must not raise inside the route handler.
            return False

        blocked = Network._blocked_domain_set()

        # Walk "a.b.c" -> "b.c" -> "c": one set lookup per label instead of a
        # scan over every blocklist entry for every request.
        candidate = hostname

        while candidate:
            if candidate in blocked:
                return True

            _, _, candidate = candidate.partition(".")

        return False

    @staticmethod
    async def _adblock(route: Route) -> None:
        """Route handler: abort blocklisted requests, continue all others."""
        request = route.request

        if (
            request.resource_type in Network._ADBLOCK_RESOURCE_TYPES
            and Network.to_block(request)
        ):
            await route.abort()
        else:
            await route.continue_()

    @staticmethod
    @asynccontextmanager
    async def event_context(
        browser: Browser,
        stealth: bool = True,
        ignore_https: bool = False,
    ) -> AsyncGenerator[BrowserContext, None]:
        """Yield a new browser context and close it on exit.

        Args:
            browser: Browser to create the context in.
            stealth: When true, the context gets a fixed user agent, viewport,
                locale, timezone and extra headers, loads ``stealth.js`` as an
                init script, and routes every request through the ad blocker.
                When false, a default context is created.
            ignore_https: Passed as ``ignore_https_errors`` in both modes.

        Yields:
            The created ``BrowserContext``.
        """
        context: BrowserContext | None = None

        try:
            if stealth:
                context = await browser.new_context(
                    user_agent=Network.UA,
                    ignore_https_errors=ignore_https,
                    viewport={"width": 1366, "height": 768},
                    device_scale_factor=1,
                    locale="en-US",
                    timezone_id="America/New_York",
                    color_scheme="dark",
                    extra_http_headers=(
                        {
                            "Accept-Language": "en-US,en;q=0.9",
                            "Upgrade-Insecure-Requests": "1",
                        }
                    ),
                )

                await context.add_init_script(path=Path(__file__).parent / "stealth.js")

                await context.route("**/*", Network._adblock)

            else:
                # Honour ignore_https here too; it used to be silently
                # dropped for non-stealth contexts.
                context = await browser.new_context(
                    ignore_https_errors=ignore_https,
                )

            yield context

        finally:
            if context is not None:
                await context.close()

    @staticmethod
    @asynccontextmanager
    async def event_page(context: BrowserContext) -> AsyncGenerator[Page, None]:
        """Yield a new page in *context* and close it on exit."""
        page = await context.new_page()

        try:
            yield page
        finally:
            await page.close()

    @staticmethod
    async def browser(playwright: Playwright, external: bool = False) -> Browser:
        """Return a browser to scrape with.

        Args:
            playwright: Active Playwright instance.
            external: When true, attach over CDP to a Chromium listening on
                ``http://localhost:9222``; otherwise launch headless Firefox.
        """
        if external:
            return await playwright.chromium.connect_over_cdp("http://localhost:9222")

        return await playwright.firefox.launch(headless=True)

    @staticmethod
    def capture_req(
        req: Request,
        captured: list[str],
        got_one: asyncio.Event,
    ) -> None:
        """Record *req* if its URL looks like a wanted M3U8 playlist.

        Args:
            req: Request emitted by the page.
            captured: List the matching URL is appended to.
            got_one: Event set once a matching URL has been appended.
        """
        if Network._M3U8_RE.search(req.url):
            captured.append(req.url)
            got_one.set()

    async def process_event(
        self,
        url: str,
        url_num: int,
        page: Page,
        timeout: int | float = 10,
        log: logging.Logger | None = None,
    ) -> str | None:
        """Open *url* in *page* and return the first M3U8 URL it requests.

        Args:
            url: Page to navigate to.
            url_num: Number used only to label log messages.
            page: Playwright page to drive; the request listener added here
                is always removed again before returning.
            timeout: Seconds to wait for an M3U8 request after navigation.
            log: Logger to use; defaults to the module logger.

        Returns:
            The first captured M3U8 URL, or ``None`` when navigation failed
            or nothing was captured in time (each case is logged).
        """
        log = log or logger

        captured: list[str] = []
        got_one = asyncio.Event()

        handler = partial(
            self.capture_req,
            captured=captured,
            got_one=got_one,
        )

        # Listen before navigating so requests fired during load are seen.
        page.on("request", handler)

        try:
            await page.goto(
                url,
                wait_until="domcontentloaded",
                timeout=6_000,
            )

            try:
                # wait_for cancels the inner wait itself on timeout, so no
                # manual task bookkeeping is needed.
                await asyncio.wait_for(got_one.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                log.warning(f"URL {url_num}) Timed out waiting for M3U8.")

                return None

            if captured:
                log.info(f"URL {url_num}) Captured M3U8")

                return captured[0]

            log.warning(f"URL {url_num}) No M3U8 captured after waiting.")

            return None

        except Exception as e:
            log.warning(f"URL {url_num}) {e}")

            return None

        finally:
            page.remove_listener("request", handler)
# Shared instance; constructing it creates the httpx.AsyncClient at import time.
network = Network()
# Only the shared instance is exported, not the Network class itself.
__all__ = ["network"]