iptv/M3U8/scrapers/livetvsx.py
2025-09-24 12:30:55 -04:00

331 lines
9.1 KiB
Python

import asyncio
import io
import ssl
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
import httpx
from playwright.async_api import async_playwright
from .utils import (
TZ,
capture_req,
get_logger,
leagues,
load_cache,
new_browser,
now,
safe_process_event,
write_cache,
)
log = get_logger(__name__)
urls: dict[str, dict[str, str | float]] = {}
BASE_URL = "https://cdn.livetv861.me/rss/upcoming_en.xml"
CERT_BUNDLE_URLS = [
"https://curl.se/ca/cacert.pem",
"https://ssl.com/repo/certs/Cloudflare-TLS-I-E1.pem",
"https://ssl.com/repo/certs/SSL.com-TLS-T-ECC-R2.pem",
"https://ssl.com/repo/certs/Sectigo-AAA-Root.pem",
]
CERT_FILE = Path(__file__).parent / "caches" / "cached-cert.pem"
CACHE_FILE = Path(__file__).parent / "caches" / "livetvsx.json"
async def write_to_cert(
client: httpx.AsyncClient,
url: str,
cert: Path,
) -> None:
try:
r = await client.get(url)
r.raise_for_status()
except Exception:
log.error(f"Failed to write fetch: {url} returned {r.status_code}")
with cert.open("a", encoding="utf-8") as f:
f.write(f"{r.text}\n")
async def refresh_cert_cache(client: httpx.AsyncClient) -> ssl.SSLContext:
CERT_FILE.unlink(missing_ok=True)
tasks = [write_to_cert(client, url, CERT_FILE) for url in CERT_BUNDLE_URLS]
await asyncio.gather(*tasks)
async def get_cert(client: httpx.AsyncClient) -> ssl.SSLContext:
if CERT_FILE.is_file():
mtime = datetime.fromtimestamp(CERT_FILE.stat().st_mtime, TZ)
if now - mtime < timedelta(days=30):
return ssl.create_default_context(cafile=CERT_FILE)
log.info("Refreshing cached certificate")
await refresh_cert_cache(client)
return ssl.create_default_context(cafile=CERT_FILE)
async def fetch_xml_stream(url: str, ssl_ctx: ssl.SSLContext) -> io.BytesIO | None:
buffer = io.BytesIO()
try:
async with httpx.AsyncClient(
timeout=10,
verify=ssl_ctx,
follow_redirects=True,
) as client:
async with client.stream("GET", url) as r:
r.raise_for_status()
async for chunk in r.aiter_bytes(8192):
buffer.write(chunk)
buffer.seek(0)
return buffer
except Exception as e:
log.error(f"Failed to fetch {url}: {e}")
return
async def process_event(url: str, url_num: int) -> str | None:
async with async_playwright() as p:
browser, context = await new_browser(p, ignore_https_errors=True)
page = await context.new_page()
captured: list[str] = []
got_one = asyncio.Event()
handler = partial(capture_req, captured=captured, got_one=got_one)
popup = None
try:
await page.goto(
url,
wait_until="domcontentloaded",
timeout=10_000,
)
btn = await page.query_selector(".lnkhdr > tbody > tr > td:nth-child(2)")
if btn:
try:
await btn.click()
await page.wait_for_timeout(500)
except Exception as e:
log.debug(f"URL {url_num}) Failed to click Browser Links tab: {e}")
return
else:
log.warning(f"URL {url_num}) Browser Links tab not found")
return
link_img = await page.query_selector(
"tr:nth-child(2) > td:nth-child(1) td:nth-child(6) img"
)
if not link_img:
log.warning(f"URL {url_num}) No browser link to click.")
return
page.on("request", handler)
try:
async with page.expect_popup(timeout=5_000) as popup_info:
try:
await link_img.click()
except Exception as e:
log.debug(
f"URL {url_num}) Click failed (popup might have already been opened): {e}"
)
popup = await popup_info.value
popup.on("request", handler)
except Exception:
try:
await link_img.click()
except Exception as e:
log.debug(f"URL {url_num}) Fallback click failed: {e}")
wait_task = asyncio.create_task(got_one.wait())
try:
await asyncio.wait_for(wait_task, timeout=15)
except asyncio.TimeoutError:
log.warning(f"URL {url_num}) Timed out waiting for M3U8.")
return
finally:
if not wait_task.done():
wait_task.cancel()
try:
await wait_task
except asyncio.CancelledError:
pass
page.remove_listener("request", handler)
if popup:
popup.remove_listener("request", handler)
await popup.close()
await page.close()
if captured:
log.info(f"URL {url_num}) Captured M3U8")
return captured[-1]
log.warning(f"URL {url_num}) No M3U8 captured in popup or inline playback.")
return
except Exception:
try:
page.remove_listener("request", handler)
if popup:
popup.remove_listener("request", handler)
await popup.close()
await page.close()
except Exception:
pass
await browser.close()
async def get_events(
url: str,
ssl_ctx: ssl.SSLContext,
cached_keys: set[str],
) -> list[dict[str, str]]:
events: list[dict[str, str]] = []
start_dt = now - timedelta(minutes=30)
end_dt = now + timedelta(minutes=30)
if buffer := await fetch_xml_stream(url, ssl_ctx):
pub_date_format = "%a, %d %b %Y %H:%M:%S %z"
for _, elem in ET.iterparse(buffer, events=("end",)):
if elem.tag == "item":
title = elem.findtext("title")
desc = elem.findtext("description")
pub_date = elem.findtext("pubDate")
link = elem.findtext("link")
try:
dt = datetime.strptime(pub_date, pub_date_format)
dt = dt.astimezone(TZ)
except Exception:
elem.clear()
continue
if not start_dt <= dt <= end_dt:
elem.clear()
continue
sport, event = (
(
desc.split(".")[0].strip(),
" ".join(p.strip() for p in desc.split(".")[1:]),
)
if desc
else ("", "")
)
key = f"[{sport}: {event}] {title} (LTVSX)"
if cached_keys & {key}:
elem.clear()
continue
events.append(
{
"sport": sport,
"event": event,
"title": title,
"link": link,
}
)
elem.clear()
return events
async def scrape(client: httpx.AsyncClient) -> None:
cached_urls = load_cache(CACHE_FILE, exp=10_800)
cached_count = len(cached_urls)
urls.update(cached_urls)
log.info(f"Collected {cached_count} event(s) from cache")
log.info(f'Scraping from "{BASE_URL}"')
cert = await get_cert(client)
events = await get_events(
BASE_URL,
cert,
set(cached_urls.keys()),
)
log.info(f"Processing {len(events)} new URL(s)")
for i, ev in enumerate(events, start=1):
link = ev["link"]
url = await safe_process_event(
lambda: process_event(link, url_num=i),
url_num=i,
log=log,
)
if url:
sport = ev["sport"]
event = ev["event"]
title = ev["title"]
key = f"[{sport}: {event}] {title} (LTVSX)"
tvg_id, logo = leagues.info(event)
if not tvg_id:
tvg_id, logo = leagues.info(sport)
entry = {
"url": url,
"logo": logo,
"id": tvg_id or "Live.Event.us",
"base": "https://livetv.sx/enx/",
"timestamp": now.timestamp(),
}
urls[key] = cached_urls[key] = entry
if new_count := len(cached_urls) - cached_count:
log.info(f"Collected and cached {new_count} new event(s)")
else:
log.info("No new events found")
write_cache(CACHE_FILE, cached_urls)