This commit is contained in:
doms9 2025-10-01 22:28:01 -04:00
parent f99bf34a84
commit 00000d98e0
4 changed files with 14 additions and 22 deletions

View file

@ -126,7 +126,7 @@ async def get_events(
base_url: str,
cached_keys: set[str],
) -> list[dict[str, str]]:
if not (api_data := API_FILE.load(nearest_hr=True, per_entry=False)):
if not (api_data := API_FILE.load(per_entry=False)):
api_data = await refresh_api_cache(client, urljoin(base_url, "api/streams"))
API_FILE.write(api_data)

View file

@ -129,7 +129,7 @@ async def get_events(
cached_keys: set[str],
) -> list[dict[str, str]]:
if not (events := HTML_CACHE.load(nearest_hr=True)):
if not (events := HTML_CACHE.load()):
events = await refresh_html_cache(client, url)
HTML_CACHE.write(events)

View file

@ -26,7 +26,7 @@ async def fetch_m3u8(client: httpx.AsyncClient) -> list[str]:
async def scrape(client: httpx.AsyncClient) -> None:
if cached := CACHE_FILE.load(nearest_hr=True):
if cached := CACHE_FILE.load():
urls.update(cached)
log.info(f"Loaded {len(urls)} event(s) from cache")
return

View file

@ -6,45 +6,37 @@ from .config import Time
class Cache:
def __init__(self, file: Path, exp: int | float) -> None:
self.file = file
self.exp = exp
self.now_ts = Time.now().timestamp()
@staticmethod
def near_hr(dt: datetime) -> float:
return dt.replace(minute=0, second=0, microsecond=0).timestamp()
def clean(dt: datetime) -> float:
return dt.replace(second=0, microsecond=0).timestamp()
def is_fresh(
self,
entry: dict,
nearest_hr: bool,
) -> bool:
def is_fresh(self, entry: dict) -> bool:
ts: float | int = entry.get("timestamp", 31496400)
if nearest_hr:
ts = self.near_hr(Time.from_ts(ts))
dt_ts = self.clean(Time.from_ts(ts))
return Time.now().timestamp() - ts < self.exp
return self.now_ts - dt_ts < self.exp
def load(
self,
nearest_hr: bool = False,
per_entry: bool = True,
) -> dict[str, dict[str, str | float]]:
def load(self, per_entry: bool = True) -> dict[str, dict[str, str | float]]:
try:
data: dict = json.loads(self.file.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
return {}
if per_entry:
return {k: v for k, v in data.items() if self.is_fresh(v, nearest_hr)}
return {k: v for k, v in data.items() if self.is_fresh(v)}
ts: float | int = data.get("timestamp", 31496400)
if nearest_hr:
ts = self.near_hr(Time.from_ts(ts))
dt_ts = self.clean(Time.from_ts(ts))
return data if self.is_fresh({"timestamp": ts}, False) else {}
return data if self.is_fresh({"timestamp": dt_ts}) else {}
def write(self, data: dict) -> None:
self.file.parent.mkdir(parents=True, exist_ok=True)