Files
planet/docs/collectors.md
2026-03-05 11:46:58 +08:00

11 KiB
Raw Permalink Blame History

数据采集系统 (Collectors)

一、系统架构

┌─────────────────────────────────────────────────────────────────┐
│                        数据采集系统架构                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐       │
│   │  TOP500     │    │  Epoch AI   │    │ HuggingFace │       │
│   │  采集器      │    │  采集器      │    │  采集器      │       │
│   └──────┬──────┘    └──────┬──────┘    └──────┬──────┘       │
│          │                   │                   │              │
│          └───────────────────┼───────────────────┘              │
│                              ▼                                  │
│                  ┌─────────────────────┐                        │
│                  │   BaseCollector    │◄── 基类 (统一处理)      │
│                  │   run() 方法       │                        │
│                  └─────────┬───────────┘                        │
│                            │                                     │
│          ┌─────────────────┼─────────────────┐                   │
│          ▼                 ▼                 ▼                   │
│   ┌───────────┐    ┌───────────┐    ┌───────────┐              │
│   │ fetch()   │    │transform()│    │ _save_data│              │
│   │ 获取原始数据 │    │ 数据转换   │    │ 保存到DB  │              │
│   └───────────┘    └───────────┘    └───────────┘              │
│                              │                                  │
│                              ▼                                  │
│                  ┌─────────────────────┐                        │
│                  │  CollectedData 表   │◄── 统一存储            │
│                  └─────────────────────┘                        │
│                                                                 │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │                    Scheduler (APScheduler)               │  │
│   │   定时任务调度: 每4小时/6小时/12小时/1天 自动执行        │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

二、工作流程 (Pipeline)

# 1. Scheduler 触发 (定时 或 手动触发)
#    ↓

# 2. run() 方法执行完整流水线
async def run(self, db):
    # 2.1 检查采集器是否启用
    if not collector_registry.is_active(self.name):
        return {"status": "skipped"}
    
    # 2.2 记录任务开始
    task = CollectionTask(status="running")
    db.add(task)
    await db.commit()
    
    # 2.3 FETCH - 获取原始数据 (由子类实现)
    raw_data = await self.fetch()
    
    # 2.4 TRANSFORM - 转换为统一格式
    data = self.transform(raw_data)
    
    # 2.5 SAVE - 保存到数据库
    records_count = await self._save_data(db, data)
    
    # 2.6 记录任务完成
    task.status = "success"
    task.records_processed = records_count
    await db.commit()

核心文件: backend/app/services/collectors/base.py

三、采集器列表

采集器 数据类型 数据内容 采集频率
TOP500 supercomputer 全球超级计算机排名 (算力、性能) 4小时
Epoch AI gpu_cluster GPU算力集群信息 6小时
HuggingFace Models model AI模型信息 12小时
HuggingFace Datasets dataset 数据集信息 12小时
HuggingFace Spaces space Demo应用 1天
PeeringDB ixp/network/facility 互联网交换点/网络/机房 1-2天
TeleGeography submarine_cable 海底光缆信息 7天

四、数据格式 (统一存储到 CollectedData 表)

# 每个采集器 parse_response() 返回格式
{
    "source_id": "top500_1",          # 原始系统ID (必填)
    "name": "El Capitan",             # 名称 (必填)
    "description": "系统描述...",       # 描述
    "country": "United States",        # 国家
    "city": "Livermore, CA",           # 城市
    "latitude": "37.6819",            # 纬度 (字符串)
    "longitude": "-121.7681",         # 经度 (字符串)
    "value": "1742.00",               # 性能值 (如算力)
    "unit": "PFlop/s",                # 单位
    "metadata": {                      # 额外数据 (JSON)
        "rank": 1,
        "r_peak": 2746.38,
        "cores": 11039616
    },
    "reference_date": "2025-11-01"    # 数据参考日期
}

五、数据库表结构

CollectedData 表 (collected_data)

