264 lines
11 KiB
Markdown
264 lines
11 KiB
Markdown
# 数据采集系统 (Collectors)
|
||
|
||
## 一、系统架构
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 数据采集系统架构 │
|
||
├─────────────────────────────────────────────────────────────────┤
|
||
│ │
|
||
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
|
||
│ │ TOP500 │ │ Epoch AI │ │ HuggingFace │ │
|
||
│ │ 采集器 │ │ 采集器 │ │ 采集器 │ │
|
||
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
|
||
│ │ │ │ │
|
||
│ └───────────────────┼───────────────────┘ │
|
||
│ ▼ │
|
||
│ ┌─────────────────────┐ │
|
||
│ │ BaseCollector │◄── 基类 (统一处理) │
|
||
│ │ run() 方法 │ │
|
||
│ └─────────┬───────────┘ │
|
||
│ │ │
|
||
│ ┌─────────────────┼─────────────────┐ │
|
||
│ ▼ ▼ ▼ │
|
||
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
|
||
│ │ fetch() │ │transform()│ │ _save_data│ │
|
||
│ │ 获取原始数据 │ │ 数据转换 │ │ 保存到DB │ │
|
||
│ └───────────┘ └───────────┘ └───────────┘ │
|
||
│ │ │
|
||
│ ▼ │
|
||
│ ┌─────────────────────┐ │
|
||
│ │ CollectedData 表 │◄── 统一存储 │
|
||
│ └─────────────────────┘ │
|
||
│ │
|
||
│ ┌─────────────────────────────────────────────────────────┐ │
|
||
│ │ Scheduler (APScheduler) │ │
|
||
│ │ 定时任务调度: 每4小时/6小时/12小时/1天 自动执行 │ │
|
||
│ └─────────────────────────────────────────────────────────┘ │
|
||
│ │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
## 二、工作流程 (Pipeline)
|
||
|
||
```python
|
||
# 1. Scheduler 触发 (定时 或 手动触发)
|
||
# ↓
|
||
|
||
# 2. run() 方法执行完整流水线
|
||
async def run(self, db):
|
||
# 2.1 检查采集器是否启用
|
||
if not collector_registry.is_active(self.name):
|
||
return {"status": "skipped"}
|
||
|
||
# 2.2 记录任务开始
|
||
task = CollectionTask(status="running")
|
||
db.add(task)
|
||
await db.commit()
|
||
|
||
# 2.3 FETCH - 获取原始数据 (由子类实现)
|
||
raw_data = await self.fetch()
|
||
|
||
# 2.4 TRANSFORM - 转换为统一格式
|
||
data = self.transform(raw_data)
|
||
|
||
# 2.5 SAVE - 保存到数据库
|
||
records_count = await self._save_data(db, data)
|
||
|
||
# 2.6 记录任务完成
|
||
task.status = "success"
|
||
task.records_processed = records_count
|
||
await db.commit()
|
||
```
|
||
|
||
**核心文件**: `backend/app/services/collectors/base.py`
|
||
|
||
## 三、采集器列表
|
||
|
||
| 采集器 | 数据类型 | 数据内容 | 采集频率 |
|
||
|--------|----------|----------|----------|
|
||
| TOP500 | supercomputer | 全球超级计算机排名 (算力、性能) | 4小时 |
|
||
| Epoch AI | gpu_cluster | GPU算力集群信息 | 6小时 |
|
||
| HuggingFace Models | model | AI模型信息 | 12小时 |
|
||
| HuggingFace Datasets | dataset | 数据集信息 | 12小时 |
|
||
| HuggingFace Spaces | space | Demo应用 | 1天 |
|
||
| PeeringDB | ixp/network/facility | 互联网交换点/网络/机房 | 1-2天 |
|
||
| TeleGeography | submarine_cable | 海底光缆信息 | 7天 |
|
||
|
||
## 四、数据格式 (统一存储到 CollectedData 表)
|
||
|
||
```python
|
||
# 每个采集器 parse_response() 返回格式
|
||
{
|
||
"source_id": "top500_1", # 原始系统ID (必填)
|
||
"name": "El Capitan", # 名称 (必填)
|
||
"description": "系统描述...", # 描述
|
||
"country": "United States", # 国家
|
||
"city": "Livermore, CA", # 城市
|
||
"latitude": "37.6819", # 纬度 (字符串)
|
||
"longitude": "-121.7681", # 经度 (字符串)
|
||
"value": "1742.00", # 性能值 (如算力)
|
||
"unit": "PFlop/s", # 单位
|
||
"metadata": { # 额外数据 (JSON)
|
||
"rank": 1,
|
||
"r_peak": 2746.38,
|
||
"cores": 11039616
|
||
},
|
||
"reference_date": "2025-11-01" # 数据参考日期
|
||
}
|
||
```
|
||
|
||
## 五、数据库表结构
|
||
|
||
**CollectedData 表** (`collected_data`)
|
||
|
||
| 字段 | 类型 | 说明 |
|
||
|------|------|------|
|
||
| id | SERIAL | 主键 |
|
||
| source | VARCHAR(100) | 数据源名称 (top500, huggingface等) |
|
||
| source_id | VARCHAR(100) | 原始数据ID |
|
||
| data_type | VARCHAR(50) | 数据类型 (supercomputer, model等) |
|
||
| name | VARCHAR(500) | 名称 |
|
||
| title | VARCHAR(500) | 标题 |
|
||
| description | TEXT | 描述 |
|
||
| country | VARCHAR(100) | 国家 |
|
||
| city | VARCHAR(100) | 城市 |
|
||
| latitude | VARCHAR(50) | 纬度 |
|
||
| longitude | VARCHAR(50) | 经度 |
|
||
| value | VARCHAR(100) | 性能值 |
|
||
| unit | VARCHAR(20) | 单位 |
|
||
| metadata | JSONB | 额外元数据 |
|
||
| collected_at | TIMESTAMP | 采集时间 |
|
||
| reference_date | TIMESTAMP | 数据参考日期 |
|
||
| is_valid | INTEGER | 是否有效 |
|
||
|
||
**核心文件**: `backend/app/models/collected_data.py`
|
||
|
||
## 六、TOP500 采集器示例 (完整流程)
|
||
|
||
```python
|
||
# 1. fetch() - 从网页获取HTML
|
||
async def fetch(self):
|
||
url = "https://top500.org/lists/top500/list/2025/11/"
|
||
response = await client.get(url)
|
||
return response.text # 返回HTML
|
||
|
||
# 2. parse_response() - 解析HTML为统一格式
|
||
def parse_response(self, html):
|
||
soup = BeautifulSoup(html, "html.parser")
|
||
table = soup.find("table")
|
||
|
||
for row in table.find_all("tr")[1:]: # 跳过表头
|
||
cells = row.find_all("td")
|
||
|
||
entry = {
|
||
"source_id": f"top500_{cells[0].text}", # "top500_1"
|
||
"name": cells[1].text.strip(), # "El Capitan"
|
||
"country": cells[2].text.strip(), # "United States"
|
||
"city": "", # 城市
|
||
"latitude": "", # 需进一步解析
|
||
"longitude": "",
|
||
"value": "1742.00", # Rmax
|
||
"unit": "PFlop/s",
|
||
"metadata": {
|
||
"rank": 1,
|
||
"cores": "11340000"
|
||
},
|
||
"reference_date": "2025-11-01"
|
||
}
|
||
data.append(entry)
|
||
|
||
return data
|
||
|
||
# 3. run() 自动调用 _save_data() 保存到数据库
|
||
```
|
||
|
||
**核心文件**: `backend/app/services/collectors/top500.py`
|
||
|
||
## 七、调度机制
|
||
|
||
```python
|
||
# 启动时注册所有采集器到定时任务
|
||
def start_scheduler():
|
||
for name, collector in collectors.items():
|
||
if collector_registry.is_active(name):
|
||
scheduler.add_job(
|
||
run_collector_task,
|
||
trigger=IntervalTrigger(hours=collector.frequency_hours),
|
||
id=name,
|
||
name=name
|
||
)
|
||
```
|
||
|
||
| 采集器 | 采集频率 |
|
||
|--------|----------|
|
||
| TOP500 | 每4小时 |
|
||
| Epoch AI | 每6小时 |
|
||
| HuggingFace | 每12小时 |
|
||
| PeeringDB | 每1-2天 |
|
||
| TeleGeography | 每7天 |
|
||
|
||
**核心文件**: `backend/app/services/scheduler.py`
|
||
|
||
## 八、相关代码文件
|
||
|
||
```
|
||
backend/app/services/collectors/
|
||
├── base.py # 基类: run() 流水线, _save_data() 保存
|
||
├── registry.py # 采集器注册表
|
||
├── scheduler.py # 定时任务调度 (APScheduler)
|
||
├── top500.py # TOP500采集器
|
||
├── epoch_ai.py # Epoch AI采集器
|
||
├── huggingface.py # HuggingFace采集器
|
||
├── peeringdb.py # PeeringDB采集器
|
||
└── telegeraphy.py # TeleGeography海底光缆采集器
|
||
|
||
backend/app/models/
|
||
└── collected_data.py # 统一数据模型
|
||
```
|
||
|
||
## 九、数据使用场景
|
||
|
||
采集的数据最终会:
|
||
|
||
1. **可视化展示** - 在UE5大屏上显示超级计算机、GPU集群、海底光缆的地理位置
|
||
2. **态势分析** - 统计全球算力分布、增长趋势
|
||
3. **告警系统** - 检测重要节点变化
|
||
|
||
## 十、采集器注册机制
|
||
|
||
采集器在应用启动时自动注册:
|
||
|
||
```python
|
||
# backend/app/services/collectors/__init__.py
|
||
|
||
collector_registry.register(TOP500Collector())
|
||
collector_registry.register(EpochAIGPUCollector())
|
||
collector_registry.register(HuggingFaceModelCollector())
|
||
collector_registry.register(HuggingFaceDatasetCollector())
|
||
collector_registry.register(HuggingFaceSpacesCollector())
|
||
collector_registry.register(PeeringDBIXPCollector())
|
||
collector_registry.register(PeeringDBNetworkCollector())
|
||
collector_registry.register(PeeringDBFacilityCollector())
|
||
collector_registry.register(TeleGeographyCableCollector())
|
||
collector_registry.register(TeleGeographyLandingPointCollector())
|
||
collector_registry.register(TeleGeographyCableSystemCollector())
|
||
```
|
||
|
||
**核心文件**: `backend/app/services/collectors/registry.py`
|
||
|
||
## 十一、触发采集
|
||
|
||
### 方式一:定时触发
|
||
系统启动时,APScheduler会自动根据各采集器的`frequency_hours`设置定时任务。
|
||
|
||
### 方式二:手动触发 API
|
||
|
||
```bash
|
||
# 触发TOP500采集
|
||
curl -X POST http://localhost:8000/api/v1/datasources/1/trigger \
|
||
-H "Authorization: Bearer <token>"
|
||
```
|
||
|
||
**核心文件**: `backend/app/api/v1/datasources.py`
|