This commit is contained in:
doms9 2025-10-15 10:53:54 -04:00
parent b110aee1e8
commit 00000d9ba6
11 changed files with 121 additions and 135 deletions

View file

@ -76,7 +76,7 @@ async def fetch_xml(url: str) -> ET.Element | None:
r = await client.get(url)
r.raise_for_status()
except Exception as e:
print(f'Failed to fetch "{url}"\n{e}')
print(f'Failed to fetch "{url}": {e}')
return
try:
@ -85,7 +85,7 @@ async def fetch_xml(url: str) -> ET.Element | None:
return ET.fromstring(decompressed_data)
except Exception as e:
print(f'Failed to decompress and parse XML from "{url}"\n{e}')
print(f'Failed to decompress and parse XML from "{url}": {e}')
def hijack_id(
@ -182,7 +182,7 @@ async def main() -> None:
tree.write(epg_file, encoding="utf-8", xml_declaration=True)
print(f"EPG saved to {epg_file.name}")
print(f"EPG saved to {epg_file.resolve()}")
if __name__ == "__main__":

View file

@ -83,7 +83,7 @@ async def main() -> None:
encoding="utf-8",
)
log.info(f"Base + Events saved to {COMBINED_FILE.name}")
log.info(f"Base + Events saved to {COMBINED_FILE.resolve()}")
EVENTS_FILE.write_text(
'#EXTM3U url-tvg="https://raw.githubusercontent.com/doms9/iptv/refs/heads/default/EPG/TV.xml"\n'
@ -91,7 +91,7 @@ async def main() -> None:
encoding="utf-8",
)
log.info(f"Events saved to {EVENTS_FILE.name}")
log.info(f"Events saved to {EVENTS_FILE.resolve()}")
if __name__ == "__main__":

View file

