From 020c1d5051e86e6ce7dd820e493639f140c9df5a Mon Sep 17 00:00:00 2001 From: linkong Date: Wed, 25 Mar 2026 17:19:10 +0800 Subject: [PATCH] Refine data management and collection workflows --- .gitignore | 2 + .python-version | 1 + backend/app/api/v1/collected_data.py | 290 +++++---- backend/app/api/v1/datasources.py | 46 +- backend/app/api/v1/visualization.py | 45 +- backend/app/core/collected_data_fields.py | 62 ++ backend/app/core/countries.py | 280 ++++++++ backend/app/db/session.py | 53 +- backend/app/main.py | 8 +- backend/app/models/__init__.py | 2 + backend/app/models/collected_data.py | 42 +- backend/app/models/data_snapshot.py | 26 + backend/app/models/task.py | 1 + .../services/collectors/arcgis_relation.py | 133 +++- backend/app/services/collectors/base.py | 245 ++++++- backend/app/services/collectors/peeringdb.py | 6 +- backend/app/services/collectors/top500.py | 210 ++++-- backend/app/services/scheduler.py | 47 +- docs/collected-data-column-removal-plan.md | 207 ++++++ docs/collected-data-history-plan.md | 402 ++++++++++++ docs/system-settings-plan.md | 3 +- frontend/src/index.css | 306 ++++++++- frontend/src/main.tsx | 2 +- frontend/src/pages/Dashboard/Dashboard.tsx | 14 +- frontend/src/pages/DataList/DataList.tsx | 479 +++++++++++--- .../src/pages/DataSources/DataSources.tsx | 207 ++++-- frontend/src/pages/Settings/Settings.tsx | 250 +++---- frontend/src/pages/Users/Users.tsx | 40 +- planet.sh | 24 +- pyproject.toml | 24 +- scripts/backfill_collected_data_metadata.py | 57 ++ ...eck_collected_data_column_removal_ready.py | 119 ++++ scripts/drop_collected_data_legacy_columns.py | 41 ++ uv.lock | 614 ++++++++---------- 34 files changed, 3341 insertions(+), 947 deletions(-) create mode 100644 .python-version create mode 100644 backend/app/core/collected_data_fields.py create mode 100644 backend/app/core/countries.py create mode 100644 backend/app/models/data_snapshot.py create mode 100644 docs/collected-data-column-removal-plan.md create mode 100644 
docs/collected-data-history-plan.md create mode 100644 scripts/backfill_collected_data_metadata.py create mode 100644 scripts/check_collected_data_column_removal_ready.py create mode 100644 scripts/drop_collected_data_legacy_columns.py diff --git a/.gitignore b/.gitignore index a621032e..bb7024c5 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,8 @@ MANIFEST venv/ ENV/ env/ +.uv/ +.uv-cache/ .ruff_cache/ *.db *.sqlite diff --git a/.python-version b/.python-version new file mode 100644 index 00000000..6324d401 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.14 diff --git a/backend/app/api/v1/collected_data.py b/backend/app/api/v1/collected_data.py index bf0b9e17..0b77edfa 100644 --- a/backend/app/api/v1/collected_data.py +++ b/backend/app/api/v1/collected_data.py @@ -7,6 +7,8 @@ import json import csv import io +from app.core.collected_data_fields import get_metadata_field +from app.core.countries import COUNTRY_OPTIONS, get_country_search_variants, normalize_country from app.db.session import get_db from app.models.user import User from app.core.security import get_current_user @@ -15,8 +17,119 @@ from app.models.collected_data import CollectedData router = APIRouter() +COUNTRY_SQL = "metadata->>'country'" +SEARCHABLE_SQL = [ + "name", + "title", + "description", + "source", + "data_type", + "source_id", + "metadata::text", +] + + +def parse_multi_values(value: Optional[str]) -> list[str]: + if not value: + return [] + return [item.strip() for item in value.split(",") if item.strip()] + + +def build_in_condition(field_sql: str, values: list[str], param_prefix: str, params: dict) -> str: + placeholders = [] + for index, value in enumerate(values): + key = f"{param_prefix}_{index}" + params[key] = value + placeholders.append(f":{key}") + return f"{field_sql} IN ({', '.join(placeholders)})" + + +def build_search_condition(search: Optional[str], params: dict) -> Optional[str]: + if not search: + return None + + normalized = search.strip() + if not normalized: 
+ return None + + search_terms = [normalized] + for variant in get_country_search_variants(normalized): + if variant.casefold() not in {term.casefold() for term in search_terms}: + search_terms.append(variant) + + conditions = [] + for index, term in enumerate(search_terms): + params[f"search_{index}"] = f"%{term}%" + conditions.extend(f"{field} ILIKE :search_{index}" for field in SEARCHABLE_SQL) + + params["search_exact"] = normalized + params["search_prefix"] = f"{normalized}%" + + canonical_variants = get_country_search_variants(normalized) + canonical = canonical_variants[0] if canonical_variants else None + params["country_search_exact"] = canonical or normalized + params["country_search_prefix"] = f"{(canonical or normalized)}%" + + return "(" + " OR ".join(conditions) + ")" + + +def build_search_rank_sql(search: Optional[str]) -> str: + if not search or not search.strip(): + return "0" + + return """ + CASE + WHEN name ILIKE :search_exact THEN 700 + WHEN name ILIKE :search_prefix THEN 600 + WHEN title ILIKE :search_exact THEN 500 + WHEN title ILIKE :search_prefix THEN 400 + WHEN metadata->>'country' ILIKE :country_search_exact THEN 380 + WHEN metadata->>'country' ILIKE :country_search_prefix THEN 340 + WHEN source_id ILIKE :search_exact THEN 350 + WHEN source ILIKE :search_exact THEN 300 + WHEN data_type ILIKE :search_exact THEN 250 + WHEN description ILIKE :search_0 THEN 150 + WHEN metadata::text ILIKE :search_0 THEN 100 + WHEN title ILIKE :search_0 THEN 80 + WHEN name ILIKE :search_0 THEN 60 + WHEN source ILIKE :search_0 THEN 40 + WHEN data_type ILIKE :search_0 THEN 30 + WHEN source_id ILIKE :search_0 THEN 20 + ELSE 0 + END + """ + + +def serialize_collected_row(row) -> dict: + metadata = row[7] + return { + "id": row[0], + "source": row[1], + "source_id": row[2], + "data_type": row[3], + "name": row[4], + "title": row[5], + "description": row[6], + "country": get_metadata_field(metadata, "country"), + "city": get_metadata_field(metadata, "city"), + 
"latitude": get_metadata_field(metadata, "latitude"), + "longitude": get_metadata_field(metadata, "longitude"), + "value": get_metadata_field(metadata, "value"), + "unit": get_metadata_field(metadata, "unit"), + "metadata": metadata, + "cores": get_metadata_field(metadata, "cores"), + "rmax": get_metadata_field(metadata, "rmax"), + "rpeak": get_metadata_field(metadata, "rpeak"), + "power": get_metadata_field(metadata, "power"), + "collected_at": row[8].isoformat() if row[8] else None, + "reference_date": row[9].isoformat() if row[9] else None, + "is_valid": row[10], + } + + @router.get("") async def list_collected_data( + mode: str = Query("current", description="查询模式: current/history"), source: Optional[str] = Query(None, description="数据源过滤"), data_type: Optional[str] = Query(None, description="数据类型过滤"), country: Optional[str] = Query(None, description="国家过滤"), @@ -27,25 +140,30 @@ async def list_collected_data( db: AsyncSession = Depends(get_db), ): """查询采集的数据列表""" + normalized_country = normalize_country(country) if country else None + source_values = parse_multi_values(source) + data_type_values = parse_multi_values(data_type) # Build WHERE clause conditions = [] params = {} - if source: - conditions.append("source = :source") - params["source"] = source - if data_type: - conditions.append("data_type = :data_type") - params["data_type"] = data_type - if country: - conditions.append("country = :country") - params["country"] = country - if search: - conditions.append("(name ILIKE :search OR title ILIKE :search)") - params["search"] = f"%{search}%" + if mode != "history": + conditions.append("COALESCE(is_current, TRUE) = TRUE") + + if source_values: + conditions.append(build_in_condition("source", source_values, "source", params)) + if data_type_values: + conditions.append(build_in_condition("data_type", data_type_values, "data_type", params)) + if normalized_country: + conditions.append(f"{COUNTRY_SQL} = :country") + params["country"] = normalized_country + 
search_condition = build_search_condition(search, params) + if search_condition: + conditions.append(search_condition) where_sql = " AND ".join(conditions) if conditions else "1=1" + search_rank_sql = build_search_rank_sql(search) # Calculate offset offset = (page - 1) * page_size @@ -58,11 +176,11 @@ async def list_collected_data( # Query data query = text(f""" SELECT id, source, source_id, data_type, name, title, description, - country, city, latitude, longitude, value, unit, - metadata, collected_at, reference_date, is_valid + metadata, collected_at, reference_date, is_valid, + {search_rank_sql} AS search_rank FROM collected_data WHERE {where_sql} - ORDER BY collected_at DESC + ORDER BY search_rank DESC, collected_at DESC LIMIT :limit OFFSET :offset """) params["limit"] = page_size @@ -73,27 +191,7 @@ async def list_collected_data( data = [] for row in rows: - data.append( - { - "id": row[0], - "source": row[1], - "source_id": row[2], - "data_type": row[3], - "name": row[4], - "title": row[5], - "description": row[6], - "country": row[7], - "city": row[8], - "latitude": row[9], - "longitude": row[10], - "value": row[11], - "unit": row[12], - "metadata": row[13], - "collected_at": row[14].isoformat() if row[14] else None, - "reference_date": row[15].isoformat() if row[15] else None, - "is_valid": row[16], - } - ) + data.append(serialize_collected_row(row[:11])) return { "total": total, @@ -105,16 +203,19 @@ async def list_collected_data( @router.get("/summary") async def get_data_summary( + mode: str = Query("current", description="查询模式: current/history"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """获取数据汇总统计""" + where_sql = "WHERE COALESCE(is_current, TRUE) = TRUE" if mode != "history" else "" # By source and data_type result = await db.execute( text(""" SELECT source, data_type, COUNT(*) as count FROM collected_data + """ + where_sql + """ GROUP BY source, data_type ORDER BY source, data_type """) @@ -138,6 +239,7 
@@ async def get_data_summary( text(""" SELECT source, COUNT(*) as count FROM collected_data + """ + where_sql + """ GROUP BY source ORDER BY count DESC """) @@ -153,6 +255,7 @@ async def get_data_summary( @router.get("/sources") async def get_data_sources( + mode: str = Query("current", description="查询模式: current/history"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): @@ -160,7 +263,9 @@ async def get_data_sources( result = await db.execute( text(""" - SELECT DISTINCT source FROM collected_data ORDER BY source + SELECT DISTINCT source FROM collected_data + """ + ("WHERE COALESCE(is_current, TRUE) = TRUE " if mode != "history" else "") + """ + ORDER BY source """) ) rows = result.fetchall() @@ -172,6 +277,7 @@ async def get_data_sources( @router.get("/types") async def get_data_types( + mode: str = Query("current", description="查询模式: current/history"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): @@ -179,7 +285,9 @@ async def get_data_types( result = await db.execute( text(""" - SELECT DISTINCT data_type FROM collected_data ORDER BY data_type + SELECT DISTINCT data_type FROM collected_data + """ + ("WHERE COALESCE(is_current, TRUE) = TRUE " if mode != "history" else "") + """ + ORDER BY data_type """) ) rows = result.fetchall() @@ -196,17 +304,8 @@ async def get_countries( ): """获取所有国家列表""" - result = await db.execute( - text(""" - SELECT DISTINCT country FROM collected_data - WHERE country IS NOT NULL AND country != '' - ORDER BY country - """) - ) - rows = result.fetchall() - return { - "countries": [row[0] for row in rows], + "countries": COUNTRY_OPTIONS, } @@ -221,7 +320,6 @@ async def get_collected_data( result = await db.execute( text(""" SELECT id, source, source_id, data_type, name, title, description, - country, city, latitude, longitude, value, unit, metadata, collected_at, reference_date, is_valid FROM collected_data WHERE id = :id @@ -236,25 +334,7 @@ async def 
get_collected_data( detail="数据不存在", ) - return { - "id": row[0], - "source": row[1], - "source_id": row[2], - "data_type": row[3], - "name": row[4], - "title": row[5], - "description": row[6], - "country": row[7], - "city": row[8], - "latitude": row[9], - "longitude": row[10], - "value": row[11], - "unit": row[12], - "metadata": row[13], - "collected_at": row[14].isoformat() if row[14] else None, - "reference_date": row[15].isoformat() if row[15] else None, - "is_valid": row[16], - } + return serialize_collected_row(row) def build_where_clause( @@ -263,19 +343,21 @@ def build_where_clause( """Build WHERE clause and params for queries""" conditions = [] params = {} + source_values = parse_multi_values(source) + data_type_values = parse_multi_values(data_type) - if source: - conditions.append("source = :source") - params["source"] = source - if data_type: - conditions.append("data_type = :data_type") - params["data_type"] = data_type - if country: - conditions.append("country = :country") - params["country"] = country - if search: - conditions.append("(name ILIKE :search OR title ILIKE :search)") - params["search"] = f"%{search}%" + if source_values: + conditions.append(build_in_condition("source", source_values, "source", params)) + if data_type_values: + conditions.append(build_in_condition("data_type", data_type_values, "data_type", params)) + normalized_country = normalize_country(country) if country else None + + if normalized_country: + conditions.append(f"{COUNTRY_SQL} = :country") + params["country"] = normalized_country + search_condition = build_search_condition(search, params) + if search_condition: + conditions.append(search_condition) where_sql = " AND ".join(conditions) if conditions else "1=1" return where_sql, params @@ -283,6 +365,7 @@ def build_where_clause( @router.get("/export/json") async def export_json( + mode: str = Query("current", description="查询模式: current/history"), source: Optional[str] = Query(None, description="数据源过滤"), data_type: 
Optional[str] = Query(None, description="数据类型过滤"), country: Optional[str] = Query(None, description="国家过滤"), @@ -294,11 +377,12 @@ async def export_json( """导出数据为 JSON 格式""" where_sql, params = build_where_clause(source, data_type, country, search) + if mode != "history": + where_sql = f"({where_sql}) AND COALESCE(is_current, TRUE) = TRUE" params["limit"] = limit query = text(f""" SELECT id, source, source_id, data_type, name, title, description, - country, city, latitude, longitude, value, unit, metadata, collected_at, reference_date, is_valid FROM collected_data WHERE {where_sql} @@ -311,27 +395,7 @@ async def export_json( data = [] for row in rows: - data.append( - { - "id": row[0], - "source": row[1], - "source_id": row[2], - "data_type": row[3], - "name": row[4], - "title": row[5], - "description": row[6], - "country": row[7], - "city": row[8], - "latitude": row[9], - "longitude": row[10], - "value": row[11], - "unit": row[12], - "metadata": row[13], - "collected_at": row[14].isoformat() if row[14] else None, - "reference_date": row[15].isoformat() if row[15] else None, - "is_valid": row[16], - } - ) + data.append(serialize_collected_row(row)) json_str = json.dumps({"data": data, "total": len(data)}, ensure_ascii=False, indent=2) @@ -346,6 +410,7 @@ async def export_json( @router.get("/export/csv") async def export_csv( + mode: str = Query("current", description="查询模式: current/history"), source: Optional[str] = Query(None, description="数据源过滤"), data_type: Optional[str] = Query(None, description="数据类型过滤"), country: Optional[str] = Query(None, description="国家过滤"), @@ -357,11 +422,12 @@ async def export_csv( """导出数据为 CSV 格式""" where_sql, params = build_where_clause(source, data_type, country, search) + if mode != "history": + where_sql = f"({where_sql}) AND COALESCE(is_current, TRUE) = TRUE" params["limit"] = limit query = text(f""" SELECT id, source, source_id, data_type, name, title, description, - country, city, latitude, longitude, value, unit, metadata, 
collected_at, reference_date, is_valid FROM collected_data WHERE {where_sql} @@ -409,16 +475,16 @@ async def export_csv( row[4], row[5], row[6], - row[7], - row[8], - row[9], + get_metadata_field(row[7], "country"), + get_metadata_field(row[7], "city"), + get_metadata_field(row[7], "latitude"), + get_metadata_field(row[7], "longitude"), + get_metadata_field(row[7], "value"), + get_metadata_field(row[7], "unit"), + json.dumps(row[7]) if row[7] else "", + row[8].isoformat() if row[8] else "", + row[9].isoformat() if row[9] else "", row[10], - row[11], - row[12], - json.dumps(row[13]) if row[13] else "", - row[14].isoformat() if row[14] else "", - row[15].isoformat() if row[15] else "", - row[16], ] ) diff --git a/backend/app/api/v1/datasources.py b/backend/app/api/v1/datasources.py index d4b73889..be4e4543 100644 --- a/backend/app/api/v1/datasources.py +++ b/backend/app/api/v1/datasources.py @@ -5,12 +5,13 @@ from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.security import get_current_user +from app.core.data_sources import get_data_sources_config from app.db.session import get_db from app.models.collected_data import CollectedData from app.models.datasource import DataSource from app.models.task import CollectionTask from app.models.user import User -from app.services.scheduler import run_collector_now, sync_datasource_job +from app.services.scheduler import get_latest_task_id_for_datasource, run_collector_now, sync_datasource_job router = APIRouter() @@ -83,9 +84,11 @@ async def list_datasources( datasources = result.scalars().all() collector_list = [] + config = get_data_sources_config() for datasource in datasources: running_task = await get_running_task(db, datasource.id) last_task = await get_last_completed_task(db, datasource.id) + endpoint = await config.get_url(datasource.source, db) data_count_result = await db.execute( select(func.count(CollectedData.id)).where(CollectedData.source == datasource.source) ) @@ 
-105,10 +108,12 @@ async def list_datasources( "frequency_minutes": datasource.frequency_minutes, "is_active": datasource.is_active, "collector_class": datasource.collector_class, + "endpoint": endpoint, "last_run": last_run, "is_running": running_task is not None, "task_id": running_task.id if running_task else None, "progress": running_task.progress if running_task else None, + "phase": running_task.phase if running_task else None, "records_processed": running_task.records_processed if running_task else None, "total_records": running_task.total_records if running_task else None, } @@ -127,6 +132,9 @@ async def get_datasource( if not datasource: raise HTTPException(status_code=404, detail="Data source not found") + config = get_data_sources_config() + endpoint = await config.get_url(datasource.source, db) + return { "id": datasource.id, "name": datasource.name, @@ -136,6 +144,7 @@ async def get_datasource( "frequency_minutes": datasource.frequency_minutes, "collector_class": datasource.collector_class, "source": datasource.source, + "endpoint": endpoint, "is_active": datasource.is_active, } @@ -212,9 +221,16 @@ async def trigger_datasource( if not success: raise HTTPException(status_code=500, detail=f"Failed to trigger collector '{datasource.source}'") + task_id = None + for _ in range(10): + task_id = await get_latest_task_id_for_datasource(datasource.id) + if task_id is not None: + break + return { "status": "triggered", "source_id": datasource.id, + "task_id": task_id, "collector_name": datasource.source, "message": f"Collector '{datasource.source}' has been triggered", } @@ -252,21 +268,29 @@ async def clear_datasource_data( @router.get("/{source_id}/task-status") async def get_task_status( source_id: str, + task_id: Optional[int] = None, db: AsyncSession = Depends(get_db), ): datasource = await get_datasource_record(db, source_id) if not datasource: raise HTTPException(status_code=404, detail="Data source not found") - running_task = await 
get_running_task(db, datasource.id) - if not running_task: - return {"is_running": False, "task_id": None, "progress": None} + if task_id is not None: + task = await db.get(CollectionTask, task_id) + if not task or task.datasource_id != datasource.id: + raise HTTPException(status_code=404, detail="Task not found") + else: + task = await get_running_task(db, datasource.id) + + if not task: + return {"is_running": False, "task_id": None, "progress": None, "phase": None, "status": "idle"} return { - "is_running": True, - "task_id": running_task.id, - "progress": running_task.progress, - "records_processed": running_task.records_processed, - "total_records": running_task.total_records, - "status": running_task.status, - } + "is_running": task.status == "running", + "task_id": task.id, + "progress": task.progress, + "phase": task.phase, + "records_processed": task.records_processed, + "total_records": task.total_records, + "status": task.status, + } diff --git a/backend/app/api/v1/visualization.py b/backend/app/api/v1/visualization.py index 39da00c7..c3e295e6 100644 --- a/backend/app/api/v1/visualization.py +++ b/backend/app/api/v1/visualization.py @@ -10,6 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from typing import List, Dict, Any, Optional +from app.core.collected_data_fields import get_record_field from app.db.session import get_db from app.models.collected_data import CollectedData from app.services.cable_graph import build_graph_from_data, CableGraph @@ -83,9 +84,9 @@ def convert_cable_to_geojson(records: List[CollectedData]) -> Dict[str, Any]: "rfs": metadata.get("rfs"), "RFS": metadata.get("rfs"), "status": metadata.get("status", "active"), - "length": record.value, - "length_km": record.value, - "SHAPE__Length": record.value, + "length": get_record_field(record, "value"), + "length_km": get_record_field(record, "value"), + "SHAPE__Length": get_record_field(record, "value"), "url": metadata.get("url"), "color": 
metadata.get("color"), "year": metadata.get("year"), @@ -101,8 +102,10 @@ def convert_landing_point_to_geojson(records: List[CollectedData], city_to_cable for record in records: try: - lat = float(record.latitude) if record.latitude else None - lon = float(record.longitude) if record.longitude else None + latitude = get_record_field(record, "latitude") + longitude = get_record_field(record, "longitude") + lat = float(latitude) if latitude else None + lon = float(longitude) if longitude else None except (ValueError, TypeError): continue @@ -116,8 +119,8 @@ def convert_landing_point_to_geojson(records: List[CollectedData], city_to_cable "id": record.id, "source_id": record.source_id, "name": record.name, - "country": record.country, - "city": record.city, + "country": get_record_field(record, "country"), + "city": get_record_field(record, "city"), "is_tbd": metadata.get("is_tbd", False), } @@ -185,9 +188,11 @@ def convert_supercomputer_to_geojson(records: List[CollectedData]) -> Dict[str, for record in records: try: - lat = float(record.latitude) if record.latitude and record.latitude != "0.0" else None + latitude = get_record_field(record, "latitude") + longitude = get_record_field(record, "longitude") + lat = float(latitude) if latitude and latitude != "0.0" else None lon = ( - float(record.longitude) if record.longitude and record.longitude != "0.0" else None + float(longitude) if longitude and longitude != "0.0" else None ) except (ValueError, TypeError): lat, lon = None, None @@ -203,12 +208,12 @@ def convert_supercomputer_to_geojson(records: List[CollectedData]) -> Dict[str, "id": record.id, "name": record.name, "rank": metadata.get("rank"), - "r_max": record.value, - "r_peak": metadata.get("r_peak"), - "cores": metadata.get("cores"), - "power": metadata.get("power"), - "country": record.country, - "city": record.city, + "r_max": get_record_field(record, "rmax"), + "r_peak": get_record_field(record, "rpeak"), + "cores": get_record_field(record, "cores"), + 
"power": get_record_field(record, "power"), + "country": get_record_field(record, "country"), + "city": get_record_field(record, "city"), "data_type": "supercomputer", }, } @@ -223,8 +228,10 @@ def convert_gpu_cluster_to_geojson(records: List[CollectedData]) -> Dict[str, An for record in records: try: - lat = float(record.latitude) if record.latitude else None - lon = float(record.longitude) if record.longitude else None + latitude = get_record_field(record, "latitude") + longitude = get_record_field(record, "longitude") + lat = float(latitude) if latitude else None + lon = float(longitude) if longitude else None except (ValueError, TypeError): lat, lon = None, None @@ -238,8 +245,8 @@ def convert_gpu_cluster_to_geojson(records: List[CollectedData]) -> Dict[str, An "properties": { "id": record.id, "name": record.name, - "country": record.country, - "city": record.city, + "country": get_record_field(record, "country"), + "city": get_record_field(record, "city"), "metadata": metadata, "data_type": "gpu_cluster", }, diff --git a/backend/app/core/collected_data_fields.py b/backend/app/core/collected_data_fields.py new file mode 100644 index 00000000..5574605c --- /dev/null +++ b/backend/app/core/collected_data_fields.py @@ -0,0 +1,62 @@ +from typing import Any, Dict, Optional + + +FIELD_ALIASES = { + "country": ("country",), + "city": ("city",), + "latitude": ("latitude",), + "longitude": ("longitude",), + "value": ("value",), + "unit": ("unit",), + "cores": ("cores",), + "rmax": ("rmax", "r_max"), + "rpeak": ("rpeak", "r_peak"), + "power": ("power",), +} + + +def get_metadata_field(metadata: Optional[Dict[str, Any]], field: str, fallback: Any = None) -> Any: + if isinstance(metadata, dict): + for key in FIELD_ALIASES.get(field, (field,)): + value = metadata.get(key) + if value not in (None, ""): + return value + return fallback + + +def build_dynamic_metadata( + metadata: Optional[Dict[str, Any]], + *, + country: Any = None, + city: Any = None, + latitude: Any = None, 
+ longitude: Any = None, + value: Any = None, + unit: Any = None, +) -> Dict[str, Any]: + merged = dict(metadata) if isinstance(metadata, dict) else {} + + fallbacks = { + "country": country, + "city": city, + "latitude": latitude, + "longitude": longitude, + "value": value, + "unit": unit, + } + + for field, fallback in fallbacks.items(): + if fallback not in (None, "") and get_metadata_field(merged, field) in (None, ""): + merged[field] = fallback + + return merged + + +def get_record_field(record: Any, field: str) -> Any: + metadata = getattr(record, "extra_data", None) or {} + fallback_attr = field + if field in {"cores", "rmax", "rpeak", "power"}: + fallback = None + else: + fallback = getattr(record, fallback_attr, None) + return get_metadata_field(metadata, field, fallback=fallback) diff --git a/backend/app/core/countries.py b/backend/app/core/countries.py new file mode 100644 index 00000000..b1e8bc3c --- /dev/null +++ b/backend/app/core/countries.py @@ -0,0 +1,280 @@ +import re +from typing import Any, Optional + + +COUNTRY_ENTRIES = [ + ("阿富汗", ["Afghanistan", "AF", "AFG"]), + ("阿尔巴尼亚", ["Albania", "AL", "ALB"]), + ("阿尔及利亚", ["Algeria", "DZ", "DZA"]), + ("安道尔", ["Andorra", "AD", "AND"]), + ("安哥拉", ["Angola", "AO", "AGO"]), + ("安提瓜和巴布达", ["Antigua and Barbuda", "AG", "ATG"]), + ("阿根廷", ["Argentina", "AR", "ARG"]), + ("亚美尼亚", ["Armenia", "AM", "ARM"]), + ("澳大利亚", ["Australia", "AU", "AUS"]), + ("奥地利", ["Austria", "AT", "AUT"]), + ("阿塞拜疆", ["Azerbaijan", "AZ", "AZE"]), + ("巴哈马", ["Bahamas", "BS", "BHS"]), + ("巴林", ["Bahrain", "BH", "BHR"]), + ("孟加拉国", ["Bangladesh", "BD", "BGD"]), + ("巴巴多斯", ["Barbados", "BB", "BRB"]), + ("白俄罗斯", ["Belarus", "BY", "BLR"]), + ("比利时", ["Belgium", "BE", "BEL"]), + ("伯利兹", ["Belize", "BZ", "BLZ"]), + ("贝宁", ["Benin", "BJ", "BEN"]), + ("不丹", ["Bhutan", "BT", "BTN"]), + ("玻利维亚", ["Bolivia", "BO", "BOL", "Bolivia (Plurinational State of)"]), + ("波斯尼亚和黑塞哥维那", ["Bosnia and Herzegovina", "BA", "BIH"]), + ("博茨瓦纳", ["Botswana", "BW", 
"BWA"]), + ("巴西", ["Brazil", "BR", "BRA"]), + ("文莱", ["Brunei", "BN", "BRN", "Brunei Darussalam"]), + ("保加利亚", ["Bulgaria", "BG", "BGR"]), + ("布基纳法索", ["Burkina Faso", "BF", "BFA"]), + ("布隆迪", ["Burundi", "BI", "BDI"]), + ("柬埔寨", ["Cambodia", "KH", "KHM"]), + ("喀麦隆", ["Cameroon", "CM", "CMR"]), + ("加拿大", ["Canada", "CA", "CAN"]), + ("佛得角", ["Cape Verde", "CV", "CPV", "Cabo Verde"]), + ("中非", ["Central African Republic", "CF", "CAF"]), + ("乍得", ["Chad", "TD", "TCD"]), + ("智利", ["Chile", "CL", "CHL"]), + ("中国", ["China", "CN", "CHN", "Mainland China", "PRC", "People's Republic of China"]), + ("中国(香港)", ["Hong Kong", "HK", "HKG", "Hong Kong SAR", "China Hong Kong", "Hong Kong, China"]), + ("中国(澳门)", ["Macao", "Macau", "MO", "MAC", "Macao SAR", "China Macao", "Macau, China"]), + ("中国(台湾)", ["Taiwan", "TW", "TWN", "Chinese Taipei", "Taiwan, China"]), + ("哥伦比亚", ["Colombia", "CO", "COL"]), + ("科摩罗", ["Comoros", "KM", "COM"]), + ("刚果(布)", ["Republic of the Congo", "Congo", "Congo-Brazzaville", "CG", "COG"]), + ("刚果(金)", ["Democratic Republic of the Congo", "DR Congo", "Congo-Kinshasa", "CD", "COD"]), + ("哥斯达黎加", ["Costa Rica", "CR", "CRI"]), + ("科特迪瓦", ["Cote d'Ivoire", "Côte d'Ivoire", "Ivory Coast", "CI", "CIV"]), + ("克罗地亚", ["Croatia", "HR", "HRV"]), + ("古巴", ["Cuba", "CU", "CUB"]), + ("塞浦路斯", ["Cyprus", "CY", "CYP"]), + ("捷克", ["Czech Republic", "Czechia", "CZ", "CZE"]), + ("丹麦", ["Denmark", "DK", "DNK"]), + ("吉布提", ["Djibouti", "DJ", "DJI"]), + ("多米尼克", ["Dominica", "DM", "DMA"]), + ("多米尼加", ["Dominican Republic", "DO", "DOM"]), + ("厄瓜多尔", ["Ecuador", "EC", "ECU"]), + ("埃及", ["Egypt", "EG", "EGY"]), + ("萨尔瓦多", ["El Salvador", "SV", "SLV"]), + ("赤道几内亚", ["Equatorial Guinea", "GQ", "GNQ"]), + ("厄立特里亚", ["Eritrea", "ER", "ERI"]), + ("爱沙尼亚", ["Estonia", "EE", "EST"]), + ("埃斯瓦蒂尼", ["Eswatini", "SZ", "SWZ", "Swaziland"]), + ("埃塞俄比亚", ["Ethiopia", "ET", "ETH"]), + ("斐济", ["Fiji", "FJ", "FJI"]), + ("芬兰", ["Finland", "FI", "FIN"]), + ("法国", ["France", "FR", "FRA"]), + ("加蓬", 
["Gabon", "GA", "GAB"]), + ("冈比亚", ["Gambia", "GM", "GMB"]), + ("格鲁吉亚", ["Georgia", "GE", "GEO"]), + ("德国", ["Germany", "DE", "DEU"]), + ("加纳", ["Ghana", "GH", "GHA"]), + ("希腊", ["Greece", "GR", "GRC"]), + ("格林纳达", ["Grenada", "GD", "GRD"]), + ("危地马拉", ["Guatemala", "GT", "GTM"]), + ("几内亚", ["Guinea", "GN", "GIN"]), + ("几内亚比绍", ["Guinea-Bissau", "GW", "GNB"]), + ("圭亚那", ["Guyana", "GY", "GUY"]), + ("海地", ["Haiti", "HT", "HTI"]), + ("洪都拉斯", ["Honduras", "HN", "HND"]), + ("匈牙利", ["Hungary", "HU", "HUN"]), + ("冰岛", ["Iceland", "IS", "ISL"]), + ("印度", ["India", "IN", "IND"]), + ("印度尼西亚", ["Indonesia", "ID", "IDN"]), + ("伊朗", ["Iran", "IR", "IRN", "Iran (Islamic Republic of)"]), + ("伊拉克", ["Iraq", "IQ", "IRQ"]), + ("爱尔兰", ["Ireland", "IE", "IRL"]), + ("以色列", ["Israel", "IL", "ISR"]), + ("意大利", ["Italy", "IT", "ITA"]), + ("牙买加", ["Jamaica", "JM", "JAM"]), + ("日本", ["Japan", "JP", "JPN"]), + ("约旦", ["Jordan", "JO", "JOR"]), + ("哈萨克斯坦", ["Kazakhstan", "KZ", "KAZ"]), + ("肯尼亚", ["Kenya", "KE", "KEN"]), + ("基里巴斯", ["Kiribati", "KI", "KIR"]), + ("朝鲜", ["North Korea", "Korea, DPRK", "Democratic People's Republic of Korea", "KP", "PRK"]), + ("韩国", ["South Korea", "Republic of Korea", "Korea", "KR", "KOR"]), + ("科威特", ["Kuwait", "KW", "KWT"]), + ("吉尔吉斯斯坦", ["Kyrgyzstan", "KG", "KGZ"]), + ("老挝", ["Laos", "Lao PDR", "Lao People's Democratic Republic", "LA", "LAO"]), + ("拉脱维亚", ["Latvia", "LV", "LVA"]), + ("黎巴嫩", ["Lebanon", "LB", "LBN"]), + ("莱索托", ["Lesotho", "LS", "LSO"]), + ("利比里亚", ["Liberia", "LR", "LBR"]), + ("利比亚", ["Libya", "LY", "LBY"]), + ("列支敦士登", ["Liechtenstein", "LI", "LIE"]), + ("立陶宛", ["Lithuania", "LT", "LTU"]), + ("卢森堡", ["Luxembourg", "LU", "LUX"]), + ("马达加斯加", ["Madagascar", "MG", "MDG"]), + ("马拉维", ["Malawi", "MW", "MWI"]), + ("马来西亚", ["Malaysia", "MY", "MYS"]), + ("马尔代夫", ["Maldives", "MV", "MDV"]), + ("马里", ["Mali", "ML", "MLI"]), + ("马耳他", ["Malta", "MT", "MLT"]), + ("马绍尔群岛", ["Marshall Islands", "MH", "MHL"]), + ("毛里塔尼亚", ["Mauritania", "MR", "MRT"]), + 
("毛里求斯", ["Mauritius", "MU", "MUS"]), + ("墨西哥", ["Mexico", "MX", "MEX"]), + ("密克罗尼西亚", ["Micronesia", "FM", "FSM", "Federated States of Micronesia"]), + ("摩尔多瓦", ["Moldova", "MD", "MDA", "Republic of Moldova"]), + ("摩纳哥", ["Monaco", "MC", "MCO"]), + ("蒙古", ["Mongolia", "MN", "MNG"]), + ("黑山", ["Montenegro", "ME", "MNE"]), + ("摩洛哥", ["Morocco", "MA", "MAR"]), + ("莫桑比克", ["Mozambique", "MZ", "MOZ"]), + ("缅甸", ["Myanmar", "MM", "MMR", "Burma"]), + ("纳米比亚", ["Namibia", "NA", "NAM"]), + ("瑙鲁", ["Nauru", "NR", "NRU"]), + ("尼泊尔", ["Nepal", "NP", "NPL"]), + ("荷兰", ["Netherlands", "NL", "NLD"]), + ("新西兰", ["New Zealand", "NZ", "NZL"]), + ("尼加拉瓜", ["Nicaragua", "NI", "NIC"]), + ("尼日尔", ["Niger", "NE", "NER"]), + ("尼日利亚", ["Nigeria", "NG", "NGA"]), + ("北马其顿", ["North Macedonia", "MK", "MKD", "Macedonia"]), + ("挪威", ["Norway", "NO", "NOR"]), + ("阿曼", ["Oman", "OM", "OMN"]), + ("巴基斯坦", ["Pakistan", "PK", "PAK"]), + ("帕劳", ["Palau", "PW", "PLW"]), + ("巴勒斯坦", ["Palestine", "PS", "PSE", "State of Palestine"]), + ("巴拿马", ["Panama", "PA", "PAN"]), + ("巴布亚新几内亚", ["Papua New Guinea", "PG", "PNG"]), + ("巴拉圭", ["Paraguay", "PY", "PRY"]), + ("秘鲁", ["Peru", "PE", "PER"]), + ("菲律宾", ["Philippines", "PH", "PHL"]), + ("波兰", ["Poland", "PL", "POL"]), + ("葡萄牙", ["Portugal", "PT", "PRT"]), + ("卡塔尔", ["Qatar", "QA", "QAT"]), + ("罗马尼亚", ["Romania", "RO", "ROU"]), + ("俄罗斯", ["Russia", "Russian Federation", "RU", "RUS"]), + ("卢旺达", ["Rwanda", "RW", "RWA"]), + ("圣基茨和尼维斯", ["Saint Kitts and Nevis", "KN", "KNA"]), + ("圣卢西亚", ["Saint Lucia", "LC", "LCA"]), + ("圣文森特和格林纳丁斯", ["Saint Vincent and the Grenadines", "VC", "VCT"]), + ("萨摩亚", ["Samoa", "WS", "WSM"]), + ("圣马力诺", ["San Marino", "SM", "SMR"]), + ("圣多美和普林西比", ["Sao Tome and Principe", "ST", "STP", "São Tomé and Príncipe"]), + ("沙特阿拉伯", ["Saudi Arabia", "SA", "SAU"]), + ("塞内加尔", ["Senegal", "SN", "SEN"]), + ("塞尔维亚", ["Serbia", "RS", "SRB", "Kosovo", "XK", "XKS", "Republic of Kosovo"]), + ("塞舌尔", ["Seychelles", "SC", "SYC"]), + ("塞拉利昂", ["Sierra 
Leone", "SL", "SLE"]), + ("新加坡", ["Singapore", "SG", "SGP"]), + ("斯洛伐克", ["Slovakia", "SK", "SVK"]), + ("斯洛文尼亚", ["Slovenia", "SI", "SVN"]), + ("所罗门群岛", ["Solomon Islands", "SB", "SLB"]), + ("索马里", ["Somalia", "SO", "SOM"]), + ("南非", ["South Africa", "ZA", "ZAF"]), + ("南苏丹", ["South Sudan", "SS", "SSD"]), + ("西班牙", ["Spain", "ES", "ESP"]), + ("斯里兰卡", ["Sri Lanka", "LK", "LKA"]), + ("苏丹", ["Sudan", "SD", "SDN"]), + ("苏里南", ["Suriname", "SR", "SUR"]), + ("瑞典", ["Sweden", "SE", "SWE"]), + ("瑞士", ["Switzerland", "CH", "CHE"]), + ("叙利亚", ["Syria", "SY", "SYR", "Syrian Arab Republic"]), + ("塔吉克斯坦", ["Tajikistan", "TJ", "TJK"]), + ("坦桑尼亚", ["Tanzania", "TZ", "TZA", "United Republic of Tanzania"]), + ("泰国", ["Thailand", "TH", "THA"]), + ("东帝汶", ["Timor-Leste", "East Timor", "TL", "TLS"]), + ("多哥", ["Togo", "TG", "TGO"]), + ("汤加", ["Tonga", "TO", "TON"]), + ("特立尼达和多巴哥", ["Trinidad and Tobago", "TT", "TTO"]), + ("突尼斯", ["Tunisia", "TN", "TUN"]), + ("土耳其", ["Turkey", "TR", "TUR", "Türkiye"]), + ("土库曼斯坦", ["Turkmenistan", "TM", "TKM"]), + ("图瓦卢", ["Tuvalu", "TV", "TUV"]), + ("乌干达", ["Uganda", "UG", "UGA"]), + ("乌克兰", ["Ukraine", "UA", "UKR"]), + ("阿联酋", ["United Arab Emirates", "AE", "ARE", "UAE"]), + ("英国", ["United Kingdom", "UK", "GB", "GBR", "Great Britain", "Britain", "England"]), + ("美国", ["United States", "United States of America", "US", "USA", "U.S.", "U.S.A."]), + ("乌拉圭", ["Uruguay", "UY", "URY"]), + ("乌兹别克斯坦", ["Uzbekistan", "UZ", "UZB"]), + ("瓦努阿图", ["Vanuatu", "VU", "VUT"]), + ("梵蒂冈", ["Vatican City", "Holy See", "VA", "VAT"]), + ("委内瑞拉", ["Venezuela", "VE", "VEN", "Venezuela (Bolivarian Republic of)"]), + ("越南", ["Vietnam", "Viet Nam", "VN", "VNM"]), + ("也门", ["Yemen", "YE", "YEM"]), + ("赞比亚", ["Zambia", "ZM", "ZMB"]), + ("津巴布韦", ["Zimbabwe", "ZW", "ZWE"]), +] + + +COUNTRY_OPTIONS = [entry[0] for entry in COUNTRY_ENTRIES] +CANONICAL_COUNTRY_SET = set(COUNTRY_OPTIONS) +INVALID_COUNTRY_VALUES = { + "", + "-", + "--", + "unknown", + "n/a", + "na", + "none", + 
"null", + "global", + "world", + "worldwide", + "xx", +} +NUMERIC_LIKE_PATTERN = re.compile(r"^[\d\s,._%+\-]+$") + +COUNTRY_ALIAS_MAP = {} +COUNTRY_VARIANTS_MAP = {} +for canonical, aliases in COUNTRY_ENTRIES: + COUNTRY_ALIAS_MAP[canonical.casefold()] = canonical + variants = [canonical, *aliases] + COUNTRY_VARIANTS_MAP[canonical] = variants + for alias in aliases: + COUNTRY_ALIAS_MAP[alias.casefold()] = canonical + + +def normalize_country(value: Any) -> Optional[str]: + if value is None: + return None + + if not isinstance(value, str): + return None + + normalized = re.sub(r"\s+", " ", value.strip()) + normalized = normalized.replace("(", "(").replace(")", ")") + + if not normalized: + return None + + lowered = normalized.casefold() + if lowered in INVALID_COUNTRY_VALUES: + return None + + if NUMERIC_LIKE_PATTERN.fullmatch(normalized): + return None + + if normalized in CANONICAL_COUNTRY_SET: + return normalized + + return COUNTRY_ALIAS_MAP.get(lowered) + + +def get_country_search_variants(value: Any) -> list[str]: + canonical = normalize_country(value) + if canonical is None: + return [] + + variants = [] + seen = set() + for item in COUNTRY_VARIANTS_MAP.get(canonical, [canonical]): + if not isinstance(item, str): + continue + normalized = re.sub(r"\s+", " ", item.strip()) + if not normalized: + continue + key = normalized.casefold() + if key in seen: + continue + seen.add(key) + variants.append(normalized) + + return variants diff --git a/backend/app/db/session.py b/backend/app/db/session.py index 392ca380..4d3ccf69 100644 --- a/backend/app/db/session.py +++ b/backend/app/db/session.py @@ -1,5 +1,6 @@ from typing import AsyncGenerator +from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import declarative_base @@ -63,6 +64,7 @@ async def init_db(): import app.models.user # noqa: F401 import app.models.gpu_cluster # noqa: F401 import app.models.task # noqa: F401 + import 
app.models.data_snapshot # noqa: F401 import app.models.datasource # noqa: F401 import app.models.datasource_config # noqa: F401 import app.models.alert # noqa: F401 @@ -71,6 +73,55 @@ async def init_db(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) + await conn.execute( + text( + """ + ALTER TABLE collected_data + ADD COLUMN IF NOT EXISTS snapshot_id INTEGER, + ADD COLUMN IF NOT EXISTS task_id INTEGER, + ADD COLUMN IF NOT EXISTS entity_key VARCHAR(255), + ADD COLUMN IF NOT EXISTS is_current BOOLEAN DEFAULT TRUE, + ADD COLUMN IF NOT EXISTS previous_record_id INTEGER, + ADD COLUMN IF NOT EXISTS change_type VARCHAR(20), + ADD COLUMN IF NOT EXISTS change_summary JSONB DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ + """ + ) + ) + await conn.execute( + text( + """ + ALTER TABLE collection_tasks + ADD COLUMN IF NOT EXISTS phase VARCHAR(30) DEFAULT 'queued' + """ + ) + ) + await conn.execute( + text( + """ + CREATE INDEX IF NOT EXISTS idx_collected_data_source_source_id + ON collected_data (source, source_id) + """ + ) + ) + await conn.execute( + text( + """ + UPDATE collected_data + SET entity_key = source || ':' || COALESCE(source_id, id::text) + WHERE entity_key IS NULL + """ + ) + ) + await conn.execute( + text( + """ + UPDATE collected_data + SET is_current = TRUE + WHERE is_current IS NULL + """ + ) + ) async with async_session_factory() as session: - await seed_default_datasources(session) + await seed_default_datasources(session) diff --git a/backend/app/main.py b/backend/app/main.py index 3bd9aa02..a65b77b1 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,7 +9,12 @@ from app.api.v1 import websocket from app.core.config import settings from app.core.websocket.broadcaster import broadcaster from app.db.session import init_db -from app.services.scheduler import start_scheduler, stop_scheduler, sync_scheduler_with_datasources +from app.services.scheduler import ( + 
cleanup_stale_running_tasks, + start_scheduler, + stop_scheduler, + sync_scheduler_with_datasources, +) class WebSocketCORSMiddleware(BaseHTTPMiddleware): @@ -26,6 +31,7 @@ class WebSocketCORSMiddleware(BaseHTTPMiddleware): @asynccontextmanager async def lifespan(app: FastAPI): await init_db() + await cleanup_stale_running_tasks() start_scheduler() await sync_scheduler_with_datasources() broadcaster.start() diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 16dc63d4..38e79102 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -1,6 +1,7 @@ from app.models.user import User from app.models.gpu_cluster import GPUCluster from app.models.task import CollectionTask +from app.models.data_snapshot import DataSnapshot from app.models.datasource import DataSource from app.models.datasource_config import DataSourceConfig from app.models.alert import Alert, AlertSeverity, AlertStatus @@ -10,6 +11,7 @@ __all__ = [ "User", "GPUCluster", "CollectionTask", + "DataSnapshot", "DataSource", "DataSourceConfig", "SystemSetting", diff --git a/backend/app/models/collected_data.py b/backend/app/models/collected_data.py index ef05658c..84791f15 100644 --- a/backend/app/models/collected_data.py +++ b/backend/app/models/collected_data.py @@ -1,8 +1,9 @@ """Collected Data model for storing data from all collectors""" -from sqlalchemy import Column, DateTime, Integer, String, Text, JSON, Index +from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Text, JSON, Index from sqlalchemy.sql import func +from app.core.collected_data_fields import get_record_field from app.db.session import Base @@ -12,8 +13,11 @@ class CollectedData(Base): __tablename__ = "collected_data" id = Column(Integer, primary_key=True, autoincrement=True) + snapshot_id = Column(Integer, ForeignKey("data_snapshots.id"), nullable=True, index=True) + task_id = Column(Integer, ForeignKey("collection_tasks.id"), nullable=True, index=True) 
source = Column(String(100), nullable=False, index=True) # e.g., "top500", "huggingface_models" source_id = Column(String(100), index=True) # Original ID from source, e.g., "rank_1" + entity_key = Column(String(255), index=True) data_type = Column( String(50), nullable=False, index=True ) # e.g., "supercomputer", "model", "dataset" @@ -23,16 +27,6 @@ class CollectedData(Base): title = Column(String(500)) description = Column(Text) - # Location data (for geo visualization) - country = Column(String(100)) - city = Column(String(100)) - latitude = Column(String(50)) - longitude = Column(String(50)) - - # Performance metrics - value = Column(String(100)) # Generic value field (Rmax, Rpeak, etc.) - unit = Column(String(20)) - # Additional metadata as JSON extra_data = Column( "metadata", JSON, default={} @@ -44,11 +38,17 @@ class CollectedData(Base): # Status is_valid = Column(Integer, default=1) # 1=valid, 0=invalid + is_current = Column(Boolean, default=True, index=True) + previous_record_id = Column(Integer, ForeignKey("collected_data.id"), nullable=True, index=True) + change_type = Column(String(20), nullable=True) + change_summary = Column(JSON, default={}) + deleted_at = Column(DateTime(timezone=True), nullable=True) # Indexes for common queries __table_args__ = ( Index("idx_collected_data_source_collected", "source", "collected_at"), Index("idx_collected_data_source_type", "source", "data_type"), + Index("idx_collected_data_source_source_id", "source", "source_id"), ) def __repr__(self): @@ -58,18 +58,21 @@ class CollectedData(Base): """Convert to dictionary""" return { "id": self.id, + "snapshot_id": self.snapshot_id, + "task_id": self.task_id, "source": self.source, "source_id": self.source_id, + "entity_key": self.entity_key, "data_type": self.data_type, "name": self.name, "title": self.title, "description": self.description, - "country": self.country, - "city": self.city, - "latitude": self.latitude, - "longitude": self.longitude, - "value": self.value, - 
"unit": self.unit, + "country": get_record_field(self, "country"), + "city": get_record_field(self, "city"), + "latitude": get_record_field(self, "latitude"), + "longitude": get_record_field(self, "longitude"), + "value": get_record_field(self, "value"), + "unit": get_record_field(self, "unit"), "metadata": self.extra_data, "collected_at": self.collected_at.isoformat() if self.collected_at is not None @@ -77,4 +80,9 @@ class CollectedData(Base): "reference_date": self.reference_date.isoformat() if self.reference_date is not None else None, + "is_current": self.is_current, + "previous_record_id": self.previous_record_id, + "change_type": self.change_type, + "change_summary": self.change_summary, + "deleted_at": self.deleted_at.isoformat() if self.deleted_at is not None else None, } diff --git a/backend/app/models/data_snapshot.py b/backend/app/models/data_snapshot.py new file mode 100644 index 00000000..f70b4f12 --- /dev/null +++ b/backend/app/models/data_snapshot.py @@ -0,0 +1,26 @@ +from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, JSON, String +from sqlalchemy.sql import func + +from app.db.session import Base + + +class DataSnapshot(Base): + __tablename__ = "data_snapshots" + + id = Column(Integer, primary_key=True, autoincrement=True) + datasource_id = Column(Integer, nullable=False, index=True) + task_id = Column(Integer, ForeignKey("collection_tasks.id"), nullable=True, index=True) + source = Column(String(100), nullable=False, index=True) + snapshot_key = Column(String(100), nullable=True, index=True) + reference_date = Column(DateTime(timezone=True), nullable=True) + started_at = Column(DateTime(timezone=True), server_default=func.now()) + completed_at = Column(DateTime(timezone=True), nullable=True) + record_count = Column(Integer, default=0) + status = Column(String(20), nullable=False, default="running") + is_current = Column(Boolean, default=True, index=True) + parent_snapshot_id = Column(Integer, ForeignKey("data_snapshots.id"), 
nullable=True, index=True) + summary = Column(JSON, default={}) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + def __repr__(self): + return f"<DataSnapshot id={self.id} source={self.source} status={self.status}>" diff --git a/backend/app/models/task.py b/backend/app/models/task.py index c5509027..12d858c2 100644 --- a/backend/app/models/task.py +++ b/backend/app/models/task.py @@ -12,6 +12,7 @@ class CollectionTask(Base): id = Column(Integer, primary_key=True, autoincrement=True) datasource_id = Column(Integer, nullable=False, index=True) status = Column(String(20), nullable=False) # pending, running, success, failed, cancelled + phase = Column(String(30), default="queued") started_at = Column(DateTime(timezone=True)) completed_at = Column(DateTime(timezone=True)) records_processed = Column(Integer, default=0) diff --git a/backend/app/services/collectors/arcgis_relation.py b/backend/app/services/collectors/arcgis_relation.py index 8b98536f..d06a46c8 100644 --- a/backend/app/services/collectors/arcgis_relation.py +++ b/backend/app/services/collectors/arcgis_relation.py @@ -1,10 +1,11 @@ -from typing import Dict, Any, List +import asyncio from datetime import datetime +from typing import Any, Dict, List, Optional + import httpx -from app.services.collectors.base import BaseCollector from app.core.data_sources import get_data_sources_config - +from app.services.collectors.base import BaseCollector class ArcGISCableLandingRelationCollector(BaseCollector): @@ -18,45 +19,129 @@ class ArcGISCableLandingRelationCollector(BaseCollector): def base_url(self) -> str: if self._resolved_url: return self._resolved_url - from app.core.data_sources import get_data_sources_config - config = get_data_sources_config() return config.get_yaml_url("arcgis_cable_landing_relation") + def _layer_url(self, layer_id: int) -> str: + if "/FeatureServer/" not in self.base_url: + return self.base_url + prefix = self.base_url.split("/FeatureServer/")[0] + return f"{prefix}/FeatureServer/{layer_id}/query" + + async def
_fetch_layer_attributes( + self, client: httpx.AsyncClient, layer_id: int + ) -> List[Dict[str, Any]]: + response = await client.get( + self._layer_url(layer_id), + params={ + "where": "1=1", + "outFields": "*", + "returnGeometry": "false", + "f": "json", + }, + ) + response.raise_for_status() + data = response.json() + return [feature.get("attributes", {}) for feature in data.get("features", [])] + + async def _fetch_relation_features(self, client: httpx.AsyncClient) -> List[Dict[str, Any]]: + response = await client.get( + self.base_url, + params={ + "where": "1=1", + "outFields": "*", + "returnGeometry": "true", + "f": "geojson", + }, + ) + response.raise_for_status() + data = response.json() + return data.get("features", []) + async def fetch(self) -> List[Dict[str, Any]]: - params = {"where": "1=1", "outFields": "*", "returnGeometry": "true", "f": "geojson"} - async with httpx.AsyncClient(timeout=60.0) as client: - response = await client.get(self.base_url, params=params) - response.raise_for_status() - return self.parse_response(response.json()) + relation_features, landing_rows, cable_rows = await asyncio.gather( + self._fetch_relation_features(client), + self._fetch_layer_attributes(client, 1), + self._fetch_layer_attributes(client, 2), + ) + return self.parse_response(relation_features, landing_rows, cable_rows) - def parse_response(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: - result = [] + def _build_landing_lookup(self, landing_rows: List[Dict[str, Any]]) -> Dict[int, Dict[str, Any]]: + lookup: Dict[int, Dict[str, Any]] = {} + for row in landing_rows: + city_id = row.get("city_id") + if city_id is None: + continue + lookup[int(city_id)] = { + "landing_point_id": row.get("landing_point_id") or city_id, + "landing_point_name": row.get("Name") or row.get("name") or "", + "facility": row.get("facility") or "", + "status": row.get("status") or "", + "country": row.get("country") or "", + } + return lookup - features = data.get("features", []) - for 
feature in features: + def _build_cable_lookup(self, cable_rows: List[Dict[str, Any]]) -> Dict[int, Dict[str, Any]]: + lookup: Dict[int, Dict[str, Any]] = {} + for row in cable_rows: + cable_id = row.get("cable_id") + if cable_id is None: + continue + lookup[int(cable_id)] = { + "cable_name": row.get("Name") or "", + "status": row.get("status") or "active", + } + return lookup + + def parse_response( + self, + relation_features: List[Dict[str, Any]], + landing_rows: List[Dict[str, Any]], + cable_rows: List[Dict[str, Any]], + ) -> List[Dict[str, Any]]: + result: List[Dict[str, Any]] = [] + landing_lookup = self._build_landing_lookup(landing_rows) + cable_lookup = self._build_cable_lookup(cable_rows) + + for feature in relation_features: props = feature.get("properties", {}) try: + city_id = props.get("city_id") + cable_id = props.get("cable_id") + landing_info = landing_lookup.get(int(city_id), {}) if city_id is not None else {} + cable_info = cable_lookup.get(int(cable_id), {}) if cable_id is not None else {} + + cable_name = cable_info.get("cable_name") or props.get("cable_name") or "Unknown" + landing_point_name = ( + landing_info.get("landing_point_name") + or props.get("landing_point_name") + or "Unknown" + ) + facility = landing_info.get("facility") or props.get("facility") or "-" + status = cable_info.get("status") or landing_info.get("status") or props.get("status") or "-" + country = landing_info.get("country") or props.get("country") or "" + landing_point_id = landing_info.get("landing_point_id") or props.get("landing_point_id") or city_id + entry = { "source_id": f"arcgis_relation_{props.get('OBJECTID', props.get('id', ''))}", - "name": f"{props.get('cable_name', 'Unknown')} - {props.get('landing_point_name', 'Unknown')}", - "country": props.get("country", ""), - "city": props.get("landing_point_name", ""), + "name": f"{cable_name} - {landing_point_name}", + "country": country, + "city": landing_point_name, "latitude": str(props.get("latitude", "")) if 
props.get("latitude") else "", "longitude": str(props.get("longitude", "")) if props.get("longitude") else "", "value": "", "unit": "", "metadata": { "objectid": props.get("OBJECTID"), - "city_id": props.get("city_id"), - "cable_id": props.get("cable_id"), - "cable_name": props.get("cable_name"), - "landing_point_id": props.get("landing_point_id"), - "landing_point_name": props.get("landing_point_name"), - "facility": props.get("facility"), - "status": props.get("status"), + "city_id": city_id, + "cable_id": cable_id, + "cable_name": cable_name, + "landing_point_id": landing_point_id, + "landing_point_name": landing_point_name, + "facility": facility, + "status": status, }, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), } diff --git a/backend/app/services/collectors/base.py b/backend/app/services/collectors/base.py index 77a1948c..0288dd61 100644 --- a/backend/app/services/collectors/base.py +++ b/backend/app/services/collectors/base.py @@ -4,10 +4,12 @@ from abc import ABC, abstractmethod from typing import Dict, List, Any, Optional from datetime import datetime import httpx -from sqlalchemy import text +from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession +from app.core.collected_data_fields import build_dynamic_metadata, get_record_field from app.core.config import settings +from app.core.countries import normalize_country class BaseCollector(ABC): @@ -39,6 +41,11 @@ class BaseCollector(ABC): records_processed / self._current_task.total_records ) * 100 + async def set_phase(self, phase: str): + if self._current_task and self._db_session: + self._current_task.phase = phase + await self._db_session.commit() + @abstractmethod async def fetch(self) -> List[Dict[str, Any]]: """Fetch raw data from source""" @@ -48,14 +55,87 @@ class BaseCollector(ABC): """Transform raw data to internal format (default: pass through)""" return raw_data + def _parse_reference_date(self, value: Any) -> Optional[datetime]: + if not value: + return 
None + if isinstance(value, datetime): + return value + if isinstance(value, str): + return datetime.fromisoformat(value.replace("Z", "+00:00")) + return None + + def _build_comparable_payload(self, record: Any) -> Dict[str, Any]: + return { + "name": getattr(record, "name", None), + "title": getattr(record, "title", None), + "description": getattr(record, "description", None), + "country": get_record_field(record, "country"), + "city": get_record_field(record, "city"), + "latitude": get_record_field(record, "latitude"), + "longitude": get_record_field(record, "longitude"), + "value": get_record_field(record, "value"), + "unit": get_record_field(record, "unit"), + "metadata": getattr(record, "extra_data", None) or {}, + "reference_date": ( + getattr(record, "reference_date", None).isoformat() + if getattr(record, "reference_date", None) + else None + ), + } + + async def _create_snapshot( + self, + db: AsyncSession, + task_id: int, + data: List[Dict[str, Any]], + started_at: datetime, + ) -> int: + from app.models.data_snapshot import DataSnapshot + + reference_dates = [ + parsed + for parsed in (self._parse_reference_date(item.get("reference_date")) for item in data) + if parsed is not None + ] + reference_date = max(reference_dates) if reference_dates else None + + result = await db.execute( + select(DataSnapshot) + .where(DataSnapshot.source == self.name, DataSnapshot.is_current == True) + .order_by(DataSnapshot.completed_at.desc().nullslast(), DataSnapshot.id.desc()) + .limit(1) + ) + previous_snapshot = result.scalar_one_or_none() + + snapshot = DataSnapshot( + datasource_id=getattr(self, "_datasource_id", 1), + task_id=task_id, + source=self.name, + snapshot_key=f"{self.name}:{task_id}", + reference_date=reference_date, + started_at=started_at, + status="running", + is_current=True, + parent_snapshot_id=previous_snapshot.id if previous_snapshot else None, + summary={}, + ) + db.add(snapshot) + + if previous_snapshot: + previous_snapshot.is_current = False + + 
await db.commit() + return snapshot.id + async def run(self, db: AsyncSession) -> Dict[str, Any]: """Full pipeline: fetch -> transform -> save""" from app.services.collectors.registry import collector_registry from app.models.task import CollectionTask - from app.models.collected_data import CollectedData + from app.models.data_snapshot import DataSnapshot start_time = datetime.utcnow() datasource_id = getattr(self, "_datasource_id", 1) + snapshot_id: Optional[int] = None if not collector_registry.is_active(self.name): return {"status": "skipped", "reason": "Collector is disabled"} @@ -63,6 +143,7 @@ class BaseCollector(ABC): task = CollectionTask( datasource_id=datasource_id, status="running", + phase="queued", started_at=start_time, ) db.add(task) @@ -75,15 +156,20 @@ class BaseCollector(ABC): await self.resolve_url(db) try: + await self.set_phase("fetching") raw_data = await self.fetch() task.total_records = len(raw_data) await db.commit() + await self.set_phase("transforming") data = self.transform(raw_data) + snapshot_id = await self._create_snapshot(db, task_id, data, start_time) - records_count = await self._save_data(db, data) + await self.set_phase("saving") + records_count = await self._save_data(db, data, task_id=task_id, snapshot_id=snapshot_id) task.status = "success" + task.phase = "completed" task.records_processed = records_count task.progress = 100.0 task.completed_at = datetime.utcnow() @@ -97,8 +183,15 @@ class BaseCollector(ABC): } except Exception as e: task.status = "failed" + task.phase = "failed" task.error_message = str(e) task.completed_at = datetime.utcnow() + if snapshot_id is not None: + snapshot = await db.get(DataSnapshot, snapshot_id) + if snapshot: + snapshot.status = "failed" + snapshot.completed_at = datetime.utcnow() + snapshot.summary = {"error": str(e)} await db.commit() return { @@ -108,53 +201,163 @@ class BaseCollector(ABC): "execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(), } - async def 
_save_data(self, db: AsyncSession, data: List[Dict[str, Any]]) -> int: + async def _save_data( + self, + db: AsyncSession, + data: List[Dict[str, Any]], + task_id: Optional[int] = None, + snapshot_id: Optional[int] = None, + ) -> int: """Save transformed data to database""" from app.models.collected_data import CollectedData + from app.models.data_snapshot import DataSnapshot if not data: + if snapshot_id is not None: + snapshot = await db.get(DataSnapshot, snapshot_id) + if snapshot: + snapshot.record_count = 0 + snapshot.summary = {"created": 0, "updated": 0, "unchanged": 0} + snapshot.status = "success" + snapshot.completed_at = datetime.utcnow() + await db.commit() return 0 collected_at = datetime.utcnow() records_added = 0 + created_count = 0 + updated_count = 0 + unchanged_count = 0 + seen_entity_keys: set[str] = set() + previous_current_keys: set[str] = set() + + previous_current_result = await db.execute( + select(CollectedData.entity_key).where( + CollectedData.source == self.name, + CollectedData.is_current == True, + ) + ) + previous_current_keys = {row[0] for row in previous_current_result.fetchall() if row[0]} for i, item in enumerate(data): print( f"DEBUG: Saving item {i}: name={item.get('name')}, metadata={item.get('metadata', 'NOT FOUND')}" ) + raw_metadata = item.get("metadata", {}) + extra_data = build_dynamic_metadata( + raw_metadata, + country=item.get("country"), + city=item.get("city"), + latitude=item.get("latitude"), + longitude=item.get("longitude"), + value=item.get("value"), + unit=item.get("unit"), + ) + normalized_country = normalize_country(item.get("country")) + if normalized_country is not None: + extra_data["country"] = normalized_country + + if item.get("country") and normalized_country != item.get("country"): + extra_data["raw_country"] = item.get("country") + if normalized_country is None: + extra_data["country_validation"] = "invalid" + + source_id = item.get("source_id") or item.get("id") + reference_date = ( + 
self._parse_reference_date(item.get("reference_date")) + ) + source_id_str = str(source_id) if source_id is not None else None + entity_key = f"{self.name}:{source_id_str}" if source_id_str else f"{self.name}:{i}" + previous_record = None + + if entity_key and entity_key not in seen_entity_keys: + result = await db.execute( + select(CollectedData) + .where( + CollectedData.source == self.name, + CollectedData.entity_key == entity_key, + CollectedData.is_current == True, + ) + .order_by(CollectedData.collected_at.desc().nullslast(), CollectedData.id.desc()) + ) + previous_records = result.scalars().all() + if previous_records: + previous_record = previous_records[0] + for old_record in previous_records: + old_record.is_current = False + record = CollectedData( + snapshot_id=snapshot_id, + task_id=task_id, source=self.name, - source_id=item.get("source_id") or item.get("id"), + source_id=source_id_str, + entity_key=entity_key, data_type=self.data_type, name=item.get("name"), title=item.get("title"), description=item.get("description"), - country=item.get("country"), - city=item.get("city"), - latitude=str(item.get("latitude", "")) - if item.get("latitude") is not None - else None, - longitude=str(item.get("longitude", "")) - if item.get("longitude") is not None - else None, - value=item.get("value"), - unit=item.get("unit"), - extra_data=item.get("metadata", {}), + extra_data=extra_data, collected_at=collected_at, - reference_date=datetime.fromisoformat( - item.get("reference_date").replace("Z", "+00:00") - ) - if item.get("reference_date") - else None, + reference_date=reference_date, is_valid=1, + is_current=True, + previous_record_id=previous_record.id if previous_record else None, + deleted_at=None, ) + + if previous_record is None: + record.change_type = "created" + record.change_summary = {} + created_count += 1 + else: + previous_payload = self._build_comparable_payload(previous_record) + current_payload = self._build_comparable_payload(record) + if 
current_payload == previous_payload: + record.change_type = "unchanged" + record.change_summary = {} + unchanged_count += 1 + else: + changed_fields = [ + key for key in current_payload.keys() if current_payload[key] != previous_payload.get(key) + ] + record.change_type = "updated" + record.change_summary = {"changed_fields": changed_fields} + updated_count += 1 + db.add(record) + seen_entity_keys.add(entity_key) records_added += 1 if i % 100 == 0: self.update_progress(i + 1) await db.commit() + if snapshot_id is not None: + deleted_keys = previous_current_keys - seen_entity_keys + await db.execute( + text( + """ + UPDATE collected_data + SET is_current = FALSE + WHERE source = :source + AND snapshot_id IS DISTINCT FROM :snapshot_id + AND COALESCE(is_current, TRUE) = TRUE + """ + ), + {"source": self.name, "snapshot_id": snapshot_id}, + ) + snapshot = await db.get(DataSnapshot, snapshot_id) + if snapshot: + snapshot.record_count = records_added + snapshot.status = "success" + snapshot.completed_at = datetime.utcnow() + snapshot.summary = { + "created": created_count, + "updated": updated_count, + "unchanged": unchanged_count, + "deleted": len(deleted_keys), + } + await db.commit() self.update_progress(len(data)) return records_added diff --git a/backend/app/services/collectors/peeringdb.py b/backend/app/services/collectors/peeringdb.py index bf59b1b0..3c2b0179 100644 --- a/backend/app/services/collectors/peeringdb.py +++ b/backend/app/services/collectors/peeringdb.py @@ -76,7 +76,7 @@ class PeeringDBIXPCollector(HTTPCollector): print(f"Warning: PeeringDB collection failed after {max_retries} retries: {last_error}") return {} - async def collect(self) -> List[Dict[str, Any]]: + async def fetch(self) -> List[Dict[str, Any]]: """Collect IXP data from PeeringDB with rate limit handling""" response_data = await self.fetch_with_retry() if not response_data: @@ -177,7 +177,7 @@ class PeeringDBNetworkCollector(HTTPCollector): print(f"Warning: PeeringDB collection failed 
after {max_retries} retries: {last_error}") return {} - async def collect(self) -> List[Dict[str, Any]]: + async def fetch(self) -> List[Dict[str, Any]]: """Collect Network data from PeeringDB with rate limit handling""" response_data = await self.fetch_with_retry() if not response_data: @@ -280,7 +280,7 @@ class PeeringDBFacilityCollector(HTTPCollector): print(f"Warning: PeeringDB collection failed after {max_retries} retries: {last_error}") return {} - async def collect(self) -> List[Dict[str, Any]]: + async def fetch(self) -> List[Dict[str, Any]]: """Collect Facility data from PeeringDB with rate limit handling""" response_data = await self.fetch_with_retry() if not response_data: diff --git a/backend/app/services/collectors/top500.py b/backend/app/services/collectors/top500.py index 78872374..9e21937d 100644 --- a/backend/app/services/collectors/top500.py +++ b/backend/app/services/collectors/top500.py @@ -4,9 +4,9 @@ Collects data from TOP500 supercomputer rankings. https://top500.org/lists/top500/ """ +import asyncio import re from typing import Dict, Any, List -from datetime import datetime from bs4 import BeautifulSoup import httpx @@ -21,14 +21,108 @@ class TOP500Collector(BaseCollector): data_type = "supercomputer" async def fetch(self) -> List[Dict[str, Any]]: - """Fetch TOP500 data from website (scraping)""" - # Get the latest list page + """Fetch TOP500 list data and enrich each row with detail-page metadata.""" url = "https://top500.org/lists/top500/list/2025/11/" - async with httpx.AsyncClient(timeout=60.0) as client: + async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client: response = await client.get(url) response.raise_for_status() - return self.parse_response(response.text) + entries = self.parse_response(response.text) + + semaphore = asyncio.Semaphore(8) + + async def enrich(entry: Dict[str, Any]) -> Dict[str, Any]: + detail_url = entry.pop("_detail_url", "") + if not detail_url: + return entry + + async with semaphore: + 
try: + detail_response = await client.get(detail_url) + detail_response.raise_for_status() + entry["metadata"].update(self.parse_detail_response(detail_response.text)) + except Exception: + entry["metadata"]["detail_fetch_failed"] = True + return entry + + return await asyncio.gather(*(enrich(entry) for entry in entries)) + + def _extract_system_fields(self, system_cell) -> Dict[str, str]: + link = system_cell.find("a") + system_name = link.get_text(" ", strip=True) if link else system_cell.get_text(" ", strip=True) + detail_url = "" + if link and link.get("href"): + detail_url = f"https://top500.org{link.get('href')}" + + manufacturer = "" + if link and link.next_sibling: + manufacturer = str(link.next_sibling).strip(" ,\n\t") + + cell_text = system_cell.get_text("\n", strip=True) + lines = [line.strip(" ,") for line in cell_text.splitlines() if line.strip()] + + site = "" + country = "" + if lines: + system_name = lines[0] + if len(lines) >= 3: + site = lines[-2] + country = lines[-1] + elif len(lines) == 2: + country = lines[-1] + + if not manufacturer and len(lines) >= 2: + manufacturer = lines[1] + + return { + "name": system_name, + "manufacturer": manufacturer, + "site": site, + "country": country, + "detail_url": detail_url, + } + + def parse_detail_response(self, html: str) -> Dict[str, Any]: + soup = BeautifulSoup(html, "html.parser") + detail_table = soup.find("table", {"class": "table table-condensed"}) + if not detail_table: + return {} + + detail_map: Dict[str, Any] = {} + label_aliases = { + "Site": "site", + "Manufacturer": "manufacturer", + "Cores": "cores", + "Processor": "processor", + "Interconnect": "interconnect", + "Installation Year": "installation_year", + "Linpack Performance (Rmax)": "rmax", + "Theoretical Peak (Rpeak)": "rpeak", + "Nmax": "nmax", + "HPCG": "hpcg", + "Power": "power", + "Power Measurement Level": "power_measurement_level", + "Operating System": "operating_system", + "Compiler": "compiler", + "Math Library": 
"math_library", + "MPI": "mpi", + } + + for row in detail_table.find_all("tr"): + header = row.find("th") + value_cell = row.find("td") + if not header or not value_cell: + continue + + label = header.get_text(" ", strip=True).rstrip(":") + key = label_aliases.get(label) + if not key: + continue + + value = value_cell.get_text(" ", strip=True) + detail_map[key] = value + + return detail_map def parse_response(self, html: str) -> List[Dict[str, Any]]: """Parse TOP500 HTML response""" @@ -36,27 +130,26 @@ class TOP500Collector(BaseCollector): soup = BeautifulSoup(html, "html.parser") # Find the table with TOP500 data - table = soup.find("table", {"class": "top500-table"}) - if not table: - # Try alternative table selector - table = soup.find("table", {"id": "top500"}) + table = None + for candidate in soup.find_all("table"): + header_cells = [ + cell.get_text(" ", strip=True) for cell in candidate.select("thead th") + ] + normalized_headers = [header.lower() for header in header_cells] + if ( + "rank" in normalized_headers + and "system" in normalized_headers + and any("cores" in header for header in normalized_headers) + and any("rmax" in header for header in normalized_headers) + ): + table = candidate + break if not table: - # Try to find any table with rank data - tables = soup.find_all("table") - for t in tables: - if t.find(string=re.compile(r"Rank.*System.*Cores.*Rmax", re.I)): - table = t - break - - if not table: - # Fallback: try to extract data from any table - tables = soup.find_all("table") - if tables: - table = tables[0] + table = soup.find("table", {"class": "top500-table"}) or soup.find("table", {"id": "top500"}) if table: - rows = table.find_all("tr") + rows = table.select("tr") for row in rows[1:]: # Skip header row cells = row.find_all(["td", "th"]) if len(cells) >= 6: @@ -68,43 +161,26 @@ class TOP500Collector(BaseCollector): rank = int(rank_text) - # System name (may contain link) system_cell = cells[1] - system_name = 
system_cell.get_text(strip=True) - # Try to get full name from link title or data attribute - link = system_cell.find("a") - if link and link.get("title"): - system_name = link.get("title") + system_fields = self._extract_system_fields(system_cell) + system_name = system_fields["name"] + manufacturer = system_fields["manufacturer"] + site = system_fields["site"] + country = system_fields["country"] + detail_url = system_fields["detail_url"] - # Country - country_cell = cells[2] - country = country_cell.get_text(strip=True) - # Try to get country from data attribute or image alt - img = country_cell.find("img") - if img and img.get("alt"): - country = img.get("alt") - - # Extract location (city) city = "" - location_text = country_cell.get_text(strip=True) - if "(" in location_text and ")" in location_text: - city = location_text.split("(")[0].strip() + cores = cells[2].get_text(strip=True).replace(",", "") - # Cores - cores = cells[3].get_text(strip=True).replace(",", "") - - # Rmax - rmax_text = cells[4].get_text(strip=True) + rmax_text = cells[3].get_text(strip=True) rmax = self._parse_performance(rmax_text) - # Rpeak - rpeak_text = cells[5].get_text(strip=True) + rpeak_text = cells[4].get_text(strip=True) rpeak = self._parse_performance(rpeak_text) - # Power (optional) power = "" - if len(cells) >= 7: - power = cells[6].get_text(strip=True) + if len(cells) >= 6: + power = cells[5].get_text(strip=True).replace(",", "") entry = { "source_id": f"top500_{rank}", @@ -117,10 +193,14 @@ class TOP500Collector(BaseCollector): "unit": "PFlop/s", "metadata": { "rank": rank, - "r_peak": rpeak, - "power": power, "cores": cores, + "rmax": rmax_text, + "rpeak": rpeak_text, + "power": power, + "manufacturer": manufacturer, + "site": site, }, + "_detail_url": detail_url, "reference_date": "2025-11-01", } data.append(entry) @@ -184,10 +264,15 @@ class TOP500Collector(BaseCollector): "unit": "PFlop/s", "metadata": { "rank": 1, - "r_peak": 2746.38, - "power": 29581, - "cores": 
11039616, + "cores": "11039616", + "rmax": "1742.00", + "rpeak": "2746.38", + "power": "29581", "manufacturer": "HPE", + "site": "DOE/NNSA/LLNL", + "processor": "AMD 4th Gen EPYC 24C 1.8GHz", + "interconnect": "Slingshot-11", + "installation_year": "2025", }, "reference_date": "2025-11-01", }, @@ -202,10 +287,12 @@ class TOP500Collector(BaseCollector): "unit": "PFlop/s", "metadata": { "rank": 2, - "r_peak": 2055.72, - "power": 24607, - "cores": 9066176, + "cores": "9066176", + "rmax": "1353.00", + "rpeak": "2055.72", + "power": "24607", "manufacturer": "HPE", + "site": "DOE/SC/Oak Ridge National Laboratory", }, "reference_date": "2025-11-01", }, @@ -220,9 +307,10 @@ class TOP500Collector(BaseCollector): "unit": "PFlop/s", "metadata": { "rank": 3, - "r_peak": 1980.01, - "power": 38698, - "cores": 9264128, + "cores": "9264128", + "rmax": "1012.00", + "rpeak": "1980.01", + "power": "38698", "manufacturer": "Intel", }, "reference_date": "2025-11-01", diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index 3932ca16..ce43ea66 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -2,8 +2,8 @@ import asyncio import logging -from datetime import datetime -from typing import Any, Dict +from datetime import datetime, timedelta +from typing import Any, Dict, Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger @@ -11,6 +11,7 @@ from sqlalchemy import select from app.db.session import async_session_factory from app.models.datasource import DataSource +from app.models.task import CollectionTask from app.services.collectors.registry import collector_registry logger = logging.getLogger(__name__) @@ -89,6 +90,35 @@ async def run_collector_task(collector_name: str): logger.exception("Collector %s failed: %s", collector_name, exc) +async def cleanup_stale_running_tasks(max_age_hours: int = 2) -> int: + """Mark stale running tasks as failed 
after restarts or collector hangs.""" + cutoff = datetime.utcnow() - timedelta(hours=max_age_hours) + + async with async_session_factory() as db: + result = await db.execute( + select(CollectionTask).where( + CollectionTask.status == "running", + CollectionTask.started_at.is_not(None), + CollectionTask.started_at < cutoff, + ) + ) + stale_tasks = result.scalars().all() + + for task in stale_tasks: + task.status = "failed" + task.phase = "failed" + task.completed_at = datetime.utcnow() + existing_error = (task.error_message or "").strip() + cleanup_error = "Marked failed automatically after stale running task cleanup" + task.error_message = f"{existing_error}\n{cleanup_error}".strip() if existing_error else cleanup_error + + if stale_tasks: + await db.commit() + logger.warning("Cleaned up %s stale running collection task(s)", len(stale_tasks)) + + return len(stale_tasks) + + def start_scheduler() -> None: """Start the scheduler.""" if not scheduler.running: @@ -144,6 +174,19 @@ def get_scheduler_jobs() -> list[Dict[str, Any]]: return jobs +async def get_latest_task_id_for_datasource(datasource_id: int) -> Optional[int]: + from app.models.task import CollectionTask + + async with async_session_factory() as db: + result = await db.execute( + select(CollectionTask.id) + .where(CollectionTask.datasource_id == datasource_id) + .order_by(CollectionTask.created_at.desc(), CollectionTask.id.desc()) + .limit(1) + ) + return result.scalar_one_or_none() + + def run_collector_now(collector_name: str) -> bool: """Run a collector immediately (not scheduled).""" collector = collector_registry.get(collector_name) diff --git a/docs/collected-data-column-removal-plan.md b/docs/collected-data-column-removal-plan.md new file mode 100644 index 00000000..efb18f28 --- /dev/null +++ b/docs/collected-data-column-removal-plan.md @@ -0,0 +1,207 @@ +# collected_data 强耦合列拆除计划 + +## 背景 + +当前 `collected_data` 同时承担了两类职责: + +1. 通用采集事实表 +2. 
少数数据源的宽表字段承载 + +典型强耦合列包括: + +- `country` +- `city` +- `latitude` +- `longitude` +- `value` +- `unit` + +以及 API 层临时平铺出来的: + +- `cores` +- `rmax` +- `rpeak` +- `power` + +这些字段并不适合作为统一事实表的长期 schema。 +推荐方向是: + +- 表内保留通用稳定字段 +- 业务差异字段全部归入 `metadata` +- API 和前端动态读取 `metadata` + +## 拆除目标 + +最终希望 `collected_data` 只保留: + +- `id` +- `snapshot_id` +- `task_id` +- `source` +- `source_id` +- `entity_key` +- `data_type` +- `name` +- `title` +- `description` +- `metadata` +- `collected_at` +- `reference_date` +- `is_valid` +- `is_current` +- `previous_record_id` +- `change_type` +- `change_summary` +- `deleted_at` + +## 计划阶段 + +### Phase 1:读取层去依赖 + +目标: + +- API / 可视化 / 前端不再优先依赖宽列表字段 +- 所有动态字段优先从 `metadata` 取 + +当前已完成: + +- 新写入数据时,将 `country/city/latitude/longitude/value/unit` 自动镜像到 `metadata` +- `/api/v1/collected` 优先从 `metadata` 取动态字段 +- `visualization` 接口优先从 `metadata` 取动态字段 +- 国家筛选已改成只走 `metadata->>'country'` +- `CollectedData.to_dict()` 已切到 metadata-first +- 变更比较逻辑已切到 metadata-first +- 已新增历史回填脚本: + [scripts/backfill_collected_data_metadata.py](/home/ray/dev/linkong/planet/scripts/backfill_collected_data_metadata.py) +- 已新增删列脚本: + [scripts/drop_collected_data_legacy_columns.py](/home/ray/dev/linkong/planet/scripts/drop_collected_data_legacy_columns.py) + +涉及文件: + +- [backend/app/core/collected_data_fields.py](/home/ray/dev/linkong/planet/backend/app/core/collected_data_fields.py) +- [backend/app/services/collectors/base.py](/home/ray/dev/linkong/planet/backend/app/services/collectors/base.py) +- [backend/app/api/v1/collected_data.py](/home/ray/dev/linkong/planet/backend/app/api/v1/collected_data.py) +- [backend/app/api/v1/visualization.py](/home/ray/dev/linkong/planet/backend/app/api/v1/visualization.py) + +### Phase 2:写入层去依赖 + +目标: + +- 采集器内部不再把这些字段当作数据库一级列来理解 +- 统一只写: + - 通用主字段 + - `metadata` + +建议动作: + +1. Collector 内部仍可使用 `country/city/value` 这种临时字段作为采集过程变量 +2. 进入 `BaseCollector._save_data()` 后统一归档到 `metadata` +3. 
`CollectedData` 模型中的强耦合列已从 ORM 移除,写入统一归档到 `metadata` + +### Phase 3:数据库删列 + +目标: + +- 从 `collected_data` 真正移除以下列: + - `country` + - `city` + - `latitude` + - `longitude` + - `value` + - `unit` + +注意: + +- `cores / rmax / rpeak / power` 当前本来就在 `metadata` 里,不是表列 +- 这四个主要是 API 平铺字段,不需要数据库删列 + +## 当前阻塞点 + +在正式删列前,还需要确认这些地方已经完全不再直接依赖数据库列: + +### 1. `CollectedData.to_dict()` + +文件: + +- [backend/app/models/collected_data.py](/home/ray/dev/linkong/planet/backend/app/models/collected_data.py) + +状态: + +- 已完成 + +### 2. 差异计算逻辑 + +文件: + +- [backend/app/services/collectors/base.py](/home/ray/dev/linkong/planet/backend/app/services/collectors/base.py) + +状态: + +- 已完成 +- 当前已改成比较归一化后的 metadata-first payload + +### 3. 历史数据回填 + +问题: + +- 老数据可能只有列值,没有对应 `metadata` + +当前方案: + +- 在删列前执行一次回填脚本: + - [scripts/backfill_collected_data_metadata.py](/home/ray/dev/linkong/planet/scripts/backfill_collected_data_metadata.py) + +### 4. 导出格式兼容 + +文件: + +- [backend/app/api/v1/collected_data.py](/home/ray/dev/linkong/planet/backend/app/api/v1/collected_data.py) + +现状: + +- CSV/JSON 导出已基本切成 metadata-first + +建议: + +- 删列前再回归检查一次导出字段是否一致 + +## 推荐执行顺序 + +1. 保持新数据写入时 `metadata` 完整 +2. 把模型和 diff 逻辑完全切成 metadata-first +3. 写一条历史回填脚本 +4. 回填后观察一轮 +5. 正式执行删列迁移 + +## 推荐迁移 SQL + +仅在确认全部读取链路已去依赖后执行: + +```sql +ALTER TABLE collected_data +DROP COLUMN IF EXISTS country, +DROP COLUMN IF EXISTS city, +DROP COLUMN IF EXISTS latitude, +DROP COLUMN IF EXISTS longitude, +DROP COLUMN IF EXISTS value, +DROP COLUMN IF EXISTS unit; +``` + +## 风险提示 + +1. 地图类接口对经纬度最敏感 + 必须确保所有地图需要的记录,其 `metadata.latitude/longitude` 已回填完整。 + +2. 历史老数据如果没有回填,删列后会直接丢失这些信息。 + +3. 某些 collector 可能仍隐式依赖这些宽字段做差异比较,删列前必须做一次全量回归。 + +## 当前判断 + +当前项目已经完成“代码去依赖 + 历史回填 + readiness 检查”。 +下一步执行顺序建议固定为: + +1. 先部署当前代码版本并重启后端 +2. 再做一轮功能回归 +3. 
最后执行: + `uv run python scripts/drop_collected_data_legacy_columns.py` diff --git a/docs/collected-data-history-plan.md b/docs/collected-data-history-plan.md new file mode 100644 index 00000000..574cebc5 --- /dev/null +++ b/docs/collected-data-history-plan.md @@ -0,0 +1,402 @@ +# 采集数据历史快照化改造方案 + +## 背景 + +当前系统的 `collected_data` 更接近“当前结果表”: + +- 同一个 `source + source_id` 会被更新覆盖 +- 前端列表页默认读取这张表 +- `collection_tasks` 只记录任务执行状态,不直接承载数据版本语义 + +这套方式适合管理后台,但不利于后续做态势感知、时间回放、趋势分析和版本对比。 +如果后面需要回答下面这类问题,当前模型会比较吃力: + +- 某条实体在过去 7 天如何变化 +- 某次采集相比上次新增了什么、删除了什么、值变了什么 +- 某个时刻地图上“当时的世界状态”是什么 +- 告警是在第几次采集后触发的 + +因此建议把采集数据改造成“历史快照 + 当前视图”模型。 + +## 目标 + +1. 每次触发采集都保留一份独立快照,历史可追溯。 +2. 管理后台默认仍然只看“当前最新状态”,不增加使用复杂度。 +3. 后续支持: + - 时间线回放 + - 两次采集差异对比 + - 趋势分析 + - 按快照回溯告警和地图状态 +4. 尽量兼容现有接口,降低改造成本。 + +## 结论 + +不建议继续用以下两种单一模式: + +- 直接覆盖旧数据 + 问题:没有历史,无法回溯。 + +- 软删除旧数据再全量新增 + 问题:语义不清,历史和“当前无效”混在一起,后续统计复杂。 + +推荐方案: + +- 保留历史事实表 +- 维护当前视图 +- 每次采集对应一个明确的快照批次 + +## 推荐数据模型 + +### 方案概览 + +建议拆成三层: + +1. `collection_tasks` + 继续作为采集任务表,表示“这次采集任务”。 + +2. `data_snapshots` + 新增快照表,表示“某个数据源在某次任务中产出的一个快照批次”。 + +3. 
`collected_data` + 从“当前结果表”升级为“历史事实表”,每一行归属于一个快照。 + +同时再提供一个“当前视图”: + +- SQL View / 物化视图 / API 查询层封装均可 +- 语义是“每个 `source + source_id` 的最新有效记录” + +### 新增表:`data_snapshots` + +建议字段: + +| 字段 | 类型 | 含义 | +|---|---|---| +| `id` | bigint PK | 快照主键 | +| `datasource_id` | int | 对应数据源 | +| `task_id` | int | 对应采集任务 | +| `source` | varchar(100) | 数据源名,如 `top500` | +| `snapshot_key` | varchar(100) | 可选,业务快照标识 | +| `reference_date` | timestamptz nullable | 这批数据的参考时间 | +| `started_at` | timestamptz | 快照开始时间 | +| `completed_at` | timestamptz | 快照完成时间 | +| `record_count` | int | 快照总记录数 | +| `status` | varchar(20) | `running/success/failed/partial` | +| `is_current` | bool | 当前是否是该数据源最新快照 | +| `parent_snapshot_id` | bigint nullable | 上一版快照,可用于 diff | +| `summary` | jsonb | 本次快照统计摘要 | + +说明: + +- `collection_tasks` 偏“执行过程” +- `data_snapshots` 偏“数据版本” +- 一个任务通常对应一个快照,但保留分层更清晰 + +### 升级表:`collected_data` + +建议新增字段: + +| 字段 | 类型 | 含义 | +|---|---|---| +| `snapshot_id` | bigint not null | 归属快照 | +| `task_id` | int nullable | 归属任务,便于追查 | +| `entity_key` | varchar(255) | 实体稳定键,通常可由 `source + source_id` 派生 | +| `is_current` | bool | 当前是否为该实体最新记录 | +| `previous_record_id` | bigint nullable | 上一个版本的记录 | +| `change_type` | varchar(20) | `created/updated/unchanged/deleted` | +| `change_summary` | jsonb | 字段变化摘要 | +| `deleted_at` | timestamptz nullable | 对应“本次快照中消失”的实体 | + +保留现有字段: + +- `source` +- `source_id` +- `data_type` +- `name` +- `title` +- `description` +- `country` +- `city` +- `latitude` +- `longitude` +- `value` +- `unit` +- `metadata` +- `collected_at` +- `reference_date` +- `is_valid` + +### 当前视图 + +建议新增一个只读视图: + +`current_collected_data` + +语义: + +- 对每个 `source + source_id` 只保留最新一条 `is_current = true` 且 `deleted_at is null` 的记录 + +这样: + +- 管理后台继续像现在一样查“当前数据” +- 历史分析查 `collected_data` + +## 写入策略 + +### 触发按钮语义 + +“触发”不再理解为“覆盖旧表”,而是: + +- 启动一次新的采集任务 +- 生成一个新的快照 +- 将本次结果写入历史事实表 +- 再更新当前视图标记 + +### 写入流程 + +1. 创建 `collection_tasks` 记录,状态 `running` +2. 
创建 `data_snapshots` 记录,状态 `running` +3. 采集器拉取原始数据并标准化 +4. 为每条记录生成 `entity_key` + - 推荐:`{source}:{source_id}` +5. 将本次记录批量写入 `collected_data` +6. 与上一个快照做比对,计算: + - 新增 + - 更新 + - 未变 + - 删除 +7. 更新本批记录的: + - `change_type` + - `previous_record_id` + - `is_current` +8. 将上一批同实体记录的 `is_current` 置为 `false` +9. 将本次快照未出现但上一版存在的实体标记为 `deleted` +10. 更新 `data_snapshots.status = success` +11. 更新 `collection_tasks.status = success` + +### 删除语义 + +这里不建议真的删记录。 +建议采用“逻辑消失”模型: + +- 历史行永远保留 +- 如果某实体在新快照里消失: + - 上一条历史记录补一条“删除状态记录”或标记 `change_type = deleted` + - 同时该实体不再出现在当前视图 + +这样最适合态势感知。 + +## API 改造建议 + +### 保持现有接口默认行为 + +现有接口: + +- `GET /api/v1/collected` +- `GET /api/v1/collected/{id}` +- `GET /api/v1/collected/summary` + +建议默认仍返回“当前视图”,避免前端全面重写。 + +### 新增历史查询能力 + +建议新增参数或新接口: + +#### 1. 当前/历史切换 + +`GET /api/v1/collected?mode=current|history` + +- `current`:默认,查当前视图 +- `history`:查历史事实表 + +#### 2. 按快照查询 + +`GET /api/v1/collected?snapshot_id=123` + +#### 3. 快照列表 + +`GET /api/v1/snapshots` + +支持筛选: + +- `datasource_id` +- `source` +- `status` +- `date_from/date_to` + +#### 4. 快照详情 + +`GET /api/v1/snapshots/{id}` + +返回: + +- 快照基础信息 +- 统计摘要 +- 与上一版的 diff 摘要 + +#### 5. 快照 diff + +`GET /api/v1/snapshots/{id}/diff?base_snapshot_id=122` + +返回: + +- `created` +- `updated` +- `deleted` +- `unchanged` + +## 前端改造建议 + +### 1. 数据列表页 + +默认仍看当前数据,不改用户使用习惯。 + +建议新增: + +- “视图模式” + - 当前数据 + - 历史数据 +- “快照时间”筛选 +- “只看变化项”筛选 + +### 2. 数据详情页 + +详情页建议展示: + +- 当前记录基础信息 +- 元数据动态字段 +- 所属快照 +- 上一版本对比入口 +- 历史版本时间线 + +### 3. 数据源管理页 + +“触发”按钮文案建议改成更准确的: + +- `立即采集` + +并在详情里补: + +- 最近一次快照时间 +- 最近一次快照记录数 +- 最近一次变化数 + +## 迁移方案 + +### Phase 1:兼容式落地 + +目标:先保留当前页面可用。 + +改动: + +1. 新增 `data_snapshots` +2. 给 `collected_data` 增加: + - `snapshot_id` + - `task_id` + - `entity_key` + - `is_current` + - `previous_record_id` + - `change_type` + - `change_summary` + - `deleted_at` +3. 现有数据全部补成一个“初始化快照” +4. 现有 `/collected` 默认改查当前视图 + +优点: + +- 前端几乎无感 +- 风险最小 + +### Phase 2:启用差异计算 + +目标:采集后可知道本次改了什么。 + +改动: + +1. 写入时做新旧快照比对 +2. 
写 `change_type` +3. 生成快照摘要 + +### Phase 3:前端态势感知能力 + +目标:支持历史回放和趋势分析。 + +改动: + +1. 快照时间线 +2. 版本 diff 页面 +3. 地图时间回放 +4. 告警和快照关联 + +## 唯一性与索引建议 + +### 建议保留的业务唯一性 + +在“同一个快照内部”,建议唯一: + +- `(snapshot_id, source, source_id)` + +不要在整张历史表上强加: + +- `(source, source_id)` 唯一 + +因为历史表本来就应该允许同一实体跨快照存在多条版本。 + +### 建议索引 + +- `idx_collected_data_snapshot_id` +- `idx_collected_data_source_source_id` +- `idx_collected_data_entity_key` +- `idx_collected_data_is_current` +- `idx_collected_data_reference_date` +- `idx_snapshots_source_completed_at` + +## 风险点 + +1. 存储量会明显增加 + - 需要评估保留周期 + - 可以考虑冷热分层 + +2. 写入复杂度上升 + - 需要批量 upsert / diff 逻辑 + +3. 当前接口语义会从“表”变成“视图” + - 文档必须同步 + +4. 某些采集器缺稳定 `source_id` + - 需要补齐实体稳定键策略 + +## 对当前项目的具体建议 + +结合当前代码,推荐这样落地: + +### 短期 + +1. 先设计并落表: + - `data_snapshots` + - `collected_data` 新字段 +2. 采集完成后每次新增快照 +3. `/api/v1/collected` 默认查 `is_current = true` + +### 中期 + +1. 在 `BaseCollector._save_data()` 中改成: + - 生成快照 + - 批量写历史 + - 标记当前 +2. 将 `CollectionTask.id` 关联到 `snapshot.task_id` + +### 长期 + +1. 地图接口支持按 `snapshot_id` 查询 +2. 仪表盘支持“最近一次快照变化量” +3. 
告警支持绑定到快照版本 + +## 最终建议 + +最终建议采用: + +- 历史事实表:保存每次采集结果 +- 当前视图:服务管理后台默认查询 +- 快照表:承载版本批次和 diff 语义 + +这样既能保留历史,又不会把当前页面全部推翻重做,是最适合后续做态势感知的一条路径。 diff --git a/docs/system-settings-plan.md b/docs/system-settings-plan.md index 8d7d8952..8933cd34 100644 --- a/docs/system-settings-plan.md +++ b/docs/system-settings-plan.md @@ -44,4 +44,5 @@ - 设置项修改后重启服务仍然存在 - 配置页可以查看并修改所有内置采集器的启停与采集频率 - 调整采集频率后,调度器任务随之更新 -- `/settings` 页面可从主导航进入并正常工作 +- `/settings` 页面可从主导航进入并正常工作 + diff --git a/frontend/src/index.css b/frontend/src/index.css index de1a1f0c..ae02ec64 100644 --- a/frontend/src/index.css +++ b/frontend/src/index.css @@ -231,6 +231,10 @@ body { overflow: hidden; } +.data-source-tabs .ant-tabs-tabpane-hidden { + display: none !important; +} + .data-source-custom-tab { gap: 12px; } @@ -340,6 +344,42 @@ body { min-width: 100%; } +.table-scroll-region .ant-table-thead > tr > th, +.table-scroll-region .ant-table-tbody > tr > td { + padding: 10px 12px !important; +} + +.table-scroll-region .ant-table-body, +.table-scroll-region .ant-table-content { + scrollbar-width: thin; + scrollbar-color: rgba(148, 163, 184, 0.88) transparent; +} + +.table-scroll-region .ant-table-body::-webkit-scrollbar, +.table-scroll-region .ant-table-content::-webkit-scrollbar { + width: 10px; + height: 10px; +} + +.table-scroll-region .ant-table-body::-webkit-scrollbar-thumb, +.table-scroll-region .ant-table-content::-webkit-scrollbar-thumb { + background: rgba(148, 163, 184, 0.82); + border-radius: 999px; + border: 2px solid transparent; + background-clip: padding-box; +} + +.table-scroll-region .ant-table-body::-webkit-scrollbar-thumb:hover, +.table-scroll-region .ant-table-content::-webkit-scrollbar-thumb:hover { + background: rgba(100, 116, 139, 0.9); + background-clip: padding-box; +} + +.table-scroll-region .ant-table-body::-webkit-scrollbar-track, +.table-scroll-region .ant-table-content::-webkit-scrollbar-track { + background: transparent; +} + .settings-shell, .settings-tabs-shell, .settings-tabs, @@ 
-377,7 +417,7 @@ body { display: none !important; } -.settings-tab-panel { +.settings-pane { flex: 1 1 auto; min-width: 0; min-height: 0; @@ -427,9 +467,22 @@ body { background: transparent; } -.settings-table-scroll-region { +.settings-pane .data-source-table-region .ant-table-container { + display: flex; + flex-direction: column; + height: 100%; + min-height: 0; +} + +.settings-pane .data-source-table-region .ant-table-header { + flex: 0 0 auto; +} + +.settings-pane .data-source-table-region .ant-table-body { flex: 1 1 auto; - overflow: hidden; + min-height: 0; + height: 0 !important; + max-height: none !important; } @@ -490,6 +543,10 @@ body { overflow: auto; } +.data-list-summary-card-inner { + min-height: 100%; +} + .data-list-right-column { min-width: 0; min-height: 0; @@ -499,7 +556,9 @@ body { } .data-list-summary-treemap { - min-height: 100%; + --data-list-treemap-tile-padding: 12px; + --data-list-treemap-label-size: 12px; + --data-list-treemap-value-size: 16px; display: grid; grid-template-columns: repeat(4, minmax(0, 1fr)); grid-auto-rows: minmax(56px, 1fr); @@ -512,9 +571,9 @@ body { min-height: 0; display: flex; flex-direction: column; - justify-content: space-between; - gap: 8px; - padding: 12px; + justify-content: flex-start; + gap: 6px; + padding: var(--data-list-treemap-tile-padding); border-radius: 16px; border: 1px solid rgba(255, 255, 255, 0.55); color: #0f172a; @@ -552,29 +611,36 @@ body { .data-list-treemap-head { display: flex; align-items: center; - gap: 8px; + gap: 6px; min-width: 0; + flex: 0 0 auto; } .data-list-treemap-label { min-width: 0; - font-size: clamp(11px, 0.75vw, 13px); + font-size: var(--data-list-treemap-label-size); line-height: 1.2; color: rgba(15, 23, 42, 0.78); + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; } .data-list-treemap-body { display: flex; flex-direction: column; - gap: 4px; + gap: 2px; + margin-top: auto; + min-height: 0; + flex: 0 0 auto; } .data-list-summary-tile-icon { display: 
inline-flex; align-items: center; justify-content: center; - width: 24px; - height: 24px; + width: 22px; + height: 22px; border-radius: 8px; background: rgba(255, 255, 255, 0.55); color: #0f172a; @@ -582,9 +648,12 @@ body { } .data-list-summary-tile-value { - font-size: clamp(12px, 1vw, 16px); + font-size: var(--data-list-treemap-value-size); line-height: 1.1; color: #0f172a; + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; } .data-list-treemap-meta { @@ -611,7 +680,7 @@ body { display: flex; flex-wrap: nowrap; gap: 10px; - align-items: center; + align-items: flex-start; } .data-list-filter-grid--balanced > * { @@ -687,6 +756,46 @@ body { margin: 12px 0 0; } +.data-list-name-link { + max-width: 100%; + display: inline-flex; + align-items: center; + justify-content: flex-start; + padding-inline: 0 !important; +} + +.data-list-name-marquee { + display: block; + max-width: 100%; + overflow: hidden; + white-space: nowrap; +} + +.data-list-name-marquee--overflow { + width: 100%; +} + +.data-list-name-marquee__text { + display: inline-block; + max-width: 100%; + white-space: nowrap; + transform: translateX(0); + will-change: transform; +} + +.data-list-name-link:hover .data-list-name-marquee--overflow .data-list-name-marquee__text { + animation: data-list-name-marquee 8s linear infinite; +} + +@keyframes data-list-name-marquee { + from { + transform: translateX(0); + } + to { + transform: translateX(-100%); + } +} + .data-list-resize-handle { position: relative; display: flex; @@ -807,3 +916,172 @@ body { } } + +.data-list-detail-modal { + display: flex; + flex-direction: column; + gap: 16px; +} + +.data-list-detail-section { + display: flex; + flex-direction: column; + gap: 10px; + min-width: 0; +} + +.data-list-detail-section__title { + font-size: 14px; +} + +.data-list-detail-hero { + padding: 14px 16px; + border-radius: 12px; + background: #f7f8fa; + border: 1px solid #eef1f5; +} + +.data-list-detail-hero__label { + display: block; + 
margin-bottom: 6px; + color: #6b7280; + font-size: 12px; +} + +.data-list-detail-hero__title.ant-typography { + margin: 0; + overflow-wrap: anywhere; +} + +.data-list-detail-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(240px, 1fr)); + gap: 12px; +} + +.data-list-detail-cell { + min-width: 0; + padding: 12px 14px; + border-radius: 12px; + border: 1px solid #eef1f5; + background: #fff; +} + +.data-list-detail-cell--block { + grid-column: 1 / -1; +} + +.data-list-detail-cell__label { + display: block; + margin-bottom: 8px; + color: #6b7280; + font-size: 12px; +} + +.data-list-detail-cell__value { + color: #111827; + line-height: 1.6; + white-space: pre-wrap; + overflow-wrap: anywhere; + word-break: break-word; +} + +.data-list-detail-code { + margin: 0; + padding: 12px; + max-height: 240px; + overflow: auto; + border-radius: 10px; + background: #111827; + color: #e5eef9; + font-size: 12px; + line-height: 1.6; + white-space: pre-wrap; + overflow-wrap: anywhere; + word-break: break-word; +} + +.data-list-detail-code--raw { + max-height: 320px; +} + +.data-list-tag-cell { + min-width: 140px; +} + +.data-list-tag-cell .ant-tag { + display: inline-block; + max-width: 100%; + white-space: normal; + overflow-wrap: anywhere; + word-break: break-word; + line-height: 1.4; +} + +.data-list-filter-select { + max-width: 220px; +} + +.data-list-filter-select .ant-select-selector { + height: auto !important; + min-height: 32px; + max-height: 72px; + align-items: flex-start !important; + overflow-y: auto; + overflow-x: hidden; + scrollbar-width: thin; +} + +.data-list-filter-select .ant-select-selection-overflow { + flex-wrap: wrap !important; +} + +.data-list-filter-select .ant-select-selection-overflow-item { + max-width: 100%; +} + +.data-list-filter-select .ant-select-selection-item { + max-width: 100%; +} + +.dashboard-page { + display: flex; + flex-direction: column; + gap: 16px; +} + +.dashboard-page__header { + display: flex; + justify-content: 
space-between; + align-items: flex-start; + gap: 12px; + flex-wrap: wrap; +} + +.dashboard-page__actions { + align-items: center; +} + +.dashboard-status-tag { + margin-inline-end: 0 !important; + padding-inline: 10px; + border-radius: 999px; + line-height: 24px; +} + +.dashboard-refresh-button.ant-btn { + height: 26px; + padding-inline: 12px; + border-radius: 999px; + border-color: #d9d9d9; + background: #ffffff; + color: rgba(0, 0, 0, 0.88); + box-shadow: none; +} + +.dashboard-refresh-button.ant-btn:hover, +.dashboard-refresh-button.ant-btn:focus { + border-color: #bfbfbf; + background: #ffffff; + color: rgba(0, 0, 0, 0.88); +} diff --git a/frontend/src/main.tsx b/frontend/src/main.tsx index 55e0e867..0b9f3b4f 100644 --- a/frontend/src/main.tsx +++ b/frontend/src/main.tsx @@ -14,7 +14,7 @@ ReactDOM.createRoot(document.getElementById('root')!).render( }, }} > - + diff --git a/frontend/src/pages/Dashboard/Dashboard.tsx b/frontend/src/pages/Dashboard/Dashboard.tsx index c2db29ca..9202ddd4 100644 --- a/frontend/src/pages/Dashboard/Dashboard.tsx +++ b/frontend/src/pages/Dashboard/Dashboard.tsx @@ -122,19 +122,19 @@ function Dashboard() { return ( -
-
+
+
仪表盘 系统总览与实时态势
- + {wsConnected ? ( - } color="success">实时连接 + } color="success">实时连接 ) : ( - } color="default">离线 + } color="default">离线 )} - +
@@ -188,7 +188,7 @@ function Dashboard() { {stats?.last_updated && (
最后更新: {new Date(stats.last_updated).toLocaleString('zh-CN')} - {wsConnected && 实时同步中} + {wsConnected && 实时同步中}
)}
diff --git a/frontend/src/pages/DataList/DataList.tsx b/frontend/src/pages/DataList/DataList.tsx index f2f37f9e..4f2f548c 100644 --- a/frontend/src/pages/DataList/DataList.tsx +++ b/frontend/src/pages/DataList/DataList.tsx @@ -1,9 +1,10 @@ -import { useEffect, useMemo, useRef, useState } from 'react' +import { useEffect, useLayoutEffect, useMemo, useRef, useState, type CSSProperties } from 'react' import { Table, Tag, Space, Card, Select, Input, Button, - Modal, Descriptions, Spin, Empty, Tooltip, Typography, Grid + Modal, Spin, Empty, Tooltip, Typography, Grid } from 'antd' import type { ColumnsType } from 'antd/es/table' +import type { CustomTagProps } from 'rc-select/lib/BaseSelect' import { DatabaseOutlined, GlobalOutlined, CloudServerOutlined, AppstoreOutlined, EyeOutlined, SearchOutlined, FilterOutlined, ReloadOutlined @@ -28,6 +29,10 @@ interface CollectedData { longitude: string | null value: string | null unit: string | null + cores: string | null + rmax: string | null + rpeak: string | null + power: string | null metadata: Record | null collected_at: string reference_date: string | null @@ -40,6 +45,183 @@ interface Summary { source_totals: Array<{ source: string; count: number }> } +const DETAIL_FIELD_LABELS: Record = { + id: 'ID', + source: '数据源', + source_id: '原始ID', + data_type: '数据类型', + name: '名称', + title: '标题', + description: '描述', + country: '国家', + city: '城市', + latitude: '纬度', + longitude: '经度', + value: '数值', + unit: '单位', + collected_at: '采集时间', + reference_date: '参考日期', + is_valid: '有效状态', + rank: '排名', + cores: '核心数量', + rmax: '实际最大算力', + rpeak: '理论算力', + power: '功耗', + manufacturer: '厂商', + site: '站点', + processor: '处理器', + interconnect: '互连', + installation_year: '安装年份', + nmax: 'Nmax', + hpcg: 'HPCG', + power_measurement_level: '功耗测量等级', + operating_system: '操作系统', + compiler: '编译器', + math_library: '数学库', + mpi: 'MPI', + raw_country: '原始国家值', + country_validation: '国家校验', +} + +const DETAIL_BASE_FIELDS = [ + 'source', + 'data_type', + 
'source_id', + 'country', + 'city', + 'collected_at', + 'reference_date', +] + +function formatFieldLabel(key: string) { + if (DETAIL_FIELD_LABELS[key]) { + return DETAIL_FIELD_LABELS[key] + } + + return key + .split('_') + .filter(Boolean) + .map((part) => part.charAt(0).toUpperCase() + part.slice(1)) + .join(' ') +} + +function formatDetailValue(key: string, value: unknown) { + if (value === null || value === undefined || value === '') { + return '-' + } + + if (key === 'collected_at' || key === 'reference_date') { + const date = new Date(String(value)) + return Number.isNaN(date.getTime()) + ? String(value) + : key === 'reference_date' + ? date.toLocaleDateString('zh-CN') + : date.toLocaleString('zh-CN') + } + + if (typeof value === 'boolean') { + return value ? '是' : '否' + } + + if (typeof value === 'object') { + return JSON.stringify(value, null, 2) + } + + return String(value) +} + +function NameMarquee({ text }: { text: string }) { + const containerRef = useRef(null) + const textRef = useRef(null) + const [overflowing, setOverflowing] = useState(false) + + useLayoutEffect(() => { + const updateOverflow = () => { + const container = containerRef.current + const content = textRef.current + if (!container || !content) return + setOverflowing(content.scrollWidth > container.clientWidth + 1) + } + + updateOverflow() + + if (typeof ResizeObserver === 'undefined') { + return undefined + } + + const observer = new ResizeObserver(updateOverflow) + if (containerRef.current) observer.observe(containerRef.current) + if (textRef.current) observer.observe(textRef.current) + + return () => observer.disconnect() + }, [text]) + + return ( + + + {text} + + + ) +} + +function estimateTreemapRows( + items: Array<{ colSpan: number; rowSpan: number }>, + columns: number +): number { + const occupancy: boolean[][] = [] + + const ensureRow = (rowIndex: number) => { + while (occupancy.length <= rowIndex) { + occupancy.push(Array(columns).fill(false)) + } + } + + for (const item of 
items) { + let placed = false + let rowIndex = 0 + + while (!placed) { + ensureRow(rowIndex) + + for (let columnIndex = 0; columnIndex <= columns - item.colSpan; columnIndex += 1) { + let canPlace = true + + for (let rowOffset = 0; rowOffset < item.rowSpan; rowOffset += 1) { + ensureRow(rowIndex + rowOffset) + + for (let columnOffset = 0; columnOffset < item.colSpan; columnOffset += 1) { + if (occupancy[rowIndex + rowOffset][columnIndex + columnOffset]) { + canPlace = false + break + } + } + + if (!canPlace) break + } + + if (!canPlace) continue + + for (let rowOffset = 0; rowOffset < item.rowSpan; rowOffset += 1) { + for (let columnOffset = 0; columnOffset < item.colSpan; columnOffset += 1) { + occupancy[rowIndex + rowOffset][columnIndex + columnOffset] = true + } + } + + placed = true + break + } + + rowIndex += 1 + } + } + + return Math.max(occupancy.length, 1) +} + function DataList() { const screens = useBreakpoint() const isCompact = !screens.lg @@ -48,6 +230,7 @@ function DataList() { const mainAreaRef = useRef(null) const rightColumnRef = useRef(null) const tableHeaderRef = useRef(null) + const summaryBodyRef = useRef(null) const hasCustomLeftWidthRef = useRef(false) const [mainAreaWidth, setMainAreaWidth] = useState(0) @@ -55,6 +238,7 @@ function DataList() { const [rightColumnHeight, setRightColumnHeight] = useState(0) const [tableHeaderHeight, setTableHeaderHeight] = useState(0) const [leftPanelWidth, setLeftPanelWidth] = useState(360) + const [summaryBodyHeight, setSummaryBodyHeight] = useState(0) const [data, setData] = useState([]) const [loading, setLoading] = useState(false) @@ -62,13 +246,11 @@ function DataList() { const [total, setTotal] = useState(0) const [page, setPage] = useState(1) const [pageSize, setPageSize] = useState(20) - const [sourceFilter, setSourceFilter] = useState() - const [typeFilter, setTypeFilter] = useState() - const [countryFilter, setCountryFilter] = useState() + const [sourceFilter, setSourceFilter] = useState([]) + const 
[typeFilter, setTypeFilter] = useState([]) const [searchText, setSearchText] = useState('') const [sources, setSources] = useState([]) const [types, setTypes] = useState([]) - const [countries, setCountries] = useState([]) const [detailVisible, setDetailVisible] = useState(false) const [detailData, setDetailData] = useState(null) const [detailLoading, setDetailLoading] = useState(false) @@ -79,6 +261,7 @@ function DataList() { setMainAreaHeight(mainAreaRef.current?.offsetHeight || 0) setRightColumnHeight(rightColumnRef.current?.offsetHeight || 0) setTableHeaderHeight(tableHeaderRef.current?.offsetHeight || 0) + setSummaryBodyHeight(summaryBodyRef.current?.offsetHeight || 0) } updateLayout() @@ -93,6 +276,7 @@ function DataList() { if (mainAreaRef.current) observer.observe(mainAreaRef.current) if (rightColumnRef.current) observer.observe(rightColumnRef.current) if (tableHeaderRef.current) observer.observe(tableHeaderRef.current) + if (summaryBodyRef.current) observer.observe(summaryBodyRef.current) return () => observer.disconnect() }, [isCompact]) @@ -147,9 +331,8 @@ function DataList() { page: page.toString(), page_size: pageSize.toString(), }) - if (sourceFilter) params.append('source', sourceFilter) - if (typeFilter) params.append('data_type', typeFilter) - if (countryFilter) params.append('country', countryFilter) + if (sourceFilter.length > 0) params.append('source', sourceFilter.join(',')) + if (typeFilter.length > 0) params.append('data_type', typeFilter.join(',')) if (searchText) params.append('search', searchText) const res = await axios.get(`/api/v1/collected?${params}`) @@ -173,14 +356,12 @@ function DataList() { const fetchFilters = async () => { try { - const [sourcesRes, typesRes, countriesRes] = await Promise.all([ + const [sourcesRes, typesRes] = await Promise.all([ axios.get('/api/v1/collected/sources'), axios.get('/api/v1/collected/types'), - axios.get('/api/v1/collected/countries'), ]) setSources(sourcesRes.data.sources || []) 
setTypes(typesRes.data.data_types || []) - setCountries(countriesRes.data.countries || []) } catch (error) { console.error('Failed to fetch filters:', error) } @@ -193,7 +374,7 @@ function DataList() { useEffect(() => { fetchData() - }, [page, pageSize, sourceFilter, typeFilter, countryFilter]) + }, [page, pageSize, sourceFilter, typeFilter]) const handleSearch = () => { setPage(1) @@ -201,9 +382,8 @@ function DataList() { } const handleReset = () => { - setSourceFilter(undefined) - setTypeFilter(undefined) - setCountryFilter(undefined) + setSourceFilter([]) + setTypeFilter([]) setSearchText('') setPage(1) setTimeout(fetchData, 0) @@ -234,6 +414,47 @@ function DataList() { return iconMap[source] || } + const getSourceTagColor = (source: string) => { + const colorMap: Record = { + top500: 'geekblue', + huggingface_models: 'purple', + huggingface_datasets: 'cyan', + huggingface_spaces: 'magenta', + telegeography_cables: 'green', + epoch_ai_gpu: 'volcano', + } + return colorMap[source] || 'blue' + } + + const getDataTypeTagColor = (dataType: string) => { + const colorMap: Record = { + supercomputer: 'geekblue', + model: 'purple', + dataset: 'cyan', + space: 'magenta', + submarine_cable: 'green', + cable_landing_point: 'lime', + cable_landing_relation: 'gold', + gpu_cluster: 'volcano', + generic: 'default', + } + return colorMap[dataType] || 'default' + } + + const renderFilterTag = (tagProps: CustomTagProps, getColor: (value: string) => string) => { + const { label, value, closable, onClose } = tagProps + return ( + + {label} + + ) + } + const getTypeColor = (type: string) => { const colors: Record = { supercomputer: 'red', @@ -250,8 +471,8 @@ function DataList() { } const activeFilterCount = useMemo( - () => [sourceFilter, typeFilter, countryFilter, searchText.trim()].filter(Boolean).length, - [sourceFilter, typeFilter, countryFilter, searchText] + () => [sourceFilter.length > 0, typeFilter.length > 0, searchText.trim()].filter(Boolean).length, + [sourceFilter, 
typeFilter, searchText] ) const summaryItems = useMemo(() => { @@ -281,30 +502,24 @@ function DataList() { return 4 }, [isCompact, leftPanelWidth]) - const treemapRowHeight = useMemo(() => { - if (isCompact) return 88 - if (leftPanelWidth < 360) return 44 - if (leftPanelWidth < 520) return 48 - return 56 - }, [isCompact, leftPanelWidth]) - const treemapItems = useMemo(() => { const palette = ['ocean', 'sky', 'mint', 'amber', 'rose', 'violet', 'slate'] const maxValue = Math.max(...summaryItems.map((item) => item.value), 1) - const allowTallTiles = !isCompact && leftPanelWidth >= 520 + const allowFeaturedTile = !isCompact && treemapColumns > 1 && summaryItems.length > 2 + const allowSecondaryTallTiles = !isCompact && leftPanelWidth >= 520 return summaryItems.map((item, index) => { const ratio = item.value / maxValue let colSpan = 1 let rowSpan = 1 - if (allowTallTiles && index === 0) { + if (allowFeaturedTile && index === 0) { colSpan = Math.min(2, treemapColumns) rowSpan = 2 - } else if (allowTallTiles && ratio >= 0.7) { + } else if (allowSecondaryTallTiles && ratio >= 0.7) { colSpan = Math.min(2, treemapColumns) rowSpan = 2 - } else if (allowTallTiles && ratio >= 0.35) { + } else if (allowSecondaryTallTiles && ratio >= 0.35) { rowSpan = 2 } @@ -317,27 +532,70 @@ function DataList() { }) }, [summaryItems, isCompact, leftPanelWidth, treemapColumns]) + const treemapRows = useMemo( + () => estimateTreemapRows(treemapItems, treemapColumns), + [treemapColumns, treemapItems] + ) + + const treemapGap = isCompact ? 8 : 10 + const treemapMinRowHeight = isCompact ? 88 : 68 + const treemapTargetRowHeight = isCompact ? 88 : leftPanelWidth < 360 ? 44 : leftPanelWidth < 520 ? 48 : 56 + const treemapAvailableHeight = Math.max(summaryBodyHeight, 0) + const treemapAutoRowHeight = treemapRows > 0 + ? 
Math.floor((treemapAvailableHeight - Math.max(0, treemapRows - 1) * treemapGap) / treemapRows) + : treemapTargetRowHeight + const treemapRowHeight = Math.max( + treemapMinRowHeight, + Math.min(treemapTargetRowHeight, treemapAutoRowHeight || treemapTargetRowHeight) + ) + const treemapContentHeight = treemapRows * treemapRowHeight + Math.max(0, treemapRows - 1) * treemapGap + const treemapTilePadding = treemapRowHeight <= 72 ? 8 : treemapRowHeight <= 84 ? 10 : 12 + const treemapLabelSize = treemapRowHeight <= 72 ? 10 : treemapRowHeight <= 84 ? 11 : 12 + const treemapValueSize = treemapRowHeight <= 72 ? 13 : treemapRowHeight <= 84 ? 15 : 16 + const pageHeight = '100%' const desktopTableHeight = rightColumnHeight - tableHeaderHeight - 132 const compactTableHeight = mainAreaHeight - tableHeaderHeight - 156 const tableHeight = Math.max(180, isCompact ? compactTableHeight : desktopTableHeight) + const detailBaseItems = useMemo(() => { + if (!detailData) return [] + + return DETAIL_BASE_FIELDS.map((key) => ({ + key, + label: formatFieldLabel(key), + value: formatDetailValue(key, detailData[key as keyof CollectedData]), + })).filter((item) => item.value !== '-') + }, [detailData]) + + const detailMetadataItems = useMemo(() => { + if (!detailData?.metadata) return [] + + return Object.entries(detailData.metadata) + .filter(([key]) => key !== '_detail_url') + .map(([key, value]) => ({ + key, + label: formatFieldLabel(key), + value: formatDetailValue(key, value), + isBlock: typeof value === 'object' && value !== null, + })) + }, [detailData]) + const splitLayoutStyle = isCompact ? 
undefined : { gridTemplateColumns: `${leftPanelWidth}px 12px minmax(0, 1fr)` } const columns: ColumnsType = [ - { title: 'ID', dataIndex: 'id', key: 'id', width: 80 }, { title: '名称', dataIndex: 'name', key: 'name', - width: 280, + width: 320, ellipsis: true, render: (name: string, record: CollectedData) => ( - ), @@ -346,23 +604,31 @@ function DataList() { title: '数据源', dataIndex: 'source', key: 'source', - width: 170, - render: (source: string) => {source}, + minWidth: 140, + render: (value: string) => ( + value ? ( +
+ + {value} + +
+ ) : '-' + ), }, { - title: '类型', + title: '数据类型', dataIndex: 'data_type', key: 'data_type', - width: 120, - render: (type: string) => {type}, - }, - { title: '国家/地区', dataIndex: 'country', key: 'country', width: 130, ellipsis: true }, - { - title: '数值', - dataIndex: 'value', - key: 'value', - width: 140, - render: (value: string | null, record: CollectedData) => (value ? `${value} ${record.unit || ''}` : '-'), + minWidth: 140, + render: (value: string) => ( + value ? ( +
+ + {value} + +
+ ) : '-' + ), }, { title: '采集时间', @@ -371,6 +637,13 @@ function DataList() { width: 180, render: (time: string) => new Date(time).toLocaleString('zh-CN'), }, + { + title: '参考日期', + dataIndex: 'reference_date', + key: 'reference_date', + width: 120, + render: (time: string | null) => (time ? new Date(time).toLocaleDateString('zh-CN') : '-'), + }, { title: '操作', key: 'action', @@ -406,14 +679,21 @@ function DataList() { className="data-list-summary-card data-list-summary-card--panel" title="数据概览" size="small" - bodyStyle={{ padding: isCompact ? 12 : 16 }} + styles={{ body: { padding: isCompact ? 12 : 16 } }} > +
0 ? Math.min(treemapContentHeight, treemapAvailableHeight) : undefined, + height: treemapContentHeight, + ['--data-list-treemap-tile-padding' as '--data-list-treemap-tile-padding']: `${treemapTilePadding}px`, + ['--data-list-treemap-label-size' as '--data-list-treemap-label-size']: `${treemapLabelSize}px`, + ['--data-list-treemap-value-size' as '--data-list-treemap-value-size']: `${treemapValueSize}px`, + } as CSSProperties} > {treemapItems.map((item) => (
))}
+
{!isCompact && ( @@ -449,7 +730,7 @@ function DataList() { )}
- +
@@ -468,6 +749,7 @@ function DataList() { { @@ -487,23 +772,13 @@ function DataList() { setPage(1) }} options={types.map((type) => ({ label: type, value: type }))} + tagRender={(tagProps) => renderFilterTag(tagProps, getDataTypeTagColor)} style={{ width: '100%' }} - /> - setSearchText(event.target.value)} onPressEnter={handleSearch} @@ -516,9 +791,8 @@ function DataList() { dataSource={data} rowKey="id" loading={loading} - virtual scroll={{ x: 'max-content', y: tableHeight }} - tableLayout="fixed" + tableLayout="auto" size={isCompact ? 'small' : 'middle'} pagination={{ current: page, @@ -548,38 +822,65 @@ function DataList() { 关闭 , ]} - width={700} + width={880} > {detailLoading ? (
) : detailData ? ( - - {detailData.id} - {detailData.source} - {detailData.data_type} - {detailData.source_id || '-'} - {detailData.name} - {detailData.title || '-'} - {detailData.description || '-'} - {detailData.country || '-'} - {detailData.city || '-'} - {detailData.longitude || '-'} - {detailData.latitude || '-'} - {detailData.value} {detailData.unit || ''} - - {new Date(detailData.collected_at).toLocaleString('zh-CN')} - - - {detailData.reference_date ? new Date(detailData.reference_date).toLocaleDateString('zh-CN') : '-'} - - -
+          
+
+
+ 名称 + + {detailData.name || '-'} + +
+
+ + {detailBaseItems.length > 0 && ( +
+ 基础信息 +
+ {detailBaseItems.map((item) => ( +
+ {item.label} +
{item.value}
+
+ ))} +
+
+ )} + + {detailMetadataItems.length > 0 && ( +
+ 扩展字段 +
+ {detailMetadataItems.map((item) => ( +
+ {item.label} + {item.isBlock ? ( +
{item.value}
+ ) : ( +
{item.value}
+ )} +
+ ))} +
+
+ )} + +
+ 原始元数据 +
                 {JSON.stringify(detailData.metadata || {}, null, 2)}
               
- - +
+
) : ( )} diff --git a/frontend/src/pages/DataSources/DataSources.tsx b/frontend/src/pages/DataSources/DataSources.tsx index 7e58b35b..d8bb3bb3 100644 --- a/frontend/src/pages/DataSources/DataSources.tsx +++ b/frontend/src/pages/DataSources/DataSources.tsx @@ -7,7 +7,7 @@ import { PlayCircleOutlined, PauseCircleOutlined, PlusOutlined, EditOutlined, DeleteOutlined, ApiOutlined, CheckCircleOutlined, CloseCircleOutlined, ExperimentOutlined, - SyncOutlined, ClearOutlined + SyncOutlined, ClearOutlined, CopyOutlined } from '@ant-design/icons' import axios from 'axios' import AppLayout from '../../components/AppLayout/AppLayout' @@ -18,16 +18,28 @@ interface BuiltInDataSource { module: string priority: string frequency: string + endpoint?: string is_active: boolean collector_class: string last_run: string | null is_running: boolean task_id: number | null progress: number | null + phase?: string | null records_processed: number | null total_records: number | null } +interface TaskTrackerState { + task_id: number | null + is_running: boolean + progress: number + phase: string | null + status?: string | null + records_processed?: number | null + total_records?: number | null +} + interface CustomDataSource { id: number name: string @@ -89,7 +101,7 @@ function DataSources() { } } - const [taskProgress, setTaskProgress] = useState>({}) + const [taskProgress, setTaskProgress] = useState>({}) useEffect(() => { fetchData() @@ -118,80 +130,85 @@ function DataSources() { }, [activeTab, builtInSources.length, customSources.length]) useEffect(() => { - const runningSources = builtInSources.filter(s => s.is_running) - if (runningSources.length === 0) return + const trackedSources = builtInSources.filter((source) => { + const trackedTask = taskProgress[source.id] + return Boolean((trackedTask?.task_id ?? source.task_id) && (trackedTask?.is_running ?? 
source.is_running)) + }) + + if (trackedSources.length === 0) return const interval = setInterval(async () => { - const progressMap: Record = {} - + const updates: Record = {} + await Promise.all( - runningSources.map(async (source) => { + trackedSources.map(async (source) => { + const trackedTaskId = taskProgress[source.id]?.task_id ?? source.task_id + if (!trackedTaskId) return + try { - const res = await axios.get(`/api/v1/datasources/${source.id}/task-status`) - progressMap[source.id] = { + const res = await axios.get(`/api/v1/datasources/${source.id}/task-status`, { + params: { task_id: trackedTaskId }, + }) + updates[source.id] = { + task_id: res.data.task_id ?? trackedTaskId, progress: res.data.progress || 0, - is_running: res.data.is_running + is_running: !!res.data.is_running, + phase: res.data.phase || null, + status: res.data.status || null, + records_processed: res.data.records_processed, + total_records: res.data.total_records, } } catch { - progressMap[source.id] = { progress: 0, is_running: false } + updates[source.id] = { + task_id: trackedTaskId, + progress: 0, + is_running: false, + phase: 'failed', + status: 'failed', + } } }) ) - - setTaskProgress(prev => ({ ...prev, ...progressMap })) + + setTaskProgress((prev) => { + const next = { ...prev, ...updates } + for (const [sourceId, state] of Object.entries(updates)) { + if (!state.is_running && state.status !== 'running') { + delete next[Number(sourceId)] + } + } + return next + }) + + if (Object.values(updates).some((state) => !state.is_running)) { + fetchData() + } }, 2000) return () => clearInterval(interval) - }, [builtInSources.map(s => s.id).join(',')]) + }, [builtInSources, taskProgress]) const handleTrigger = async (id: number) => { try { - await axios.post(`/api/v1/datasources/${id}/trigger`) + const res = await axios.post(`/api/v1/datasources/${id}/trigger`) message.success('任务已触发') - // Trigger polling immediately - setTaskProgress(prev => ({ ...prev, [id]: { progress: 0, is_running: 
true } })) - // Also refresh data + setTaskProgress(prev => ({ + ...prev, + [id]: { + task_id: res.data.task_id ?? null, + progress: 0, + is_running: true, + phase: 'queued', + status: 'running', + }, + })) fetchData() - // Also fetch the running task status - pollTaskStatus(id) } catch (error: unknown) { const err = error as { response?: { data?: { detail?: string } } } message.error(err.response?.data?.detail || '触发失败') } } - const pollTaskStatus = async (sourceId: number) => { - const poll = async () => { - try { - const res = await axios.get(`/api/v1/datasources/${sourceId}/task-status`) - const data = res.data - - setTaskProgress(prev => ({ ...prev, [sourceId]: { - progress: data.progress || 0, - is_running: data.is_running - } })) - - // Keep polling while running - if (data.is_running) { - setTimeout(poll, 2000) - } else { - // Task completed - refresh data and clear this source from progress - setTimeout(() => { - setTaskProgress(prev => { - const newState = { ...prev } - delete newState[sourceId] - return newState - }) - }, 1000) - fetchData() - } - } catch { - // Stop polling on error - } - } - poll() - } - const handleToggle = async (id: number, current: boolean) => { const endpoint = current ? 
'disable' : 'enable' try { @@ -229,7 +246,7 @@ function DataSources() { name: data.name, description: null, source_type: data.collector_class, - endpoint: '', + endpoint: data.endpoint || '', auth_type: 'none', headers: {}, config: {}, @@ -340,6 +357,27 @@ function DataSources() { setTestResult(null) } + const handleCopyLink = async (value: string, successText: string) => { + try { + if (navigator.clipboard?.writeText) { + await navigator.clipboard.writeText(value) + } else { + const textArea = document.createElement('textarea') + textArea.value = value + textArea.style.position = 'fixed' + textArea.style.opacity = '0' + document.body.appendChild(textArea) + textArea.focus() + textArea.select() + document.execCommand('copy') + document.body.removeChild(textArea) + } + message.success(successText) + } catch { + message.error('复制失败,请手动复制') + } + } + const builtinColumns = [ { title: 'ID', dataIndex: 'id', key: 'id', width: 60, fixed: 'left' as const }, { @@ -374,15 +412,31 @@ function DataSources() { title: '状态', dataIndex: 'is_active', key: 'is_active', - width: 100, + width: 180, render: (_: unknown, record: BuiltInDataSource) => { - const progress = taskProgress[record.id] - if (progress?.is_running || record.is_running) { - const pct = progress?.progress ?? record.progress ?? 0 + const taskState = taskProgress[record.id] + const isTaskRunning = taskState?.is_running || record.is_running + + const phaseLabelMap: Record = { + queued: '排队中', + fetching: '抓取中', + transforming: '处理中', + saving: '保存中', + completed: '已完成', + failed: '失败', + } + + if (isTaskRunning) { + const pct = taskState?.progress ?? record.progress ?? 0 + const phase = taskState?.phase || record.phase || 'queued' return ( - - 采集中 {Math.round(pct)}% - + + {record.is_active ? '运行中' : '已暂停'} + + {phaseLabelMap[phase] || phase} + {pct > 0 ? ` ${Math.round(pct)}%` : ''} + + ) } return {record.is_active ? 
'运行中' : '已暂停'} @@ -420,6 +474,22 @@ function DataSources() { { title: 'ID', dataIndex: 'id', key: 'id', width: 60, fixed: 'left' as const }, { title: '名称', dataIndex: 'name', key: 'name', width: 150, ellipsis: true }, { title: '类型', dataIndex: 'source_type', key: 'source_type', width: 100 }, + { + title: 'API链接', + dataIndex: 'endpoint', + key: 'endpoint', + width: 280, + ellipsis: true, + render: (endpoint: string) => ( + endpoint ? ( + + + {endpoint} + + + ) : '-' + ), + }, { title: '状态', dataIndex: 'is_active', @@ -477,7 +547,6 @@ function DataSources() { scroll={{ x: 800, y: builtinTableHeight }} tableLayout="fixed" size="small" - virtual />
@@ -509,10 +578,9 @@ function DataSources() { rowKey="id" loading={loading} pagination={false} - scroll={{ x: 600, y: customTableHeight }} + scroll={{ x: 900, y: customTableHeight }} tableLayout="fixed" size="small" - virtual />
)} @@ -811,6 +879,19 @@ function DataSources() { + + + + +
+ ) +} + function Settings() { const [loading, setLoading] = useState(true) const [savingCollectorId, setSavingCollectorId] = useState(null) @@ -227,7 +243,7 @@ function Settings() { { title: '操作', key: 'action', - width: 120, + width: 92, fixed: 'right' as const, render: (_: unknown, record: CollectorSettings) => ( + + + ), + }, + { + key: 'notifications', + label: '通知策略', + children: ( + +
saveSection('notifications', values)}> + + + + + + + + + + + + + + + + +
+
+ ), + }, + { + key: 'security', + label: '安全策略', + children: ( + +
saveSection('security', values)}> + + + + + + + + - - - - - - - - - - - - - - -
-
- -
- ), - }, - { - key: 'notifications', - label: '通知策略', - children: ( -
- -
-
saveSection('notifications', values)}> - - - - - - - - - - - - - - - - -
-
-
-
- ), - }, - { - key: 'security', - label: '安全策略', - children: ( -
- -
-
saveSection('security', values)}> - - - - - - - -