# iptv/M3U8/scrapers/livetvsx.py
import asyncio
import io
import ssl
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
import httpx
from playwright.async_api import async_playwright
from .utils import (
LOGOS,
TZ,
capture_req,
firefox,
get_logger,
load_cache,
now,
safe_process_event,
write_cache,
)
log = get_logger(__name__)
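# Captured stream entries keyed by "[sport: event] title (LTVSX)"; each value
# holds the stream URL, logo, base page and capture timestamp.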
urls: dict[str, dict[str, str | float]] = {}
BASE_URL = "https://cdn.livetv861.me/rss/upcoming_en.xml"
CERT_BUNDL_URLS = [
"https://curl.se/ca/cacert.pem",
"https://ssl.com/repo/certs/Cloudflare-TLS-I-E1.pem",
"https://ssl.com/repo/certs/SSL.com-TLS-T-ECC-R2.pem",
"https://ssl.com/repo/certs/Sectigo-AAA-Root.pem",
]
CERT_FILE = Path(__file__).parent / "utils" / "cached-ca.pem"
CACHE_FILE = Path(__file__).parent / "caches" / "livetvsx.json"
exist_sprts = set(LOGOS.keys())
async def write_to_cert(
client: httpx.AsyncClient,
url: str,
cert: Path,
) -> None:
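    """Fetch a PEM certificate bundle from `url` and append it to `cert`."""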
try:
r = await client.get(url)
r.raise_for_status()
    except Exception as e:
        log.error(f"Failed to fetch {url}: {e}")
        return
with cert.open("a", encoding="utf-8") as f:
f.write(f"{r.text}\n")
async def refresh_cert_cache(client: httpx.AsyncClient) -> None:
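    """Rebuild the cached CA bundle by downloading every configured PEM file."""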
CERT_FILE.unlink(missing_ok=True)
tasks = [write_to_cert(client, url, CERT_FILE) for url in CERT_BUNDL_URLS]
await asyncio.gather(*tasks)
async def get_cert(client: httpx.AsyncClient) -> ssl.SSLContext:
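    """Return an SSL context backed by the cached CA bundle, refreshing the
    bundle when it is missing or more than 30 days old.
    """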
if CERT_FILE.is_file():
mtime = datetime.fromtimestamp(CERT_FILE.stat().st_mtime, TZ)
if now - mtime < timedelta(days=30):
return ssl.create_default_context(cafile=CERT_FILE)
log.info("Refreshing cached certificate")
await refresh_cert_cache(client)
return ssl.create_default_context(cafile=CERT_FILE)
async def fetch_xml_stream(url: str, ssl_ctx: ssl.SSLContext) -> io.BytesIO | None:
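    """Stream the RSS feed at `url` into an in-memory buffer; return None if
    the download fails.
    """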
buffer = io.BytesIO()
try:
async with httpx.AsyncClient(
timeout=10,
verify=ssl_ctx,
follow_redirects=True,
) as client:
async with client.stream("GET", url) as r:
r.raise_for_status()
async for chunk in r.aiter_bytes(8192):
buffer.write(chunk)
buffer.seek(0)
return buffer
except Exception as e:
log.error(f"Failed to fetch {url}: {e}")
return
async def process_event(url: str, url_num: int) -> str | None:
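    """Open the event page in Firefox, switch to the "Browser Links" tab, click
    a browser link and capture the M3U8 request issued by the popup (or inline
    player); return the captured URL, or None on failure.
    """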
async with async_playwright() as p:
browser, context = await firefox(p, ignore_https_errors=True)
page = await context.new_page()
captured: list[str] = []
got_one = asyncio.Event()
handler = partial(capture_req, captured=captured, got_one=got_one)
popup = None
try:
await page.goto(
url,
wait_until="domcontentloaded",
timeout=10_000,
)
btn = await page.query_selector(".lnkhdr > tbody > tr > td:nth-child(2)")
if btn:
try:
await btn.click()
await page.wait_for_timeout(500)
except Exception as e:
log.debug(f"URL {url_num}) Failed to click Browser Links tab: {e}")
return
else:
log.warning(f"URL {url_num}) Browser Links tab not found")
link_img = await page.query_selector(
"tr:nth-child(2) > td:nth-child(1) td:nth-child(6) img"
)
if not link_img:
log.warning(f"URL {url_num}) No browser link to click.")
return
page.on("request", handler)
try:
async with page.expect_popup(timeout=5_000) as popup_info:
try:
await link_img.click()
except Exception as e:
log.debug(
f"URL {url_num}) Click failed (popup might have already been opened): {e}"
)
popup = await popup_info.value
popup.on("request", handler)
except Exception:
try:
await link_img.click()
except Exception as e:
log.debug(f"URL {url_num}) Fallback click failed: {e}")
wait_task = asyncio.create_task(got_one.wait())
try:
                await asyncio.wait_for(wait_task, timeout=15)
except asyncio.TimeoutError:
log.warning(f"URL {url_num}) Timed out waiting for M3U8.")
return
finally:
if not wait_task.done():
wait_task.cancel()
try:
await wait_task
except asyncio.CancelledError:
pass
page.remove_listener("request", handler)
if popup:
popup.remove_listener("request", handler)
await popup.close()
await page.close()
if captured:
log.info(f"URL {url_num}) Captured M3U8")
return captured[-1]
log.warning(f"URL {url_num}) No M3U8 captured in popup or inline playback.")
return
        except Exception as e:
            log.error(f"URL {url_num}) Unexpected error while processing event: {e}")
try:
page.remove_listener("request", handler)
if popup:
popup.remove_listener("request", handler)
await popup.close()
await page.close()
except Exception:
pass
await browser.close()
async def get_events(
url: str,
ssl_ctx: ssl.SSLContext,
cached_keys: set[str],
) -> list[dict[str, str]]:
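    """Parse the upcoming-events RSS feed and return events whose pubDate falls
    within 30 minutes of now, skipping keys already cached and entries whose
    sport or event name appears in LOGOS.
    """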
events: list[dict[str, str]] = []
start_dt = now - timedelta(minutes=30)
end_dt = now + timedelta(minutes=30)
if buffer := await fetch_xml_stream(url, ssl_ctx):
pub_date_format = "%a, %d %b %Y %H:%M:%S %z"
for _, elem in ET.iterparse(buffer, events=("end",)):
if elem.tag == "item":
title = elem.findtext("title")
desc = elem.findtext("description")
pub_date = elem.findtext("pubDate")
link = elem.findtext("link")
try:
dt = datetime.strptime(pub_date, pub_date_format)
dt = dt.astimezone(TZ)
except Exception:
elem.clear()
continue
if not start_dt <= dt <= end_dt:
elem.clear()
continue
                if desc:
                    parts = desc.split(".")
                    sport = parts[0].strip()
                    event = " ".join(p.strip() for p in parts[1:])
                else:
                    sport, event = "", ""
key = f"[{sport}: {event}] {title} (LTVSX)"
                if key in cached_keys:
elem.clear()
continue
                if sport in exist_sprts or event in exist_sprts:
elem.clear()
continue
events.append(
{
"sport": sport,
"event": event,
"title": title,
"link": link,
}
)
elem.clear()
return events
async def main(client: httpx.AsyncClient) -> None:
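    """Merge cached events into `urls`, scrape new events from the RSS feed,
    capture a stream URL for each via Playwright and write the updated cache.
    """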
cached_urls = load_cache(CACHE_FILE, exp=10800)
cached_count = len(cached_urls)
urls.update(cached_urls)
log.info(f"Collected {cached_count} event(s) from cache")
log.info(f'Scraping from "{BASE_URL}"')
cert = await get_cert(client)
events = await get_events(
BASE_URL,
cert,
set(cached_urls.keys()),
)
log.info(f"Processing {len(events)} new URL(s)")
for i, ev in enumerate(events, start=1):
sport = ev["sport"]
event = ev["event"]
title = ev["title"]
link = ev["link"]
key = f"[{sport}: {event}] {title} (LTVSX)"
url = await safe_process_event(
lambda: process_event(link, url_num=i),
url_num=i,
log=log,
)
if url:
entry = {
"url": url,
"logo": LOGOS.get(sport, LOGOS["default"]),
"base": "https://livetv.sx/enx/",
"timestamp": now.timestamp(),
}
urls[key] = cached_urls[key] = entry
if new_count := len(cached_urls) - cached_count:
log.info(f"Collected and cached {new_count} new event(s)")
else:
log.info("No new events found")
write_cache(CACHE_FILE, cached_urls)
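
# Minimal standalone entry point: a sketch for running this scraper on its own.
# It assumes the package is executed as a module (e.g. `python -m ...livetvsx`)
# so the relative `.utils` imports resolve; in normal use the orchestrator that
# imports this module is expected to create the shared client and call main().
if __name__ == "__main__":

    async def _run() -> None:
        # Hypothetical driver: a throwaway AsyncClient used only for the
        # certificate-bundle downloads performed inside main().
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            await main(client)

    asyncio.run(_run())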