doms9 2025-09-02 18:06:35 -04:00
parent 7617aa4bc6
commit 00000d941c
6 changed files with 243 additions and 159 deletions

.gitignore

@@ -12,3 +12,4 @@ wheels/
 # Misc
 .python-version
 stuff/
+cached-ca.pem

View file

@@ -7,11 +7,11 @@ from scrape import ace, fstv, livetvsx, logger, tvpass
 log = logger.get_logger(__name__)

-base_url = "https://s.id/ePwXT"
+BASE_URL = "https://s.id/ePwXT"

-m3u8_file = Path(__file__).parent / "TV.m3u8"
+M3U8_FILE = Path(__file__).parent / "TV.m3u8"

-client = httpx.AsyncClient(
+CLIENT = httpx.AsyncClient(
     timeout=5,
     follow_redirects=True,
     headers={
@@ -24,10 +24,10 @@ async def vanilla_fetch() -> tuple[list[str], int]:
     log.info("Fetching base M3U8")

     try:
-        r = await client.get(base_url)
+        r = await CLIENT.get(BASE_URL)
         r.raise_for_status()
     except Exception as e:
-        log.error(f'Failed to fetch "{base_url}"\n{e}')
+        log.error(f'Failed to fetch "{BASE_URL}"\n{e}')
         raise SystemExit(e) from e

     d = r.text.splitlines()[1:]
@@ -41,8 +41,8 @@ async def main() -> None:
     tasks = [
         # ace.main(client),
         # fstv.main(client),
-        livetvsx.main(),
-        tvpass.main(client),
+        livetvsx.main(CLIENT),
+        tvpass.main(CLIENT),
     ]

     await asyncio.gather(*tasks)
@@ -59,7 +59,7 @@ async def main() -> None:
         )
     ]

-    m3u8_file.write_text(
+    M3U8_FILE.write_text(
         '#EXTM3U url-tvg="https://raw.githubusercontent.com/doms9/iptv/refs/heads/default/EPG/TV.xml"\n'
         + "\n".join(base_m3u8)
         + "\n"
@@ -68,7 +68,7 @@ async def main() -> None:
         encoding="utf-8",
     )

-    log.info(f"M3U8 saved to {m3u8_file.name}")
+    log.info(f"M3U8 saved to {M3U8_FILE.name}")


 if __name__ == "__main__":
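
The pattern this file now follows: one module-level httpx.AsyncClient is shared by every scraper coroutine, and asyncio.gather runs them concurrently. A minimal standalone sketch of that fan-out, with placeholder scraper names and URLs (not the repo's modules):

import asyncio

import httpx

CLIENT = httpx.AsyncClient(timeout=5, follow_redirects=True)


async def scraper_a(client: httpx.AsyncClient) -> None:
    # placeholder scraper: fetch one page on the shared client
    r = await client.get("https://example.com/a")
    print(r.status_code)


async def scraper_b(client: httpx.AsyncClient) -> None:
    # placeholder scraper: fetch another page on the same client
    r = await client.get("https://example.com/b")
    print(r.status_code)


async def main() -> None:
    # fan out both scrapers concurrently, then close the shared client
    await asyncio.gather(scraper_a(CLIENT), scraper_b(CLIENT))
    await CLIENT.aclose()


if __name__ == "__main__":
    asyncio.run(main())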

View file

@@ -13,7 +13,7 @@ log = get_logger(__name__)
 urls: dict[str, dict[str, str]] = {}

-mirrors = [
+MIRRORS = [
     "https://aceztrims.pages.dev/",
     "https://acestrlms.pages.dev/",
 ]
@@ -97,7 +97,7 @@ async def get_m3u8_links(client: httpx.AsyncClient, url: str) -> list[str]:

 async def main(client: httpx.AsyncClient) -> None:
-    if not (base_url := await get_base(client, mirrors)):
+    if not (base_url := await get_base(client, MIRRORS)):
         log.warning("No working ace mirrors")
         return

View file

@@ -11,7 +11,7 @@ log = get_logger(__name__)
 urls: dict[str, dict[str, str]] = {}

-mirrors = [
+MIRRORS = [
     "https://fstv.online",
     "https://fstv.space",
     "https://fstv.zip",
@@ -103,7 +103,7 @@ async def fetch_m3u8(client: httpx.AsyncClient, url: str) -> tuple[str, list[str]]:

 async def main(client: httpx.AsyncClient) -> None:
-    if not (base_url := await get_base(client, mirrors)):
+    if not (base_url := await get_base(client, MIRRORS)):
         log.warning("No working FSTV mirrors")
         return
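
get_base() itself is not part of this diff; as a rough sketch of the mirror-failover idea both the ace and FSTV scrapers rely on (assumed behavior, not the repo's exact implementation), it probes each mirror in order and returns the first one that answers:

import httpx


async def first_working_mirror(
    client: httpx.AsyncClient, mirrors: list[str]
) -> str | None:
    # return the first mirror that responds without an HTTP error, else None
    for url in mirrors:
        try:
            r = await client.get(url)
            r.raise_for_status()
        except Exception:
            continue  # try the next mirror
        return url
    return None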

View file

@@ -1,20 +1,34 @@
 #!/usr/bin/env python3

 import asyncio
+import io
+import ssl
+import xml.etree.ElementTree as ET
+from datetime import datetime, timedelta
+from pathlib import Path
 from typing import Any
-from urllib.parse import urljoin

-from playwright.async_api import BrowserContext, Request, async_playwright
+import httpx
+from playwright.async_api import Request, async_playwright

 from .logger import get_logger
-from .tvpass import logos
+from .tvpass import TZ, logos

-base_url = "https://livetv.sx/enx/"
 log = get_logger(__name__)

+urls: dict[str, str] = {}
+
 tvp_sports = set(logos.keys())
-urls: dict[str, str] = {}
+
+BASE_URL = "https://cdn.livetv861.me/rss/upcoming_en.xml"
+
+CERT_BUNDL_URLS = [
+    "https://curl.se/ca/cacert.pem",
+    "https://ssl.com/repo/certs/Cloudflare-TLS-I-E1.pem",
+    "https://ssl.com/repo/certs/SSL.com-TLS-T-ECC-R2.pem",
+    "https://ssl.com/repo/certs/Sectigo-AAA-Root.pem",
+]
+
+CERT_FILE = Path(__file__).parent / "cached-ca.pem"


 async def safe_process_event(fn, timeout_sec=20) -> Any | None:
@@ -24,108 +38,196 @@ async def safe_process_event(fn, timeout_sec=20) -> Any | None:
     log.warning(f"Timed out after {timeout_sec}s, skipping event")


-async def process_event(
-    ev: dict[str, str],
-    context: BrowserContext,
-    max_wait_ms=15_000,
-) -> str | None:
-    ev_page = await context.new_page()
-    captured: list[str] = []
-
-    got_one = asyncio.Event()
-
-    def capture_req(req: Request) -> None:
-        if (
-            ".m3u8" in req.url
-            and "amazonaws" not in req.url
-            and "knitcdn" not in req.url
-            and not captured
-        ):
-            captured.append(req.url)
-            got_one.set()
-
-    popup = None
-
-    try:
-        await ev_page.goto(ev["href"], wait_until="domcontentloaded", timeout=30_000)
-
-        btn = await ev_page.query_selector(".lnkhdr > tbody > tr > td:nth-child(2)")
-
-        if btn:
-            try:
-                await btn.click()
-                await ev_page.wait_for_timeout(500)
-            except Exception as e:
-                log.debug(f"Failed to click Browser Links tab: {e}")
-        else:
-            log.warning("Browser Links tab not found")
-
-        link_img = await ev_page.query_selector(
-            "tr:nth-child(2) > td:nth-child(1) td:nth-child(6) img"
-        )
-
-        if not link_img:
-            log.warning("No browser link to click.")
-
-        ev_page.on("request", capture_req)
-
-        try:
-            async with ev_page.expect_popup(timeout=5_000) as popup_info:
-                try:
-                    await link_img.click()
-                except Exception as e:
-                    log.debug(
-                        f"Click failed (popup might have already been opened): {e}"
-                    )
-
-            popup = await popup_info.value
-            popup.on("request", capture_req)
-        except Exception:
-            try:
-                await link_img.click()
-            except Exception as e:
-                log.debug(f"Fallback click failed: {e}")
-
-        wait_task = asyncio.create_task(got_one.wait())
-
-        try:
-            await asyncio.wait_for(wait_task, timeout=max_wait_ms / 1000)
-        except asyncio.TimeoutError:
-            log.warning("Timed out waiting for m3u8.")
-        finally:
-            if not wait_task.done():
-                wait_task.cancel()
-                try:
-                    await wait_task
-                except asyncio.CancelledError:
-                    pass
-
-        ev_page.remove_listener("request", capture_req)
-        if popup:
-            popup.remove_listener("request", capture_req)
-
-            await popup.close()
-
-        await ev_page.close()
-
-        if captured:
-            return captured[-1]
-
-        log.warning("No m3u8 captured in popup or inline playback.")
-
-    except Exception as e:
-        log.error(f"Error processing {ev['name']}: {e}")
-
-        try:
+async def write_to_cert(client: httpx.AsyncClient, url: str, cert: Path) -> None:
+    try:
+        r = await client.get(url)
+        r.raise_for_status()
+    except Exception:
+        log.error(f"Failed to write fetch: {url} returned {r.status_code}")
+
+    with cert.open("a", encoding="utf-8") as f:
+        f.write(f"{r.text}\n")
+
+
+async def refresh_cert_cache(client: httpx.AsyncClient) -> ssl.SSLContext:
+    CERT_FILE.unlink(missing_ok=True)
+
+    tasks = [write_to_cert(client, url, CERT_FILE) for url in CERT_BUNDL_URLS]
+
+    await asyncio.gather(*tasks)
+
+
+async def get_cert(client: httpx.AsyncClient) -> ssl.SSLContext:
+    if CERT_FILE.is_file():
+        mtime = datetime.fromtimestamp(CERT_FILE.stat().st_mtime)
+
+        if datetime.now() - mtime < timedelta(days=30):
+            return ssl.create_default_context(cafile=CERT_FILE)
+
+    log.info("Refreshing cached certificate")
+
+    await refresh_cert_cache(client)
+
+    return ssl.create_default_context(cafile=CERT_FILE)
+
+
+async def fetch_xml_stream(url: str, ssl_ctx: ssl.SSLContext) -> io.BytesIO:
+    buffer = io.BytesIO()
+
+    try:
+        async with httpx.AsyncClient(timeout=10, verify=ssl_ctx) as client:
+            async with client.stream("GET", url) as r:
+                r.raise_for_status()
+
+                async for chunk in r.aiter_bytes(8192):
+                    buffer.write(chunk)
+
+        buffer.seek(0)
+        return buffer
+    except Exception as e:
+        log.error(f"Failed to fetch {url}: {e}")
+        return io.BytesIO(b"")
+
+
+async def parse_feed(url: str, ssl_ctx: ssl.SSLContext) -> dict[str, dict[str, str]]:
+    events = []
+
+    pub_date_format = "%a, %d %b %Y %H:%M:%S %z"
+
+    now = datetime.now(TZ)
+
+    window_start, window_end = now - timedelta(hours=3), now + timedelta(hours=1)
+
+    buffer = await fetch_xml_stream(url, ssl_ctx)
+
+    for _, elem in ET.iterparse(buffer, events=("end",)):
+        if elem.tag == "item":
+            title = elem.findtext("title")
+            desc = elem.findtext("description")
+            pub_date = elem.findtext("pubDate")
+            link = elem.findtext("link")
+
+            try:
+                dt = datetime.strptime(pub_date, pub_date_format)
+                dt = dt.astimezone(TZ)
+            except Exception:
+                elem.clear()
+                continue
+
+            if window_start <= dt <= window_end:
+                sport, event = (
+                    (
+                        desc.split(".")[0].strip(),
+                        " ".join(p.strip() for p in desc.split(".")[1:]),
+                    )
+                    if desc
+                    else ("", "")
+                )
+
+                events.append(
+                    {
+                        "sport": sport,
+                        "event": event,
+                        "title": title,
+                        "link": link,
+                    }
+                )
+
+        elem.clear()
+
+    return events
+
+
+async def process_event(url: str, max_wait_ms=15_000) -> str | None:
+    async with async_playwright() as p:
+        browser = await p.firefox.launch(headless=True)
+
+        context = await browser.new_context(
+            ignore_https_errors=True  # website doesn't send valid certs
+        )
+
+        ev_page = await context.new_page()
+
+        captured: list[str] = []
+
+        got_one = asyncio.Event()
+
+        def capture_req(req: Request) -> None:
+            if (
+                ".m3u8" in req.url
+                and "amazonaws" not in req.url
+                and "knitcdn" not in req.url
+                and not captured
+            ):
+                captured.append(req.url)
+                got_one.set()
+
+        popup = None
+
+        try:
+            await ev_page.goto(
+                url,
+                wait_until="domcontentloaded",
+                timeout=30_000,
+            )
+
+            btn = await ev_page.query_selector(".lnkhdr > tbody > tr > td:nth-child(2)")
+
+            if btn:
+                try:
+                    await btn.click()
+                    await ev_page.wait_for_timeout(500)
+                except Exception as e:
+                    log.debug(f"Failed to click Browser Links tab: {e}")
+            else:
+                log.warning("Browser Links tab not found")
+
+            link_img = await ev_page.query_selector(
+                "tr:nth-child(2) > td:nth-child(1) td:nth-child(6) img"
+            )
+
+            if not link_img:
+                log.warning("No browser link to click.")
+
+            ev_page.on("request", capture_req)
+
+            try:
+                async with ev_page.expect_popup(timeout=5_000) as popup_info:
+                    try:
+                        await link_img.click()
+                    except Exception as e:
+                        log.debug(
+                            f"Click failed (popup might have already been opened): {e}"
+                        )
+
+                popup = await popup_info.value
+                popup.on("request", capture_req)
+            except Exception:
+                try:
+                    await link_img.click()
+                except Exception as e:
+                    log.debug(f"Fallback click failed: {e}")
+
+            wait_task = asyncio.create_task(got_one.wait())
+
+            try:
+                await asyncio.wait_for(wait_task, timeout=max_wait_ms / 1000)
+            except asyncio.TimeoutError:
+                log.warning("Timed out waiting for m3u8.")
+            finally:
+                if not wait_task.done():
+                    wait_task.cancel()
+                    try:
+                        await wait_task
+                    except asyncio.CancelledError:
+                        pass

             ev_page.remove_listener("request", capture_req)
             if popup:
@@ -134,75 +236,56 @@ async def process_event(
                 popup.remove_listener("request", capture_req)

                 await popup.close()

             await ev_page.close()
-        except Exception:
-            pass
-
-
-async def main() -> None:
-    log.info(f'Scraping from "{base_url}"')
-
-    async with async_playwright() as p:
-        browser = await p.firefox.launch(headless=True)
-
-        context = await browser.new_context(
-            ignore_https_errors=True  # website doesn't send valid certs
-        )
-
-        page = await context.new_page()
-        await page.goto(base_url, wait_until="domcontentloaded", timeout=60_000)
-
-        rows = await page.query_selector_all("#upcoming table tr")
-
-        events = []
-        seen_hrefs = set()
-
-        for row in rows:
-            img = await row.query_selector("img")
-            league = (await img.get_attribute("alt") or "").strip() if img else ""
-
-            live_anchor = None
-            for a in await row.query_selector_all("a.live"):
-                txt = (await a.text_content() or "").strip()
-                if txt:
-                    live_anchor = a
-                    break
-
-            if live_anchor:
-                href = await live_anchor.get_attribute("href")
-                full_url = urljoin(base_url, href)
-
-                if full_url in seen_hrefs:
-                    continue
-
-                seen_hrefs.add(full_url)
-
-                text = (await live_anchor.text_content() or "").strip()
-                events.append({"name": text, "href": full_url, "league": league})
-
-        for ev in events:
-            if (
-                sport := ev["league"].split(".")[-1].strip()
-            ) in tvp_sports:  # already in tvpass
-                continue
-
-            url = await safe_process_event(lambda: process_event(ev, context))
-
-            if url:
-                urls[f"[{sport}] {ev['name']}"] = {
-                    "url": url,
-                    "logo": logos.get(
-                        sport,
-                        "https://i.gyazo.com/ec27417a9644ae517196494afa72d2b9.png",
-                    ),
-                }
-
-        await browser.close()
-
-    log.info(f"Collected {len(urls)} live events")
+
+            if captured:
+                return captured[-1]
+
+            log.warning("No m3u8 captured in popup or inline playback.")
+
+        except Exception as e:
+            try:
+                ev_page.remove_listener("request", capture_req)
+                if popup:
+                    popup.remove_listener("request", capture_req)
+
+                    await popup.close()
+
+                await ev_page.close()
+            except Exception:
+                pass
+
+        await browser.close()
+
+
+async def main(client: httpx.AsyncClient) -> None:
+    log.info(f'Scraping from "{BASE_URL}"')
+
+    cert = await get_cert(client)
+
+    events = await parse_feed(BASE_URL, cert)
+
+    log.info(f"Processing {len(events)} events")
+
+    for ev in events:
+        if tvp_sports & {
+            sport := ev["sport"],
+            event := ev["event"],
+        }:  # already in tvpass
+            continue
+
+        url = await safe_process_event(lambda: process_event(ev["link"]))
+
+        if url:
+            urls[f"[{sport}: {event}] {ev['title']}"] = {
+                "url": url,
+                "logo": logos.get(
+                    sport,
+                    "https://i.gyazo.com/ec27417a9644ae517196494afa72d2b9.png",
+                ),
+            }
+
+    log.info(f"Collected {len(urls)} live events")
+
+    # add caching
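
The heart of the new get_cert() is a time-based file cache: reuse cached-ca.pem while it is younger than 30 days, otherwise rebuild it before creating the SSLContext. A self-contained sketch of that freshness check (the file name and 30-day window mirror the diff; the download step is left to a caller-supplied callable):

import ssl
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable

CERT_FILE = Path("cached-ca.pem")
MAX_AGE = timedelta(days=30)


def cached_ssl_context(refresh: Callable[[Path], None]) -> ssl.SSLContext:
    # reuse the cached CA bundle while it is fresh enough
    if CERT_FILE.is_file():
        mtime = datetime.fromtimestamp(CERT_FILE.stat().st_mtime)
        if datetime.now() - mtime < MAX_AGE:
            return ssl.create_default_context(cafile=CERT_FILE)

    # stale or missing: let the caller re-download the bundles, then rebuild
    refresh(CERT_FILE)
    return ssl.create_default_context(cafile=CERT_FILE)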

View file

@@ -26,10 +26,10 @@ logos = {
     "WNBA": "https://i.gyazo.com/02d665a5704118d195dbcd5fa20d5462.png",
 }

+TZ = pytz.timezone("America/New_York")
+

 def load_cache() -> dict[str, str]:
-    TZ = pytz.timezone("America/New_York")
-
     try:
         data = json.loads(base_file.read_text(encoding="utf-8"))