"""Epoch AI GPU Clusters Collector Collects data from Epoch AI GPU clusters tracking. https://epoch.ai/data/gpu-clusters """ import re from typing import Dict, Any, List from datetime import datetime from bs4 import BeautifulSoup import httpx from app.services.collectors.base import BaseCollector class EpochAIGPUCollector(BaseCollector): name = "epoch_ai_gpu" priority = "P0" module = "L1" frequency_hours = 6 data_type = "gpu_cluster" async def fetch(self) -> List[Dict[str, Any]]: """Fetch Epoch AI GPU clusters data from webpage""" url = "https://epoch.ai/data/gpu-clusters" async with httpx.AsyncClient(timeout=60.0) as client: response = await client.get(url) response.raise_for_status() return self.parse_response(response.text) def parse_response(self, html: str) -> List[Dict[str, Any]]: """Parse Epoch AI webpage to extract GPU cluster data""" data = [] soup = BeautifulSoup(html, "html.parser") # Try to find data table on the page tables = soup.find_all("table") for table in tables: rows = table.find_all("tr") for row in rows[1:]: # Skip header cells = row.find_all(["td", "th"]) if len(cells) >= 5: try: cluster_name = cells[0].get_text(strip=True) if not cluster_name or cluster_name in ["Cluster", "System", "Name"]: continue location_cell = cells[1].get_text(strip=True) if len(cells) > 1 else "" country, city = self._parse_location(location_cell) perf_cell = cells[2].get_text(strip=True) if len(cells) > 2 else "" entry = { "source_id": f"epoch_{re.sub(r'[^a-zA-Z0-9]', '_', cluster_name.lower())}", "name": cluster_name, "country": country, "city": city, "latitude": "", "longitude": "", "value": self._parse_performance(perf_cell), "unit": "TFlop/s", "metadata": { "raw_data": perf_cell, }, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), } data.append(entry) except (ValueError, IndexError, AttributeError): continue # If no table found, return sample data if not data: data = self._get_sample_data() return data def _parse_location(self, location: str) -> tuple: """Parse location string into country and city""" if not location: return "", "" if "," in location: parts = location.rsplit(",", 1) city = parts[0].strip() country = parts[1].strip() if len(parts) > 1 else "" return country, city return location, "" def _parse_performance(self, perf: str) -> str: """Parse performance string to extract value""" if not perf: return "0" match = re.search(r"([\d,.]+)\s*(TFlop/s|PFlop/s|GFlop/s)?", perf, re.I) if match: return match.group(1).replace(",", "") match = re.search(r"([\d,.]+)", perf) if match: return match.group(1).replace(",", "") return "0" def _get_sample_data(self) -> List[Dict[str, Any]]: """Return sample data for testing when scraping fails""" return [ { "source_id": "epoch_sample_1", "name": "Sample GPU Cluster", "country": "United States", "city": "San Francisco, CA", "latitude": "", "longitude": "", "value": "1000", "unit": "TFlop/s", "metadata": { "note": "Sample data - Epoch AI page structure may vary", }, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), }, ]