字段 类型 说明
id SERIAL 主键
source VARCHAR(100) 数据源名称 (top500, huggingface等)
source_id VARCHAR(100) 原始数据ID
data_type VARCHAR(50) 数据类型 (supercomputer, model等)
name VARCHAR(500) 名称
title VARCHAR(500) 标题
description TEXT 描述
country VARCHAR(100) 国家
city VARCHAR(100) 城市
latitude VARCHAR(50) 纬度
longitude VARCHAR(50) 经度
value VARCHAR(100) 性能值
unit VARCHAR(20) 单位
metadata JSONB 额外元数据
collected_at TIMESTAMP 采集时间
reference_date TIMESTAMP 数据参考日期
is_valid INTEGER 是否有效

核心文件: backend/app/models/collected_data.py

六、TOP500 采集器示例 (完整流程)

# 1. fetch() - 从网页获取HTML
async def fetch(self):
    url = "https://top500.org/lists/top500/list/2025/11/"
    response = await client.get(url)
    return response.text  # 返回HTML

# 2. parse_response() - 解析HTML为统一格式
def parse_response(self, html):
    soup = BeautifulSoup(html, "html.parser")
    table = soup.find("table")
    
    for row in table.find_all("tr")[1:]:  # 跳过表头
        cells = row.find_all("td")
        
        entry = {
            "source_id": f"top500_{cells[0].text}",      # "top500_1"
            "name": cells[1].text.strip(),               # "El Capitan"
            "country": cells[2].text.strip(),            # "United States"
            "city": "",                                   # 城市
            "latitude": "",                               # 需进一步解析
            "longitude": "",
            "value": "1742.00",                          # Rmax
            "unit": "PFlop/s",
            "metadata": {
                "rank": 1,
                "cores": "11340000"
            },
            "reference_date": "2025-11-01"
        }
        data.append(entry)
    
    return data

# 3. run() 自动调用 _save_data() 保存到数据库

核心文件: backend/app/services/collectors/top500.py

七、调度机制

# 启动时注册所有采集器到定时任务
def start_scheduler():
    for name, collector in collectors.items():
        if collector_registry.is_active(name):
            scheduler.add_job(
                run_collector_task,
                trigger=IntervalTrigger(hours=collector.frequency_hours),
                id=name,
                name=name
            )
采集器 采集频率
TOP500 每4小时
Epoch AI 每6小时
HuggingFace 每12小时
PeeringDB 每1-2天
TeleGeography 每7天

核心文件: backend/app/services/scheduler.py

八、相关代码文件

backend/app/services/collectors/
├── base.py              # 基类: run() 流水线, _save_data() 保存
├── registry.py          # 采集器注册表
├── scheduler.py         # 定时任务调度 (APScheduler)
├── top500.py            # TOP500采集器
├── epoch_ai.py          # Epoch AI采集器
├── huggingface.py       # HuggingFace采集器
├── peeringdb.py         # PeeringDB采集器
└── telegeraphy.py      # TeleGeography海底光缆采集器

backend/app/models/
└── collected_data.py    # 统一数据模型

九、数据使用场景

采集的数据最终会:

  1. 可视化展示 - 在UE5大屏上显示超级计算机、GPU集群、海底光缆的地理位置
  2. 态势分析 - 统计全球算力分布、增长趋势
  3. 告警系统 - 检测重要节点变化

十、采集器注册机制

采集器在应用启动时自动注册:

# backend/app/services/collectors/__init__.py

collector_registry.register(TOP500Collector())
collector_registry.register(EpochAIGPUCollector())
collector_registry.register(HuggingFaceModelCollector())
collector_registry.register(HuggingFaceDatasetCollector())
collector_registry.register(HuggingFaceSpacesCollector())
collector_registry.register(PeeringDBIXPCollector())
collector_registry.register(PeeringDBNetworkCollector())
collector_registry.register(PeeringDBFacilityCollector())
collector_registry.register(TeleGeographyCableCollector())
collector_registry.register(TeleGeographyLandingPointCollector())
collector_registry.register(TeleGeographyCableSystemCollector())

核心文件: backend/app/services/collectors/registry.py

十一、触发采集

方式一:定时触发

系统启动时APScheduler会自动根据各采集器的frequency_hours设置定时任务。

方式二:手动触发 API

# 触发TOP500采集
curl -X POST http://localhost:8000/api/v1/datasources/1/trigger \
  -H "Authorization: Bearer <token>"

核心文件: backend/app/api/v1/datasources.py