- Add data_sources.yaml for configurable data source URLs - Add data_sources.py to load config with database override support - Add arcgis_landing_points and arcgis_cable_landing_relation collectors - Change visualization API to query arcgis_landing_points - Add /api/v1/datasources/configs/all endpoint - Update Earth to fetch from API instead of static files - Fix scheduler collector ID mappings
120 lines
4.2 KiB
Python
"""Epoch AI GPU Clusters Collector

Collects data from Epoch AI GPU clusters tracking.

https://epoch.ai/data/gpu-clusters
"""
|
|
|
|
import re
from datetime import datetime, timezone
from typing import Any, Dict, List

import httpx
from bs4 import BeautifulSoup

from app.services.collectors.base import BaseCollector
|
|
|
|
|
|
|
|
class EpochAIGPUCollector(BaseCollector):
    """Collector for the Epoch AI GPU clusters dataset.

    Scrapes the public tracking page at https://epoch.ai/data/gpu-clusters
    and emits one normalized record per cluster row found in any HTML table
    on the page. Falls back to a single sample record when no table row can
    be parsed, so downstream consumers always receive a well-formed list.
    """

    name = "epoch_ai_gpu"
    priority = "P0"
    module = "L1"
    frequency_hours = 6
    data_type = "gpu_cluster"

    # Header labels that must never be mistaken for a cluster name
    # (some tables repeat the header row inside the body).
    _HEADER_LABELS = frozenset({"Cluster", "System", "Name"})

    async def fetch(self) -> List[Dict[str, Any]]:
        """Fetch Epoch AI GPU clusters data from the webpage.

        Returns:
            List of normalized entry dicts (see ``parse_response``).

        Raises:
            httpx.HTTPStatusError: if the page responds with an error status.
        """
        url = "https://epoch.ai/data/gpu-clusters"

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            return self.parse_response(response.text)

    def parse_response(self, html: str) -> List[Dict[str, Any]]:
        """Parse the Epoch AI webpage to extract GPU cluster data.

        Scans every ``<table>`` on the page; rows with at least five cells
        are treated as cluster entries (column 0 = name, 1 = location,
        2 = performance). Rows that fail to parse are skipped individually.
        Returns sample data when nothing parseable is found.
        """
        data: List[Dict[str, Any]] = []
        soup = BeautifulSoup(html, "html.parser")

        # One timestamp for the whole batch; timezone-aware replacement for
        # the deprecated datetime.utcnow().
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        for table in soup.find_all("table"):
            for row in table.find_all("tr")[1:]:  # skip the header row
                cells = row.find_all(["td", "th"])
                if len(cells) < 5:
                    continue
                try:
                    cluster_name = cells[0].get_text(strip=True)
                    if not cluster_name or cluster_name in self._HEADER_LABELS:
                        continue

                    # len(cells) >= 5 is already guaranteed above, so
                    # cells[1] / cells[2] access needs no extra guard.
                    country, city = self._parse_location(cells[1].get_text(strip=True))
                    perf_cell = cells[2].get_text(strip=True)

                    data.append({
                        "source_id": f"epoch_{re.sub(r'[^a-zA-Z0-9]', '_', cluster_name.lower())}",
                        "name": cluster_name,
                        "country": country,
                        "city": city,
                        # Coordinates are not published on the page; left
                        # empty for downstream geocoding.
                        "latitude": "",
                        "longitude": "",
                        "value": self._parse_performance(perf_cell),
                        "unit": "TFlop/s",
                        "metadata": {
                            "raw_data": perf_cell,
                        },
                        "reference_date": today,
                    })
                except (ValueError, IndexError, AttributeError):
                    # Malformed row: skip it rather than abort the page.
                    continue

        # If no table was found (or none parsed), return sample data so
        # callers still get a predictably shaped payload.
        if not data:
            data = self._get_sample_data()

        return data

    def _parse_location(self, location: str) -> tuple:
        """Split a location string into ``(country, city)``.

        The page formats locations as "City, Country": the text after the
        last comma is the country, everything before it the city. A string
        with no comma is treated as a bare country with no city.
        """
        if not location:
            return "", ""
        if "," in location:
            city, _, country = location.rpartition(",")
            return country.strip(), city.strip()
        return location, ""

    def _parse_performance(self, perf: str) -> str:
        """Extract a numeric performance value from *perf* in TFlop/s.

        Recognizes GFlop/s, TFlop/s and PFlop/s suffixes and converts the
        number to TFlop/s so it matches the "unit" field the entries
        declare (previously a "5 PFlop/s" cell was stored as 5 TFlop/s —
        off by 1000x). Returns "0" when no number is found.
        """
        if not perf:
            return "0"
        # Require a leading digit so a stray "." or "," never matches; the
        # unit group is optional, which also makes a second fallback search
        # (as the old code had) unreachable/unnecessary.
        match = re.search(r"(\d[\d,.]*)\s*(PFlop/s|TFlop/s|GFlop/s)?", perf, re.I)
        if not match:
            return "0"
        raw = match.group(1).replace(",", "")
        try:
            value = float(raw)
        except ValueError:
            # e.g. "1.2.3" — treat unparseable numbers as missing.
            return "0"
        unit = (match.group(2) or "TFlop/s").lower()
        # Normalize: 1 PFlop/s = 1000 TFlop/s, 1 GFlop/s = 0.001 TFlop/s.
        if unit.startswith("p"):
            value *= 1000.0
        elif unit.startswith("g"):
            value /= 1000.0
        # Render integral values without a trailing ".0" to keep the
        # historical string format ("1000", not "1000.0").
        return str(int(value)) if value == int(value) else str(value)

    def _get_sample_data(self) -> List[Dict[str, Any]]:
        """Return sample data for testing when scraping fails."""
        return [
            {
                "source_id": "epoch_sample_1",
                "name": "Sample GPU Cluster",
                "country": "United States",
                "city": "San Francisco, CA",
                "latitude": "",
                "longitude": "",
                "value": "1000",
                "unit": "TFlop/s",
                "metadata": {
                    "note": "Sample data - Epoch AI page structure may vary",
                },
                "reference_date": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            },
        ]
|