# 数据采集系统 (Collectors) ## 一、系统架构 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 数据采集系统架构 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ TOP500 │ │ Epoch AI │ │ HuggingFace │ │ │ │ 采集器 │ │ 采集器 │ │ 采集器 │ │ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ │ │ └───────────────────┼───────────────────┘ │ │ ▼ │ │ ┌─────────────────────┐ │ │ │ BaseCollector │◄── 基类 (统一处理) │ │ │ run() 方法 │ │ │ └─────────┬───────────┘ │ │ │ │ │ ┌─────────────────┼─────────────────┐ │ │ ▼ ▼ ▼ │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │ fetch() │ │transform()│ │ _save_data│ │ │ │ 获取原始数据 │ │ 数据转换 │ │ 保存到DB │ │ │ └───────────┘ └───────────┘ └───────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────┐ │ │ │ CollectedData 表 │◄── 统一存储 │ │ └─────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Scheduler (APScheduler) │ │ │ │ 定时任务调度: 每4小时/6小时/12小时/1天 自动执行 │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` ## 二、工作流程 (Pipeline) ```python # 1. Scheduler 触发 (定时 或 手动触发) # ↓ # 2. run() 方法执行完整流水线 async def run(self, db): # 2.1 检查采集器是否启用 if not collector_registry.is_active(self.name): return {"status": "skipped"} # 2.2 记录任务开始 task = CollectionTask(status="running") db.add(task) await db.commit() # 2.3 FETCH - 获取原始数据 (由子类实现) raw_data = await self.fetch() # 2.4 TRANSFORM - 转换为统一格式 data = self.transform(raw_data) # 2.5 SAVE - 保存到数据库 records_count = await self._save_data(db, data) # 2.6 记录任务完成 task.status = "success" task.records_processed = records_count await db.commit() ``` **核心文件**: `backend/app/services/collectors/base.py` ## 三、采集器列表 | 采集器 | 数据类型 | 数据内容 | 采集频率 | |--------|----------|----------|----------| | TOP500 | supercomputer | 全球超级计算机排名 (算力、性能) | 4小时 | | Epoch AI | gpu_cluster | GPU算力集群信息 | 6小时 | | HuggingFace Models | model | AI模型信息 | 12小时 | | HuggingFace Datasets | dataset | 数据集信息 | 12小时 | | HuggingFace Spaces | space | Demo应用 | 1天 | | PeeringDB | ixp/network/facility | 互联网交换点/网络/机房 | 1-2天 | | TeleGeography | submarine_cable | 海底光缆信息 | 7天 | ## 四、数据格式 (统一存储到 CollectedData 表) ```python # 每个采集器 parse_response() 返回格式 { "source_id": "top500_1", # 原始系统ID (必填) "name": "El Capitan", # 名称 (必填) "description": "系统描述...", # 描述 "country": "United States", # 国家 "city": "Livermore, CA", # 城市 "latitude": "37.6819", # 纬度 (字符串) "longitude": "-121.7681", # 经度 (字符串) "value": "1742.00", # 性能值 (如算力) "unit": "PFlop/s", # 单位 "metadata": { # 额外数据 (JSON) "rank": 1, "r_peak": 2746.38, "cores": 11039616 }, "reference_date": "2025-11-01" # 数据参考日期 } ``` ## 五、数据库表结构 **CollectedData 表** (`collected_data`) | 字段 | 类型 | 说明 | |------|------|------| | id | SERIAL | 主键 | | source | VARCHAR(100) | 数据源名称 (top500, huggingface等) | | source_id | VARCHAR(100) | 原始数据ID | | data_type | VARCHAR(50) | 数据类型 (supercomputer, model等) | | name | VARCHAR(500) | 名称 | | title | VARCHAR(500) | 标题 | | description | TEXT | 描述 | | country | VARCHAR(100) | 国家 | | city | VARCHAR(100) | 城市 | | latitude | VARCHAR(50) | 纬度 | | longitude | VARCHAR(50) | 经度 | | value | VARCHAR(100) | 性能值 | | unit | VARCHAR(20) | 单位 | | metadata | JSONB | 额外元数据 | | collected_at | TIMESTAMP | 采集时间 | | reference_date | TIMESTAMP | 数据参考日期 | | is_valid | INTEGER | 是否有效 | **核心文件**: `backend/app/models/collected_data.py` ## 六、TOP500 采集器示例 (完整流程) ```python # 1. fetch() - 从网页获取HTML async def fetch(self): url = "https://top500.org/lists/top500/list/2025/11/" response = await client.get(url) return response.text # 返回HTML # 2. parse_response() - 解析HTML为统一格式 def parse_response(self, html): soup = BeautifulSoup(html, "html.parser") table = soup.find("table") for row in table.find_all("tr")[1:]: # 跳过表头 cells = row.find_all("td") entry = { "source_id": f"top500_{cells[0].text}", # "top500_1" "name": cells[1].text.strip(), # "El Capitan" "country": cells[2].text.strip(), # "United States" "city": "", # 城市 "latitude": "", # 需进一步解析 "longitude": "", "value": "1742.00", # Rmax "unit": "PFlop/s", "metadata": { "rank": 1, "cores": "11340000" }, "reference_date": "2025-11-01" } data.append(entry) return data # 3. run() 自动调用 _save_data() 保存到数据库 ``` **核心文件**: `backend/app/services/collectors/top500.py` ## 七、调度机制 ```python # 启动时注册所有采集器到定时任务 def start_scheduler(): for name, collector in collectors.items(): if collector_registry.is_active(name): scheduler.add_job( run_collector_task, trigger=IntervalTrigger(hours=collector.frequency_hours), id=name, name=name ) ``` | 采集器 | 采集频率 | |--------|----------| | TOP500 | 每4小时 | | Epoch AI | 每6小时 | | HuggingFace | 每12小时 | | PeeringDB | 每1-2天 | | TeleGeography | 每7天 | **核心文件**: `backend/app/services/scheduler.py` ## 八、相关代码文件 ``` backend/app/services/collectors/ ├── base.py # 基类: run() 流水线, _save_data() 保存 ├── registry.py # 采集器注册表 ├── scheduler.py # 定时任务调度 (APScheduler) ├── top500.py # TOP500采集器 ├── epoch_ai.py # Epoch AI采集器 ├── huggingface.py # HuggingFace采集器 ├── peeringdb.py # PeeringDB采集器 └── telegeraphy.py # TeleGeography海底光缆采集器 backend/app/models/ └── collected_data.py # 统一数据模型 ``` ## 九、数据使用场景 采集的数据最终会: 1. **可视化展示** - 在UE5大屏上显示超级计算机、GPU集群、海底光缆的地理位置 2. **态势分析** - 统计全球算力分布、增长趋势 3. **告警系统** - 检测重要节点变化 ## 十、采集器注册机制 采集器在应用启动时自动注册: ```python # backend/app/services/collectors/__init__.py collector_registry.register(TOP500Collector()) collector_registry.register(EpochAIGPUCollector()) collector_registry.register(HuggingFaceModelCollector()) collector_registry.register(HuggingFaceDatasetCollector()) collector_registry.register(HuggingFaceSpacesCollector()) collector_registry.register(PeeringDBIXPCollector()) collector_registry.register(PeeringDBNetworkCollector()) collector_registry.register(PeeringDBFacilityCollector()) collector_registry.register(TeleGeographyCableCollector()) collector_registry.register(TeleGeographyLandingPointCollector()) collector_registry.register(TeleGeographyCableSystemCollector()) ``` **核心文件**: `backend/app/services/collectors/registry.py` ## 十一、触发采集 ### 方式一:定时触发 系统启动时,APScheduler会自动根据各采集器的`frequency_hours`设置定时任务。 ### 方式二:手动触发 API ```bash # 触发TOP500采集 curl -X POST http://localhost:8000/api/v1/datasources/1/trigger \ -H "Authorization: Bearer " ``` **核心文件**: `backend/app/api/v1/datasources.py`