add semaphores to scrapers
(maybe) fix hanging on watchfooty
misc. edits
doms9 2025-12-23 03:17:48 -05:00
parent 6e9729bf8c
commit 00000d920a
20 changed files with 103 additions and 73 deletions
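
For reference, a minimal standalone sketch of the throttling pattern this commit threads through the scrapers: every unit of work acquires a shared asyncio.Semaphore before running, so spawning many tasks no longer means running them all at once. The limits match the ones added in the diff below (10 for plain HTTP, 3 for Playwright); the fetch_one coroutine is a hypothetical stand-in for a scraper handler, not code from the repo.

import asyncio

HTTP_S = asyncio.Semaphore(10)  # plain HTTP work
PW_S = asyncio.Semaphore(3)     # heavier Playwright work


async def fetch_one(i: int) -> str:
    # Hypothetical stand-in for a per-URL scraper handler.
    async with HTTP_S:  # at most 10 of these bodies execute concurrently
        await asyncio.sleep(0.1)
        return f"url-{i}"


async def main() -> None:
    # Creating 50 tasks is fine; the semaphore, not the task count, bounds concurrency.
    results = await asyncio.gather(*(fetch_one(i) for i in range(50)))
    print(len(results))


if __name__ == "__main__":
    asyncio.run(main())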

View file

@@ -22,7 +22,7 @@ epg_urls = [
 ]

 client = httpx.AsyncClient(
-    timeout=5,
+    timeout=httpx.Timeout(5.0),
     follow_redirects=True,
     http2=True,
     headers={

View file

@@ -64,7 +64,7 @@ async def main() -> None:
         asyncio.create_task(streamsgate.scrape()),
         asyncio.create_task(strmd.scrape()),
         asyncio.create_task(tvpass.scrape()),
-        # asyncio.create_task(watchfooty.scrape()),
+        asyncio.create_task(watchfooty.scrape()),
         asyncio.create_task(webcast.scrape()),
     ]

View file

@@ -101,6 +101,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )

View file

@@ -114,8 +114,8 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.HTTP_S,
             log=log,
-            timeout=10,
         )

         if url:

View file

@@ -131,8 +131,8 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.HTTP_S,
             log=log,
-            timeout=10,
         )

         if url:

View file

@@ -1,6 +1,7 @@
 import json
+from functools import partial

-from playwright.async_api import async_playwright
+from playwright.async_api import BrowserContext, async_playwright

 from .utils import Cache, Time, get_logger, leagues, network
@@ -15,36 +16,29 @@ CACHE_FILE = Cache(f"{TAG.lower()}.json", exp=19_800)
 BASE_URL = "https://pixelsport.tv/backend/livetv/events"


-async def get_api_data() -> dict[str, list[dict, str, str]]:
-    async with async_playwright() as p:
-        try:
-            browser, context = await network.browser(p)
-            page = await context.new_page()
-
-            await page.goto(
-                BASE_URL,
-                wait_until="domcontentloaded",
-                timeout=10_000,
-            )
-
-            raw_json = await page.locator("pre").inner_text(timeout=5_000)
-        except Exception as e:
-            log.error(f'Failed to fetch "{BASE_URL}": {e}')
-
-            return {}
-        finally:
-            await browser.close()
+async def get_api_data(context: BrowserContext) -> dict[str, list[dict, str, str]]:
+    try:
+        page = await context.new_page()
+
+        await page.goto(
+            BASE_URL,
+            wait_until="domcontentloaded",
+            timeout=10_000,
+        )
+
+        raw_json = await page.locator("pre").inner_text(timeout=5_000)
+    except Exception as e:
+        log.error(f'Failed to fetch "{BASE_URL}": {e}')
+
+        return {}

     return json.loads(raw_json)


-async def get_events() -> dict[str, dict[str, str | float]]:
+async def get_events(context: BrowserContext) -> dict[str, dict[str, str | float]]:
     now = Time.clean(Time.now())

-    api_data = await get_api_data()
+    api_data = await get_api_data(context)

     events = {}
@@ -91,9 +85,21 @@ async def scrape() -> None:
     log.info(f'Scraping from "{BASE_URL}"')

-    events = await get_events()
+    async with async_playwright() as p:
+        browser, context = await network.browser(p)
+
+        handler = partial(get_events, context=context)
+
+        events = await network.safe_process(
+            handler,
+            url_num=1,
+            semaphore=network.PW_S,
+            log=log,
+        )
+
+        await browser.close()

-    urls.update(events)
+    urls.update(events or {})

     CACHE_FILE.write(urls)
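
A small sketch of the handler shape used in the hunk above: functools.partial binds the shared browser context into a zero-argument coroutine factory, which is what safe_process's fn: Callable[[], Awaitable[T]] parameter expects. FakeContext stands in for Playwright's BrowserContext purely for illustration.

import asyncio
from functools import partial


class FakeContext:
    # Illustrative stand-in for playwright.async_api.BrowserContext.
    async def new_page_title(self) -> str:
        return "events"


async def get_events(context: FakeContext) -> dict[str, str]:
    title = await context.new_page_title()
    return {"page": title}


async def main() -> None:
    context = FakeContext()
    handler = partial(get_events, context=context)  # callable with no arguments
    print(await handler())  # {'page': 'events'}


if __name__ == "__main__":
    asyncio.run(main())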

View file

@@ -123,6 +123,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )

View file

@@ -159,6 +159,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.HTTP_S,
             log=log,
         )

View file

@@ -137,6 +137,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.HTTP_S,
             log=log,
         )

View file

@@ -121,6 +121,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )

View file

@@ -66,9 +66,7 @@ async def get_events() -> list[dict[str, str]]:
         ):
             continue

-        league = league_elem.text(strip=True)
-        name = event_elem.text(strip=True)
+        league, name = league_elem.text(strip=True), event_elem.text(strip=True)

         events.append(
             {
@@ -108,8 +106,8 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.HTTP_S,
             log=log,
-            timeout=10,
         )

         if url:

View file

@@ -119,6 +119,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )

View file

@@ -77,7 +77,14 @@ async def scrape() -> None:
     log.info(f'Scraping from "{BASE_URL}"')

-    urls.update(await get_events())
+    events = await network.safe_process(
+        get_events,
+        url_num=1,
+        semaphore=network.HTTP_S,
+        log=log,
+    )
+
+    urls.update(events or {})

     CACHE_FILE.write(urls)

View file

@@ -164,6 +164,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )

View file

@@ -151,6 +151,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )

View file

@@ -151,6 +151,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )

View file

@@ -66,7 +66,14 @@ async def scrape() -> None:
     log.info(f'Scraping from "{BASE_URL}"')

-    urls.update(await get_events())
+    events = await network.safe_process(
+        get_events,
+        url_num=1,
+        semaphore=network.HTTP_S,
+        log=log,
+    )
+
+    urls.update(events or {})

     CACHE_FILE.write(urls)

View file

@@ -24,11 +24,15 @@ class Network:
         "Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
     )

+    HTTP_S = asyncio.Semaphore(10)
+    PW_S = asyncio.Semaphore(3)
+
     proxy_base = "https://stream.nvrmind.xyz"

     def __init__(self) -> None:
         self.client = httpx.AsyncClient(
-            timeout=5,
+            timeout=httpx.Timeout(5.0),
             follow_redirects=True,
             headers={"User-Agent": Network.UA},
             http2=True,
@@ -85,34 +89,39 @@ class Network:
     async def safe_process(
         fn: Callable[[], Awaitable[T]],
         url_num: int,
-        timeout: int | float = 15,
+        semaphore: asyncio.Semaphore,
+        timeout: int | float = 10,
         log: logging.Logger | None = None,
     ) -> T | None:
         log = log or logger

-        task = asyncio.create_task(fn())
-
-        try:
-            return await asyncio.wait_for(task, timeout=timeout)
-        except asyncio.TimeoutError:
-            log.warning(f"URL {url_num}) Timed out after {timeout}s, skipping event")
-
-            task.cancel()
-
-            try:
-                await task
-            except asyncio.CancelledError:
-                pass
-            except Exception as e:
-                log.debug(f"URL {url_num}) Ignore exception after timeout: {e}")
-
-            return
-        except Exception as e:
-            log.error(f"URL {url_num}) Unexpected error: {e}")
-
-            return
+        async with semaphore:
+            task = asyncio.create_task(fn())
+
+            try:
+                return await asyncio.wait_for(task, timeout=timeout)
+            except asyncio.TimeoutError:
+                log.warning(
+                    f"URL {url_num}) Timed out after {timeout}s, skipping event"
+                )
+
+                task.cancel()
+
+                try:
+                    await task
+                except asyncio.CancelledError:
+                    pass
+                except Exception as e:
+                    log.debug(f"URL {url_num}) Ignore exception after timeout: {e}")
+
+                return
+            except Exception as e:
+                log.error(f"URL {url_num}) Unexpected error: {e}")
+
+                return

     @staticmethod
     def capture_req(
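
The timeout handling that safe_process wraps inside the semaphore above boils down to: wait_for the task, and on timeout cancel it and await it so the CancelledError is absorbed instead of leaking into the event loop. A stripped-down, runnable sketch of that shape (slow_job and run_with_timeout are illustrative names, not repo code):

import asyncio


async def slow_job() -> str:
    # Hypothetical handler that hangs far longer than the timeout.
    await asyncio.sleep(60)
    return "never reached"


async def run_with_timeout(timeout: float = 1.0) -> str | None:
    task = asyncio.create_task(slow_job())

    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        task.cancel()
        try:
            await task  # absorb the cancellation so nothing propagates later
        except asyncio.CancelledError:
            pass
        return None


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout()))  # prints None after ~1s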

View file

@@ -78,12 +78,12 @@ async def process_event(
     pattern = re.compile(r"\((\d+)\)")

-    page = await context.new_page()
-
     captured: list[str] = []
     got_one = asyncio.Event()

+    page = await context.new_page()
+
     handler = partial(
         network.capture_req,
         captured=captured,
@@ -102,10 +102,7 @@ async def process_event(
     await page.wait_for_timeout(2_000)

     try:
-        header = await page.wait_for_selector(
-            "text=/Stream Links/i",
-            timeout=5_000,
-        )
+        header = await page.wait_for_selector("text=/Stream Links/i", timeout=5_000)

         text = await header.inner_text()
     except TimeoutError:
@@ -120,8 +117,7 @@ async def process_event(
     try:
         first_available = await page.wait_for_selector(
-            'a[href*="/stream/"]',
-            timeout=3_000,
+            'a[href*="/stream/"]', timeout=3_000
         )
     except TimeoutError:
         log.warning(f"URL {url_num}) No available stream links.")
@@ -133,22 +129,18 @@ async def process_event(
         return None, None

+    embed = re.sub(
+        pattern=r"^.*\/stream",
+        repl="https://spiderembed.top/embed",
+        string=href,
+    )
+
     await page.goto(
-        href,
+        embed,
         wait_until="domcontentloaded",
         timeout=5_000,
     )

-    if not (iframe := await page.query_selector("iframe")):
-        log.warning(f"URL {url_num}) No iframe found.")
-
-        return None, None
-
-    if not (iframe_src := await iframe.get_attribute("src")):
-        log.warning(f"URL {url_num}) No iframe source found.")
-
-        return None, None
-
     wait_task = asyncio.create_task(got_one.wait())

     try:
@@ -170,7 +162,7 @@ async def process_event(
     if captured:
         log.info(f"URL {url_num}) Captured M3U8")

-        return captured[-1], iframe_src
+        return captured[0], embed

     log.warning(f"URL {url_num}) No M3U8 captured after waiting.")
@@ -282,6 +274,7 @@ async def scrape() -> None:
         url, iframe = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )

View file

@@ -141,6 +141,7 @@ async def scrape() -> None:
         url = await network.safe_process(
             handler,
             url_num=i,
+            semaphore=network.PW_S,
             log=log,
         )