iptv/M3U8/scrapers/ppv.py

import asyncio
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
from urllib.parse import urljoin

import httpx
from playwright.async_api import async_playwright

from .utils import (
    TZ,
    capture_req,
    get_base,
    get_logger,
    league_info,
    load_cache,
    new_browser,
    now,
    safe_process_event,
    write_cache,
)

log = get_logger(__name__)
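
# Module-level registry of captured streams, keyed as "[<sport>] <event> (PPV)";
# scrape() fills it, and the playlist builder presumably reads it from here.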
urls: dict[str, dict[str, str | float]] = {}
API_FILE = Path(__file__).parent / "caches" / "ppv_api.json"
CACHE_FILE = Path(__file__).parent / "caches" / "ppv.json"
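
# Mirror domains for the PPV site; get_base() probes them and returns a usable
# base URL (falsy when none respond).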
MIRRORS = [
    "https://ppvs.su",
    "https://ppv.to",
    "https://ppv.wtf",
    "https://ppv.land",
    "https://freeppv.fun",
]
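
# The upstream API lists NFL and college games under a single "American Football"
# category, so this roster is used to tell the two apart when assigning EPG IDs.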
NFL_TEAMS = {
    "Arizona Cardinals",
    "Atlanta Falcons",
    "Baltimore Ravens",
    "Buffalo Bills",
    "Carolina Panthers",
    "Chicago Bears",
    "Cincinnati Bengals",
    "Cleveland Browns",
    "Dallas Cowboys",
    "Denver Broncos",
    "Detroit Lions",
    "Green Bay Packers",
    "Houston Texans",
    "Indianapolis Colts",
    "Jacksonville Jaguars",
    "Kansas City Chiefs",
    "Las Vegas Raiders",
    "Los Angeles Chargers",
    "Los Angeles Rams",
    "Miami Dolphins",
    "Minnesota Vikings",
    "New England Patriots",
    "New Orleans Saints",
    "New York Giants",
    "New York Jets",
    "Philadelphia Eagles",
    "Pittsburgh Steelers",
    "San Francisco 49ers",
    "Seattle Seahawks",
    "Tampa Bay Buccaneers",
    "Tennessee Titans",
    "Washington Commanders",
}
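

# Event names arrive as "Team A vs. Team B"; anything that does not split on
# " vs. ", or that has no NFL team on either side, is treated as a college matchup.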
def is_nfl(event: str) -> bool:
    try:
        t1, t2 = event.split(" vs. ")
        return t1 in NFL_TEAMS or t2 in NFL_TEAMS
    except ValueError:
        return False
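

# The schedule comes from the mirror's "api/streams" endpoint; on any fetch or
# HTTP error an empty dict is returned, so the caller simply sees no streams.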
async def refresh_api_cache(
    client: httpx.AsyncClient, url: str
) -> dict[str, dict[str, str]]:
    log.info("Refreshing API cache")
    try:
        r = await client.get(url)
        r.raise_for_status()
    except Exception as e:
        log.error(f'Failed to fetch "{url}"\n{e}')
        return {}
    return r.json()
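

# A headless Playwright page loads the event URL; capture_req (from utils) is
# assumed to flag requests that look like M3U8 playlists, and the most recently
# captured URL is returned.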
async def process_event(url: str, url_num: int) -> str | None:
    async with async_playwright() as p:
        browser, context = await new_browser(p)
        page = await context.new_page()
        captured: list[str] = []
        got_one = asyncio.Event()
        handler = partial(capture_req, captured=captured, got_one=got_one)
        page.on("request", handler)
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=15_000)
            wait_task = asyncio.create_task(got_one.wait())
            try:
                await asyncio.wait_for(wait_task, timeout=10)
            except asyncio.TimeoutError:
                log.warning(f"URL {url_num}) Timed out waiting for M3U8.")
                return None
            finally:
                if not wait_task.done():
                    wait_task.cancel()
                    try:
                        await wait_task
                    except asyncio.CancelledError:
                        pass
            if captured:
                log.info(f"URL {url_num}) Captured M3U8")
                return captured[-1]
            log.warning(f"URL {url_num}) No M3U8 captured after waiting.")
            return None
        except Exception as e:
            log.warning(f"URL {url_num}) Exception while processing: {e}")
            return None
        finally:
            page.remove_listener("request", handler)
            await page.close()
            await browser.close()
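

# The schedule cache expires after a day; an event counts as live from
# 30 minutes before its listed start until 30 minutes after its listed end,
# and keys already present in the URL cache are skipped.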
async def get_events(
    client: httpx.AsyncClient,
    base_url: str,
    cached_keys: set[str],
) -> list[dict[str, str]]:
    events: list[dict[str, str]] = []
    if not (
        api_data := load_cache(
            API_FILE,
            exp=86400,
            nearest_hr=True,
            per_entry=False,
        )
    ):
        api_data = await refresh_api_cache(client, urljoin(base_url, "api/streams"))
        write_cache(API_FILE, api_data)
    for stream_group in api_data.get("streams", []):
        sport = stream_group["category"]
        if sport == "24/7 Streams":
            continue
        for event in stream_group["streams"]:
            name, start_ts, end_ts, logo, uri_name = (
                event["name"],
                event["starts_at"],
                event["ends_at"],
                event["poster"],
                event["uri_name"],
            )
            key = f"[{sport}] {name} (PPV)"
            if key in cached_keys:
                continue
            start_dt = datetime.fromtimestamp(start_ts, tz=TZ) - timedelta(minutes=30)
            end_dt = datetime.fromtimestamp(end_ts, tz=TZ) + timedelta(minutes=30)
            if not start_dt <= now < end_dt:
                continue
            events.append(
                {
                    "sport": sport,
                    "event": name,
                    "link": urljoin(base_url, f"live/{uri_name}"),
                    "logo": logo,
                }
            )
    return events
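

# Entry point: merge the on-disk URL cache, pick a working mirror, then resolve
# a fresh M3U8 for each uncached event via safe_process_event (assumed to wrap
# process_event with error handling). American Football events get NFL vs. NCAA
# dummy EPG IDs via is_nfl(); other sports use league_info().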
async def scrape(client: httpx.AsyncClient) -> None:
    cached_urls = load_cache(CACHE_FILE, exp=10_800)
    cached_count = len(cached_urls)
    urls.update(cached_urls)
    log.info(f"Collected {cached_count} event(s) from cache")
    if not (base_url := await get_base(client, MIRRORS)):
        log.warning("No working PPV mirrors")
        write_cache(CACHE_FILE, cached_urls)
        return
    log.info(f'Scraping from "{base_url}"')
    events = await get_events(
        client,
        base_url,
        set(cached_urls.keys()),
    )
    log.info(f"Processing {len(events)} new URL(s)")
    for i, ev in enumerate(events, start=1):
        url = await safe_process_event(
            lambda: process_event(ev["link"], url_num=i),
            url_num=i,
            log=log,
        )
        if url:
            sport, event = ev["sport"], ev["event"]
            if sport == "American Football":
                tvg_id = "NFL.Dummy.us" if is_nfl(event) else "NCAA.Sports.Dummy.us"
            else:
                tvg_id = league_info(sport)[0]
            key = f"[{sport}] {event} (PPV)"
            entry = {
                "url": url,
                "logo": ev["logo"],
                "base": base_url,
                "timestamp": now.timestamp(),
                "id": tvg_id or "Live.Event.us",
            }
            urls[key] = cached_urls[key] = entry
    if new_count := len(cached_urls) - cached_count:
        log.info(f"Collected and cached {new_count} new event(s)")
    else:
        log.info("No new events found")
    write_cache(CACHE_FILE, cached_urls)
# NOTE: only works while the mirror is not behind Cloudflare bot detection.
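

# Minimal manual test hook (an assumption, not part of the production flow):
# run as a module, e.g. "python -m iptv.M3U8.scrapers.ppv", so the relative
# import of .utils resolves; the client settings below are illustrative only.
if __name__ == "__main__":

    async def _main() -> None:
        async with httpx.AsyncClient(timeout=15, follow_redirects=True) as client:
            await scrape(client)
        log.info(f"Total event(s) collected: {len(urls)}")

    asyncio.run(_main())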