@ -1,3 +1,4 @@
from functools import partial
from pathlib import Path
from urllib.parse import unquote, urljoin
@ -20,14 +21,48 @@ MIRRORS = [
CACHE_FILE = Cache(Path(__file__).parent / "caches" / "fstv.json", exp=10_800)
async def process_event(
client: httpx.AsyncClient,
url: str,
url_num: int,
) -> tuple[str, str]:
try:
r = await client.get(url)
r.raise_for_status()
except Exception as e:
log.error(f'URL {url_num}) Failed to fetch "{url}": {e}')
return "", ""
soup = HTMLParser(r.text)
if category_links := soup.css(".common-list-category .category-item a"):
match_name = category_links[-1].text(strip=True)
else:
match_name = None
if not match_name or match_name.lower() == "vs":
if og_title := soup.css_first("meta[property='og:title']"):
match_name = (
og_title.attributes.get("content", "").split(" start on")[0].strip()
)
if not (ifr := soup.css_first("iframe")):
log.info(f"URL {url_num}) No M3U8 found")
return "", ""
if src := ifr.attributes.get("src"):
log.info(f"URL {url_num}) Captured M3U8")
return match_name or "", unquote(src).split("link=")[-1]
async def get_events(
client: httpx.AsyncClient,
base_url: str,
cached_hrefs: set[str],
) -> list[dict[str, str]]:
log.info(f'Scraping from "{base_url}"')
try:
r = await client.get(base_url)
r.raise_for_status()
@ -76,42 +111,6 @@ async def get_events(
return events
async def process_event(
client: httpx.AsyncClient,
url: str,
url_num: int,
) -> tuple[str, str]:
try:
r = await client.get(url)
r.raise_for_status()
except Exception as e:
log.error(f'URL {url_num}) Failed to fetch "{url}"\n{e}')
return "", ""
soup = HTMLParser(r.text)
if category_links := soup.css(".common-list-category .category-item a"):
match_name = category_links[-1].text(strip=True)
else:
match_name = None
if not match_name or match_name.lower() == "vs":
if og_title := soup.css_first("meta[property='og:title']"):
match_name = (
og_title.attributes.get("content", "").split(" start on")[0].strip()
)
if not (ifr := soup.css_first("iframe")):
log.info(f"URL {url_num}) No M3U8 found")
return "", ""
if src := ifr.attributes.get("src", ""):
log.info(f"URL {url_num}) Captured M3U8")
return match_name or "", unquote(src).split("link=")[-1]
async def scrape(client: httpx.AsyncClient) -> None:
cached_urls = CACHE_FILE.load()
cached_hrefs = {entry["href"] for entry in cached_urls.values()}
@ -125,6 +124,8 @@ async def scrape(client: httpx.AsyncClient) -> None:
CACHE_FILE.write(cached_urls)
return
log.info(f'Scraping from "{base_url}"')
events = await get_events(
client,
base_url,
@ -136,15 +137,9 @@ async def scrape(client: httpx.AsyncClient) -> None:
now = Time.now().timestamp()
for i, ev in enumerate(events, start=1):
match_name, url = await network.safe_process(
lambda: process_event(
client,
ev["link"],
url_num=i,
),
url_num=i,
log=log,
)
handler = partial(process_event, client=client, url=ev["link"], url_num=i)
match_name, url = await network.safe_process(handler, url_num=i, log=log)
if url:
sport = ev["sport"]

View file

@ -1,4 +1,5 @@
import re
from functools import partial
from pathlib import Path
from urllib.parse import urljoin
@ -26,7 +27,7 @@ async def process_event(
r = await client.get(url)
r.raise_for_status()
except Exception as e:
log.error(f'URL {url_num}) Failed to fetch "{url}"\n{e}')
log.error(f'URL {url_num}) Failed to fetch "{url}": {e}')
return
valid_m3u8 = re.compile(
@ -55,15 +56,15 @@ async def get_events(client: httpx.AsyncClient) -> list[dict[str, str]]:
events = []
for card in soup.css("div.container div.card"):
sport = card.css_first("h5.card-title").text(strip=True)
name = card.css_first("p.card-text").text(strip=True)
link = card.css_first("a.btn.btn-primary")
if not (href := link.attrs.get("href")):
continue
sport = card.css_first("h5.card-title").text(strip=True)
name = card.css_first("p.card-text").text(strip=True)
events.append(
{
"sport": sport,
@ -90,8 +91,10 @@ async def scrape(client: httpx.AsyncClient) -> None:
now = Time.now().timestamp()
for i, ev in enumerate(events, start=1):
handler = partial(process_event, client=client, url=ev["link"], url_num=i)
url = await network.safe_process(
lambda: process_event(client, url=ev["link"], url_num=i),
handler,
url_num=i,
log=log,
timeout=10,

View file

@ -95,7 +95,7 @@ async def get_events(
r = await client.get(url)
r.raise_for_status()
except Exception as e:
log.error(f'Failed to fetch "{url}"\n{e}')
log.error(f'Failed to fetch "{url}": {e}')
return []
@ -132,7 +132,7 @@ async def get_events(
time_text = time_span.text(strip=True)
timestamp = int(a.attributes.get("data-time", 31496400))
timestamp = int(a.attributes.get("data-time", Time.default_8()))
key = f"[{sport}] {name} (SEAST)"
@ -180,15 +180,9 @@ async def scrape(client: httpx.AsyncClient) -> None:
browser, context = await network.browser(p, browser="brave")
for i, ev in enumerate(events, start=1):
url = await network.safe_process(
lambda: process_event(
ev["link"],
url_num=i,
context=context,
),
url_num=i,
log=log,
)
handler = partial(process_event, url=ev["link"], url_num=i, context=context)
url = await network.safe_process(handler, url_num=i, log=log)
if url:
sport, event, ts = ev["sport"], ev["event"], ev["timestamp"]

View file

@ -78,7 +78,7 @@ async def refresh_html_cache(client: httpx.AsyncClient, url: str) -> dict[str, s
r = await client.get(url)
r.raise_for_status()
except Exception as e:
log.error(f'Failed to fetch "{url}"\n{e}')
log.error(f'Failed to fetch "{url}": {e}')
return []
@ -173,15 +173,9 @@ async def scrape(client: httpx.AsyncClient) -> None:
browser, context = await network.browser(p, browser="brave")
for i, ev in enumerate(events, start=1):
url = await network.safe_process(
lambda: process_event(
ev["link"],
url_num=i,
context=context,
),
url_num=i,
log=log,
)
handler = partial(process_event, url=ev["link"], url_num=i, context=context)
url = await network.safe_process(handler, url_num=i, log=log)
if url:
sport, event, ts = ev["sport"], ev["event"], ev["event_ts"]

View file

@ -28,7 +28,7 @@ def validate_category(s: str) -> str:
elif s == "fight":
return "Fight (UFC/Boxing)"
return s.capitalize()
return s.capitalize() if len(s) > 4 else s.upper()
async def refresh_api_cache(
@ -40,12 +40,12 @@ async def refresh_api_cache(
r = await client.get(url)
r.raise_for_status()
except Exception as e:
log.error(f'Failed to fetch "{url}"\n{e}')
log.error(f'Failed to fetch "{url}": {e}')
return {}
data = r.json()
data[0]["timestamp"] = Time.now().timestamp()
data[-1]["timestamp"] = Time.now().timestamp()
return data
@ -113,7 +113,7 @@ async def get_events(
cached_keys: set[str],
) -> list[dict[str, str]]:
if not (api_data := API_FILE.load(per_entry=False, index=True)):
if not (api_data := API_FILE.load(per_entry=False, index=-1)):
api_data = await refresh_api_cache(
client,
urljoin(
@ -211,15 +211,9 @@ async def scrape(client: httpx.AsyncClient) -> None:
browser, context = await network.browser(p, "brave")
for i, ev in enumerate(events, start=1):
url = await network.safe_process(
lambda: process_event(
ev["link"],
url_num=i,
context=context,
),
url_num=i,
log=log,
)
handler = partial(process_event, url=ev["link"], url_num=i, context=context)
url = await network.safe_process(handler, url_num=i, log=log)
if url:
sport, event, logo, ts = (

View file

@ -11,7 +11,7 @@ class Cache:
self.now_ts = Time.now().timestamp()
def is_fresh(self, entry: dict) -> bool:
ts: float | int = entry.get("timestamp", 31496400)
ts: float | int = entry.get("timestamp", Time.default_8())
dt_ts = Time.clean(Time.from_ts(ts)).timestamp()
@ -20,7 +20,7 @@ class Cache:
def load(
self,
per_entry: bool = True,
index: bool = False,
index: int | None = None,
) -> dict[str, dict[str, str | float]]:
try:
@ -32,10 +32,10 @@ class Cache:
return {k: v for k, v in data.items() if self.is_fresh(v)}
if index:
ts: float | int = data[0].get("timestamp", 31496400)
ts: float | int = data[index].get("timestamp", Time.default_8())
else:
ts: float | int = data.get("timestamp", 31496400)
ts: float | int = data.get("timestamp", Time.default_8())
dt_ts = Time.clean(Time.from_ts(ts)).timestamp()

View file

@ -21,6 +21,14 @@ class Time(datetime):
def from_ts(cls, ts: int | float) -> "Time":
return cls.fromtimestamp(ts, tz=cls.TZ)
@classmethod
def default_8(cls) -> float:
return (
cls.now()
.replace(hour=8, minute=0, second=0, microsecond=0, tzinfo=cls.TZ)
.timestamp()
)
def delta(self, **kwargs) -> "Time":
return self.from_ts((self + timedelta(**kwargs)).timestamp())
@ -66,7 +74,7 @@ class Time(datetime):
except ValueError:
continue
else:
return cls.from_ts(31496400)
return cls.from_ts(Time.default_8())
if not dt.tzinfo:
dt = tz.localize(dt) if hasattr(tz, "localize") else dt.replace(tzinfo=tz)
@ -75,13 +83,13 @@ class Time(datetime):
class Leagues:
live_img = "https://i.gyazo.com/978f2eb4a199ca5b56b447aded0cb9e3.png"
def __init__(self) -> None:
self.data = json.loads(
(Path(__file__).parent / "leagues.json").read_text(encoding="utf-8")
)
self.live_img = "https://i.gyazo.com/978f2eb4a199ca5b56b447aded0cb9e3.png"
def teams(self, league: str) -> list[str]:
return self.data["teams"].get(league, [])
@ -145,16 +153,12 @@ class Leagues:
else:
return self.info("Basketball")
case "Hockey":
case "Ice Hockey" | "Hockey":
return self.info("NHL")
case _:
return self.info(sport)
@property
def league_names(self) -> list[str]:
return self.data["teams"].keys()
leagues = Leagues()

View file

@ -127,46 +127,54 @@ class Network:
await context.add_init_script(
"""
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, "webdriver", { get: () => undefined });
Object.defineProperty(navigator, 'languages', {
get: () => ['en-US', 'en']
Object.defineProperty(navigator, "languages", {
get: () => ["en-US", "en"],
});
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4]
Object.defineProperty(navigator, "plugins", {
get: () => [1, 2, 3, 4],
});
const elementDescriptor = Object.getOwnPropertyDescriptor(HTMLElement.prototype, 'offsetHeight');
Object.defineProperty(HTMLDivElement.prototype, 'offsetHeight', {
const elementDescriptor = Object.getOwnPropertyDescriptor(
HTMLElement.prototype,
"offsetHeight"
);
Object.defineProperty(HTMLDivElement.prototype, "offsetHeight", {
...elementDescriptor,
get: function() {
if (this.id === 'modernizr') { return 24; }
return elementDescriptor.get.apply(this);
get: function () {
if (this.id === "modernizr") {
return 24;
}
return elementDescriptor.get.apply(this);
},
});
Object.defineProperty(window.screen, 'width', { get: () => 1366 });
Object.defineProperty(window.screen, 'height', { get: () => 768 });
Object.defineProperty(window.screen, "width", { get: () => 1366 });
Object.defineProperty(window.screen, "height", { get: () => 768 });
const getParameter = WebGLRenderingContext.prototype.getParameter;
const getParameter = WebGLRenderingContext.prototype. getParameter;
WebGLRenderingContext.prototype.getParameter = function (param) {
if (param === 37445) return "Intel Inc."; // UNMASKED_VENDOR_WEBGL
if (param === 37446) return "Intel Iris OpenGL Engine"; // UNMASKED_RENDERER_WEBGL
return getParameter.apply(this, [param]);
};
const observer = new MutationObserver(mutations => {
mutations.forEach(mutation => {
mutation.addedNodes.forEach(node => {
if (node.tagName === 'IFRAME' && node.hasAttribute('sandbox')) {
node.removeAttribute('sandbox');
const observer = new MutationObserver((mutations) => {
mutations.forEach((mutation) => {
mutation.addedNodes.forEach((node) => {
if (node.tagName === "IFRAME" && node.hasAttribute("sandbox")) {
node.removeAttribute("sandbox");
}
});
});
});
observer.observe(document.documentElement, { childList: true, subtree: true });
"""
)

View file

@ -68,7 +68,7 @@ async def refresh_api_cache(
for ev in data:
ev["ts"] = ev.pop("timestamp")
data[0]["timestamp"] = Time.now().timestamp()
data[-1]["timestamp"] = Time.now().timestamp()
return data
@ -152,7 +152,7 @@ async def get_events(
cached_keys: set[str],
) -> list[dict[str, str]]:
if not (api_data := API_FILE.load(per_entry=False, index=True)):
if not (api_data := API_FILE.load(per_entry=False, index=-1)):
api_data = await refresh_api_cache(client, base_url)
API_FILE.write(api_data)
@ -227,15 +227,9 @@ async def scrape(client: httpx.AsyncClient) -> None:
browser, context = await network.browser(p)
for i, ev in enumerate(events, start=1):
url = await network.safe_process(
lambda: process_event(
ev["link"],
url_num=i,
context=context,
),
url_num=i,
log=log,
)
handler = partial(process_event, url=ev["link"], url_num=i, context=context)
url = await network.safe_process(handler, url_num=i, log=log)
sport, event, logo, ts = (
ev["sport"],