fix tvpass.py scraping
This commit is contained in:
doms9 2026-02-07 17:17:19 -05:00
parent c1a3e4ba68
commit 00000d991e
4 changed files with 119 additions and 59 deletions

View file

@ -71,6 +71,7 @@ async def main() -> None:
asyncio.create_task(streamhub.scrape(xtrnl_brwsr)),
asyncio.create_task(streamsgate.scrape(xtrnl_brwsr)),
asyncio.create_task(totalsportek.scrape(hdl_brwsr)),
asyncio.create_task(tvpass.scrape(hdl_brwsr)),
asyncio.create_task(webcast.scrape(hdl_brwsr)),
]
@ -80,7 +81,6 @@ async def main() -> None:
asyncio.create_task(pawa.scrape()),
asyncio.create_task(shark.scrape()),
asyncio.create_task(streambtw.scrape()),
# asyncio.create_task(tvpass.scrape()),
asyncio.create_task(xstreameast.scrape()),
]

View file

@ -1,4 +1,8 @@
import re
from functools import partial
from urllib.parse import urljoin
from playwright.async_api import Browser
from selectolax.parser import HTMLParser
from .utils import Cache, Time, get_logger, leagues, network
@ -8,73 +12,144 @@ urls: dict[str, dict[str, str | float]] = {}
TAG = "TVPASS"
CACHE_FILE = Cache(TAG, exp=86_400)
CACHE_FILE = Cache(TAG, exp=10_800)
BASE_URL = "https://tvpass.org/playlist/m3u"
HTML_CACHE = Cache(f"{TAG}-html", exp=28_800)
BASE_URL = "https://thetvapp.to"
async def get_events() -> dict[str, dict[str, str | float]]:
async def refresh_html_cache(now_ts: float) -> dict[str, dict[str, str | float]]:
log.info("Refreshing HTML cache")
events = {}
if not (r := await network.request(BASE_URL, log=log)):
if not (html_data := await network.request(BASE_URL, log=log)):
return events
now = Time.clean(Time.now())
soup = HTMLParser(html_data.content)
data = r.text.splitlines()
for row in soup.css(".row"):
if not (h3_elem := row.css_first("h3")):
continue
for i, line in enumerate(data, start=1):
if line.startswith("#EXTINF"):
tvg_id_match = re.search(r'tvg-id="([^"]*)"', line)
sport = h3_elem.text(strip=True)
tvg_name_match = re.search(r'tvg-name="([^"]*)"', line)
if sport.lower() == "live tv channels":
continue
group_title_match = re.search(r'group-title="([^"]*)"', line)
for a in row.css("a.list-group-item[href]"):
if not (href := a.attributes.get("href")):
continue
tvg = tvg_id_match[1] if tvg_id_match else None
if not (span := a.css_first("span")):
continue
if not tvg and (url := data[i]).endswith("/sd"):
if tvg_name := tvg_name_match[1]:
sport = group_title_match[1].upper().strip()
event_time = span.text(strip=True)
event = "(".join(tvg_name.split("(")[:-1]).strip()
event_dt = Time.from_str(event_time, timezone="UTC")
key = f"[{sport}] {event} ({TAG})"
event_name = a.text(strip=True).split(":")[0]
channel = url.split("/")[-2]
key = f"[{sport}] {event_name} ({TAG})"
tvg_id, logo = leagues.info(sport)
events[key] = {
"url": f"http://origin.thetvapp.to/hls/{channel}/mono.m3u8",
"logo": logo,
"id": tvg_id or "Live.Event.us",
"base": "https://tvpass.org",
"timestamp": now.timestamp(),
}
events[key] = {
"sport": sport,
"event": event_name,
"link": urljoin(BASE_URL, href),
"event_ts": event_dt.timestamp(),
"timestamp": now_ts,
}
return events
async def scrape() -> None:
if cached := CACHE_FILE.load():
urls.update(cached)
async def get_events(cached_keys: list[str]) -> list[dict[str, str]]:
now = Time.clean(Time.now())
log.info(f"Loaded {len(urls)} event(s) from cache")
if not (events := HTML_CACHE.load()):
events = await refresh_html_cache(now.timestamp())
return
HTML_CACHE.write(events)
live = []
start_ts = now.delta(minutes=-30).timestamp()
end_ts = now.delta(minutes=30).timestamp()
for k, v in events.items():
if k in cached_keys:
continue
if not start_ts <= v["event_ts"] <= end_ts:
continue
live.append({**v})
return live
async def scrape(browser: Browser) -> None:
cached_urls = CACHE_FILE.load()
cached_count = len(cached_urls)
urls.update(cached_urls)
log.info(f"Loaded {cached_count} event(s) from cache")
log.info(f'Scraping from "{BASE_URL}"')
events = await network.safe_process(
get_events,
url_num=1,
semaphore=network.HTTP_S,
log=log,
)
events = await get_events(cached_urls.keys())
urls.update(events or {})
log.info(f"Processing {len(events)} new URL(s)")
CACHE_FILE.write(urls)
if events:
async with network.event_context(browser) as context:
for i, ev in enumerate(events, start=1):
async with network.event_page(context) as page:
handler = partial(
network.process_event,
url=ev["link"],
url_num=i,
page=page,
log=log,
)
log.info(f"Collected and cached {len(urls)} new event(s)")
url = await network.safe_process(
handler,
url_num=i,
semaphore=network.PW_S,
log=log,
)
if url:
sport, event, ts, link = (
ev["sport"],
ev["event"],
ev["event_ts"],
ev["link"],
)
key = f"[{sport}] {event} ({TAG})"
tvg_id, logo = leagues.get_tvg_info(sport, event)
entry = {
"url": url,
"logo": logo,
"base": BASE_URL,
"timestamp": ts,
"id": tvg_id or "Live.Event.us",
"link": link,
}
urls[key] = cached_urls[key] = entry
if new_count := len(cached_urls) - cached_count:
log.info(f"Collected and cached {new_count} new event(s)")
else:
log.info("No new events found")
CACHE_FILE.write(cached_urls)

View file

@ -100,6 +100,7 @@ class Time(datetime):
"%Y-%m-%d %I:%M %p",
"%Y-%m-%d %H:%M %p",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y/%m/%d %H:%M",
"%Y/%m/%d %H:%M:%S",

View file

@ -6,7 +6,6 @@ from collections.abc import Awaitable, Callable
from contextlib import asynccontextmanager
from functools import partial
from typing import AsyncGenerator, TypeVar
from urllib.parse import urlencode, urljoin
import httpx
from playwright.async_api import Browser, BrowserContext, Page, Playwright, Request
@ -39,21 +38,6 @@ class Network:
http2=True,
)
@staticmethod
def build_proxy_url(
tag: str,
path: str,
query: dict | None = None,
) -> str:
tag = tag.lower()
return (
f"{urljoin(network.proxy_base, f'{tag}/{path}')}?{urlencode(query)}"
if query
else urljoin(network.proxy_base, f"{tag}/{path}")
)
async def request(
self,
url: str,