Files
planet/docs/collectors.md
2026-03-05 11:46:58 +08:00

264 lines
11 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 数据采集系统 (Collectors)
## 一、系统架构
```
┌─────────────────────────────────────────────────────────────────┐
│ 数据采集系统架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TOP500 │ │ Epoch AI │ │ HuggingFace │ │
│ │ 采集器 │ │ 采集器 │ │ 采集器 │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ BaseCollector │◄── 基类 (统一处理) │
│ │ run() 方法 │ │
│ └─────────┬───────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ fetch() │ │transform()│ │ _save_data│ │
│ │ 获取原始数据 │ │ 数据转换 │ │ 保存到DB │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ CollectedData 表 │◄── 统一存储 │
│ └─────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Scheduler (APScheduler) │ │
│ │ 定时任务调度: 每4小时/6小时/12小时/1天 自动执行 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
```
## 二、工作流程 (Pipeline)
```python
# 1. Scheduler 触发 (定时 或 手动触发)
# ↓
# 2. run() 方法执行完整流水线
async def run(self, db):
# 2.1 检查采集器是否启用
if not collector_registry.is_active(self.name):
return {"status": "skipped"}
# 2.2 记录任务开始
task = CollectionTask(status="running")
db.add(task)
await db.commit()
# 2.3 FETCH - 获取原始数据 (由子类实现)
raw_data = await self.fetch()
# 2.4 TRANSFORM - 转换为统一格式
data = self.transform(raw_data)
# 2.5 SAVE - 保存到数据库
records_count = await self._save_data(db, data)
# 2.6 记录任务完成
task.status = "success"
task.records_processed = records_count
await db.commit()
```
**核心文件**: `backend/app/services/collectors/base.py`
## 三、采集器列表
| 采集器 | 数据类型 | 数据内容 | 采集频率 |
|--------|----------|----------|----------|
| TOP500 | supercomputer | 全球超级计算机排名 (算力、性能) | 4小时 |
| Epoch AI | gpu_cluster | GPU算力集群信息 | 6小时 |
| HuggingFace Models | model | AI模型信息 | 12小时 |
| HuggingFace Datasets | dataset | 数据集信息 | 12小时 |
| HuggingFace Spaces | space | Demo应用 | 1天 |
| PeeringDB | ixp/network/facility | 互联网交换点/网络/机房 | 1-2天 |
| TeleGeography | submarine_cable | 海底光缆信息 | 7天 |
## 四、数据格式 (统一存储到 CollectedData 表)
```python
# 每个采集器 parse_response() 返回格式
{
"source_id": "top500_1", # 原始系统ID (必填)
"name": "El Capitan", # 名称 (必填)
"description": "系统描述...", # 描述
"country": "United States", # 国家
"city": "Livermore, CA", # 城市
"latitude": "37.6819", # 纬度 (字符串)
"longitude": "-121.7681", # 经度 (字符串)
"value": "1742.00", # 性能值 (如算力)
"unit": "PFlop/s", # 单位
"metadata": { # 额外数据 (JSON)
"rank": 1,
"r_peak": 2746.38,
"cores": 11039616
},
"reference_date": "2025-11-01" # 数据参考日期
}
```
## 五、数据库表结构
**CollectedData 表** (`collected_data`)
| 字段 | 类型 | 说明 |
|------|------|------|
| id | SERIAL | 主键 |
| source | VARCHAR(100) | 数据源名称 (top500, huggingface等) |
| source_id | VARCHAR(100) | 原始数据ID |
| data_type | VARCHAR(50) | 数据类型 (supercomputer, model等) |
| name | VARCHAR(500) | 名称 |
| title | VARCHAR(500) | 标题 |
| description | TEXT | 描述 |
| country | VARCHAR(100) | 国家 |
| city | VARCHAR(100) | 城市 |
| latitude | VARCHAR(50) | 纬度 |
| longitude | VARCHAR(50) | 经度 |
| value | VARCHAR(100) | 性能值 |
| unit | VARCHAR(20) | 单位 |
| metadata | JSONB | 额外元数据 |
| collected_at | TIMESTAMP | 采集时间 |
| reference_date | TIMESTAMP | 数据参考日期 |
| is_valid | INTEGER | 是否有效 |
**核心文件**: `backend/app/models/collected_data.py`
## 六、TOP500 采集器示例 (完整流程)
```python
# 1. fetch() - 从网页获取HTML
async def fetch(self):
url = "https://top500.org/lists/top500/list/2025/11/"
response = await client.get(url)
return response.text # 返回HTML
# 2. parse_response() - 解析HTML为统一格式
def parse_response(self, html):
soup = BeautifulSoup(html, "html.parser")
table = soup.find("table")
for row in table.find_all("tr")[1:]: # 跳过表头
cells = row.find_all("td")
entry = {
"source_id": f"top500_{cells[0].text}", # "top500_1"
"name": cells[1].text.strip(), # "El Capitan"
"country": cells[2].text.strip(), # "United States"
"city": "", # 城市
"latitude": "", # 需进一步解析
"longitude": "",
"value": "1742.00", # Rmax
"unit": "PFlop/s",
"metadata": {
"rank": 1,
"cores": "11340000"
},
"reference_date": "2025-11-01"
}
data.append(entry)
return data
# 3. run() 自动调用 _save_data() 保存到数据库
```
**核心文件**: `backend/app/services/collectors/top500.py`
## 七、调度机制
```python
# 启动时注册所有采集器到定时任务
def start_scheduler():
for name, collector in collectors.items():
if collector_registry.is_active(name):
scheduler.add_job(
run_collector_task,
trigger=IntervalTrigger(hours=collector.frequency_hours),
id=name,
name=name
)
```
| 采集器 | 采集频率 |
|--------|----------|
| TOP500 | 每4小时 |
| Epoch AI | 每6小时 |
| HuggingFace | 每12小时 |
| PeeringDB | 每1-2天 |
| TeleGeography | 每7天 |
**核心文件**: `backend/app/services/scheduler.py`
## 八、相关代码文件
```
backend/app/services/collectors/
├── base.py # 基类: run() 流水线, _save_data() 保存
├── registry.py # 采集器注册表
├── scheduler.py # 定时任务调度 (APScheduler)
├── top500.py # TOP500采集器
├── epoch_ai.py # Epoch AI采集器
├── huggingface.py # HuggingFace采集器
├── peeringdb.py # PeeringDB采集器
└── telegeraphy.py # TeleGeography海底光缆采集器
backend/app/models/
└── collected_data.py # 统一数据模型
```
## 九、数据使用场景
采集的数据最终会:
1. **可视化展示** - 在UE5大屏上显示超级计算机、GPU集群、海底光缆的地理位置
2. **态势分析** - 统计全球算力分布、增长趋势
3. **告警系统** - 检测重要节点变化
## 十、采集器注册机制
采集器在应用启动时自动注册:
```python
# backend/app/services/collectors/__init__.py
collector_registry.register(TOP500Collector())
collector_registry.register(EpochAIGPUCollector())
collector_registry.register(HuggingFaceModelCollector())
collector_registry.register(HuggingFaceDatasetCollector())
collector_registry.register(HuggingFaceSpacesCollector())
collector_registry.register(PeeringDBIXPCollector())
collector_registry.register(PeeringDBNetworkCollector())
collector_registry.register(PeeringDBFacilityCollector())
collector_registry.register(TeleGeographyCableCollector())
collector_registry.register(TeleGeographyLandingPointCollector())
collector_registry.register(TeleGeographyCableSystemCollector())
```
**核心文件**: `backend/app/services/collectors/registry.py`
## 十一、触发采集
### 方式一:定时触发
系统启动时APScheduler会自动根据各采集器的`frequency_hours`设置定时任务。
### 方式二:手动触发 API
```bash
# 触发TOP500采集
curl -X POST http://localhost:8000/api/v1/datasources/1/trigger \
-H "Authorization: Bearer <token>"
```
**核心文件**: `backend/app/api/v1/datasources.py`