From b0058edf171497ddfaec07d4d5d11721a7c7c3dd Mon Sep 17 00:00:00 2001 From: linkong Date: Fri, 27 Mar 2026 14:27:07 +0800 Subject: [PATCH] feat: add bgp observability and admin ui improvements --- VERSION | 2 +- backend/app/api/main.py | 2 + backend/app/api/v1/alerts.py | 6 +- backend/app/api/v1/bgp.py | 182 +++++++ backend/app/api/v1/collected_data.py | 50 +- backend/app/api/v1/dashboard.py | 11 +- backend/app/api/v1/datasource_config.py | 9 +- backend/app/api/v1/datasources.py | 129 ++++- backend/app/api/v1/settings.py | 11 +- backend/app/api/v1/tasks.py | 15 +- backend/app/api/v1/visualization.py | 78 ++- backend/app/api/v1/websocket.py | 6 +- backend/app/core/data_sources.py | 2 + backend/app/core/data_sources.yaml | 6 + backend/app/core/datasource_defaults.py | 14 + backend/app/core/security.py | 8 +- backend/app/core/time.py | 20 + backend/app/core/websocket/broadcaster.py | 25 +- backend/app/models/__init__.py | 4 +- backend/app/models/alert.py | 9 +- backend/app/models/bgp_anomaly.py | 58 +++ backend/app/models/collected_data.py | 11 +- backend/app/services/collectors/__init__.py | 4 + .../app/services/collectors/arcgis_cables.py | 4 +- .../app/services/collectors/arcgis_landing.py | 4 +- .../services/collectors/arcgis_relation.py | 4 +- backend/app/services/collectors/base.py | 86 +++- backend/app/services/collectors/bgp_common.py | 313 +++++++++++ backend/app/services/collectors/bgpstream.py | 120 +++++ backend/app/services/collectors/cloudflare.py | 8 +- backend/app/services/collectors/epoch_ai.py | 6 +- .../app/services/collectors/fao_landing.py | 4 +- .../app/services/collectors/huggingface.py | 8 +- backend/app/services/collectors/peeringdb.py | 8 +- backend/app/services/collectors/ris_live.py | 131 +++++ .../app/services/collectors/telegeography.py | 16 +- backend/app/services/scheduler.py | 13 +- backend/tests/test_bgp.py | 74 +++ docs/bgp-observability-plan.md | 487 ++++++++++++++++++ frontend/package.json | 2 +- frontend/src/App.tsx | 2 + .../src/components/AppLayout/AppLayout.tsx | 2 + frontend/src/index.css | 59 +++ frontend/src/pages/Alerts/Alerts.tsx | 9 +- frontend/src/pages/BGP/BGP.tsx | 159 ++++++ frontend/src/pages/Dashboard/Dashboard.tsx | 3 +- frontend/src/pages/DataList/DataList.tsx | 79 ++- .../src/pages/DataSources/DataSources.tsx | 400 ++++++++++---- frontend/src/pages/Settings/Settings.tsx | 5 +- frontend/src/pages/Tasks/Tasks.tsx | 3 +- frontend/src/utils/datetime.ts | 47 ++ 51 files changed, 2473 insertions(+), 245 deletions(-) create mode 100644 backend/app/api/v1/bgp.py create mode 100644 backend/app/core/time.py create mode 100644 backend/app/models/bgp_anomaly.py create mode 100644 backend/app/services/collectors/bgp_common.py create mode 100644 backend/app/services/collectors/bgpstream.py create mode 100644 backend/app/services/collectors/ris_live.py create mode 100644 backend/tests/test_bgp.py create mode 100644 docs/bgp-observability-plan.md create mode 100644 frontend/src/pages/BGP/BGP.tsx create mode 100644 frontend/src/utils/datetime.ts diff --git a/VERSION b/VERSION index 16eb94e7..c0e0456c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.21.3 +0.21.4-dev diff --git a/backend/app/api/main.py b/backend/app/api/main.py index 1cd69b55..3a7f8c39 100644 --- a/backend/app/api/main.py +++ b/backend/app/api/main.py @@ -11,6 +11,7 @@ from app.api.v1 import ( settings, collected_data, visualization, + bgp, ) api_router = APIRouter() @@ -27,3 +28,4 @@ api_router.include_router(dashboard.router, prefix="/dashboard", tags=["dashboar api_router.include_router(alerts.router, prefix="/alerts", tags=["alerts"]) api_router.include_router(settings.router, prefix="/settings", tags=["settings"]) api_router.include_router(visualization.router, prefix="/visualization", tags=["visualization"]) +api_router.include_router(bgp.router, prefix="/bgp", tags=["bgp"]) diff --git a/backend/app/api/v1/alerts.py b/backend/app/api/v1/alerts.py index f77766c4..c741f46b 100644 --- a/backend/app/api/v1/alerts.py +++ b/backend/app/api/v1/alerts.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import UTC, datetime from typing import Optional from fastapi import APIRouter, Depends @@ -68,7 +68,7 @@ async def acknowledge_alert( alert.status = AlertStatus.ACKNOWLEDGED alert.acknowledged_by = current_user.id - alert.acknowledged_at = datetime.utcnow() + alert.acknowledged_at = datetime.now(UTC) await db.commit() return {"message": "Alert acknowledged", "alert": alert.to_dict()} @@ -89,7 +89,7 @@ async def resolve_alert( alert.status = AlertStatus.RESOLVED alert.resolved_by = current_user.id - alert.resolved_at = datetime.utcnow() + alert.resolved_at = datetime.now(UTC) alert.resolution_notes = resolution await db.commit() diff --git a/backend/app/api/v1/bgp.py b/backend/app/api/v1/bgp.py new file mode 100644 index 00000000..0a7818f5 --- /dev/null +++ b/backend/app/api/v1/bgp.py @@ -0,0 +1,182 @@ +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.security import get_current_user +from app.db.session import get_db +from app.models.bgp_anomaly import BGPAnomaly +from app.models.collected_data import CollectedData +from app.models.user import User + +router = APIRouter() + +BGP_SOURCES = ("ris_live_bgp", "bgpstream_bgp") + + +def _parse_dt(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + return datetime.fromisoformat(value.replace("Z", "+00:00")) + + +def _matches_time(value: Optional[datetime], time_from: Optional[datetime], time_to: Optional[datetime]) -> bool: + if value is None: + return False + if time_from and value < time_from: + return False + if time_to and value > time_to: + return False + return True + + +@router.get("/events") +async def list_bgp_events( + prefix: Optional[str] = Query(None), + origin_asn: Optional[int] = Query(None), + peer_asn: Optional[int] = Query(None), + collector: Optional[str] = Query(None), + event_type: Optional[str] = Query(None), + source: Optional[str] = Query(None), + time_from: Optional[str] = Query(None), + time_to: Optional[str] = Query(None), + page: int = Query(1, ge=1), + page_size: int = Query(50, ge=1, le=200), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + stmt = ( + select(CollectedData) + .where(CollectedData.source.in_(BGP_SOURCES)) + .order_by(CollectedData.reference_date.desc().nullslast(), CollectedData.id.desc()) + ) + if source: + stmt = stmt.where(CollectedData.source == source) + + result = await db.execute(stmt) + records = result.scalars().all() + dt_from = _parse_dt(time_from) + dt_to = _parse_dt(time_to) + + filtered = [] + for record in records: + metadata = record.extra_data or {} + if prefix and metadata.get("prefix") != prefix: + continue + if origin_asn is not None and metadata.get("origin_asn") != origin_asn: + continue + if peer_asn is not None and metadata.get("peer_asn") != peer_asn: + continue + if collector and metadata.get("collector") != collector: + continue + if event_type and metadata.get("event_type") != event_type: + continue + if (dt_from or dt_to) and not _matches_time(record.reference_date, dt_from, dt_to): + continue + filtered.append(record) + + offset = (page - 1) * page_size + return { + "total": len(filtered), + "page": page, + "page_size": page_size, + "data": [record.to_dict() for record in filtered[offset : offset + page_size]], + } + + +@router.get("/events/{event_id}") +async def get_bgp_event( + event_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + record = await db.get(CollectedData, event_id) + if not record or record.source not in BGP_SOURCES: + raise HTTPException(status_code=404, detail="BGP event not found") + return record.to_dict() + + +@router.get("/anomalies") +async def list_bgp_anomalies( + severity: Optional[str] = Query(None), + anomaly_type: Optional[str] = Query(None), + status: Optional[str] = Query(None), + prefix: Optional[str] = Query(None), + origin_asn: Optional[int] = Query(None), + time_from: Optional[str] = Query(None), + time_to: Optional[str] = Query(None), + page: int = Query(1, ge=1), + page_size: int = Query(50, ge=1, le=200), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + stmt = select(BGPAnomaly).order_by(BGPAnomaly.created_at.desc(), BGPAnomaly.id.desc()) + if severity: + stmt = stmt.where(BGPAnomaly.severity == severity) + if anomaly_type: + stmt = stmt.where(BGPAnomaly.anomaly_type == anomaly_type) + if status: + stmt = stmt.where(BGPAnomaly.status == status) + if prefix: + stmt = stmt.where(BGPAnomaly.prefix == prefix) + if origin_asn is not None: + stmt = stmt.where(BGPAnomaly.origin_asn == origin_asn) + + result = await db.execute(stmt) + records = result.scalars().all() + dt_from = _parse_dt(time_from) + dt_to = _parse_dt(time_to) + if dt_from or dt_to: + records = [record for record in records if _matches_time(record.created_at, dt_from, dt_to)] + + offset = (page - 1) * page_size + return { + "total": len(records), + "page": page, + "page_size": page_size, + "data": [record.to_dict() for record in records[offset : offset + page_size]], + } + + +@router.get("/anomalies/summary") +async def get_bgp_anomaly_summary( + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + total_result = await db.execute(select(func.count(BGPAnomaly.id))) + type_result = await db.execute( + select(BGPAnomaly.anomaly_type, func.count(BGPAnomaly.id)) + .group_by(BGPAnomaly.anomaly_type) + .order_by(func.count(BGPAnomaly.id).desc()) + ) + severity_result = await db.execute( + select(BGPAnomaly.severity, func.count(BGPAnomaly.id)) + .group_by(BGPAnomaly.severity) + .order_by(func.count(BGPAnomaly.id).desc()) + ) + status_result = await db.execute( + select(BGPAnomaly.status, func.count(BGPAnomaly.id)) + .group_by(BGPAnomaly.status) + .order_by(func.count(BGPAnomaly.id).desc()) + ) + + return { + "total": total_result.scalar() or 0, + "by_type": {row[0]: row[1] for row in type_result.fetchall()}, + "by_severity": {row[0]: row[1] for row in severity_result.fetchall()}, + "by_status": {row[0]: row[1] for row in status_result.fetchall()}, + } + + +@router.get("/anomalies/{anomaly_id}") +async def get_bgp_anomaly( + anomaly_id: int, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + record = await db.get(BGPAnomaly, anomaly_id) + if not record: + raise HTTPException(status_code=404, detail="BGP anomaly not found") + return record.to_dict() diff --git a/backend/app/api/v1/collected_data.py b/backend/app/api/v1/collected_data.py index 0b77edfa..2f203657 100644 --- a/backend/app/api/v1/collected_data.py +++ b/backend/app/api/v1/collected_data.py @@ -9,10 +9,12 @@ import io from app.core.collected_data_fields import get_metadata_field from app.core.countries import COUNTRY_OPTIONS, get_country_search_variants, normalize_country +from app.core.time import to_iso8601_utc from app.db.session import get_db from app.models.user import User from app.core.security import get_current_user from app.models.collected_data import CollectedData +from app.models.datasource import DataSource router = APIRouter() @@ -100,11 +102,13 @@ def build_search_rank_sql(search: Optional[str]) -> str: """ -def serialize_collected_row(row) -> dict: +def serialize_collected_row(row, source_name_map: dict[str, str] | None = None) -> dict: metadata = row[7] + source = row[1] return { "id": row[0], - "source": row[1], + "source": source, + "source_name": source_name_map.get(source, source) if source_name_map else source, "source_id": row[2], "data_type": row[3], "name": row[4], @@ -121,12 +125,17 @@ def serialize_collected_row(row) -> dict: "rmax": get_metadata_field(metadata, "rmax"), "rpeak": get_metadata_field(metadata, "rpeak"), "power": get_metadata_field(metadata, "power"), - "collected_at": row[8].isoformat() if row[8] else None, - "reference_date": row[9].isoformat() if row[9] else None, + "collected_at": to_iso8601_utc(row[8]), + "reference_date": to_iso8601_utc(row[9]), "is_valid": row[10], } +async def get_source_name_map(db: AsyncSession) -> dict[str, str]: + result = await db.execute(select(DataSource.source, DataSource.name)) + return {row[0]: row[1] for row in result.fetchall()} + + @router.get("") async def list_collected_data( mode: str = Query("current", description="查询模式: current/history"), @@ -188,10 +197,11 @@ async def list_collected_data( result = await db.execute(query, params) rows = result.fetchall() + source_name_map = await get_source_name_map(db) data = [] for row in rows: - data.append(serialize_collected_row(row[:11])) + data.append(serialize_collected_row(row[:11], source_name_map)) return { "total": total, @@ -221,6 +231,7 @@ async def get_data_summary( """) ) rows = result.fetchall() + source_name_map = await get_source_name_map(db) by_source = {} total = 0 @@ -229,9 +240,10 @@ async def get_data_summary( data_type = row[1] count = row[2] - if source not in by_source: - by_source[source] = {} - by_source[source][data_type] = count + source_key = source_name_map.get(source, source) + if source_key not in by_source: + by_source[source_key] = {} + by_source[source_key][data_type] = count total += count # Total by source @@ -249,7 +261,14 @@ async def get_data_summary( return { "total_records": total, "by_source": by_source, - "source_totals": [{"source": row[0], "count": row[1]} for row in source_rows], + "source_totals": [ + { + "source": row[0], + "source_name": source_name_map.get(row[0], row[0]), + "count": row[1], + } + for row in source_rows + ], } @@ -269,9 +288,13 @@ async def get_data_sources( """) ) rows = result.fetchall() + source_name_map = await get_source_name_map(db) return { - "sources": [row[0] for row in rows], + "sources": [ + {"source": row[0], "source_name": source_name_map.get(row[0], row[0])} + for row in rows + ], } @@ -334,7 +357,8 @@ async def get_collected_data( detail="数据不存在", ) - return serialize_collected_row(row) + source_name_map = await get_source_name_map(db) + return serialize_collected_row(row, source_name_map) def build_where_clause( @@ -482,8 +506,8 @@ async def export_csv( get_metadata_field(row[7], "value"), get_metadata_field(row[7], "unit"), json.dumps(row[7]) if row[7] else "", - row[8].isoformat() if row[8] else "", - row[9].isoformat() if row[9] else "", + to_iso8601_utc(row[8]) or "", + to_iso8601_utc(row[9]) or "", row[10], ] ) diff --git a/backend/app/api/v1/dashboard.py b/backend/app/api/v1/dashboard.py index 8548a992..ddf70afa 100644 --- a/backend/app/api/v1/dashboard.py +++ b/backend/app/api/v1/dashboard.py @@ -1,6 +1,6 @@ """Dashboard API with caching and optimizations""" -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from fastapi import APIRouter, Depends from sqlalchemy import select, func, text from sqlalchemy.ext.asyncio import AsyncSession @@ -13,6 +13,7 @@ from app.models.alert import Alert, AlertSeverity from app.models.task import CollectionTask from app.core.security import get_current_user from app.core.cache import cache +from app.core.time import to_iso8601_utc # Built-in collectors info (mirrored from datasources.py) @@ -111,7 +112,7 @@ async def get_stats( if cached_result: return cached_result - today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) + today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) # Count built-in collectors built_in_count = len(COLLECTOR_INFO) @@ -175,7 +176,7 @@ async def get_stats( "active_datasources": active_datasources, "tasks_today": tasks_today, "success_rate": round(success_rate, 1), - "last_updated": datetime.utcnow().isoformat(), + "last_updated": to_iso8601_utc(datetime.now(UTC)), "alerts": { "critical": critical_alerts, "warning": warning_alerts, @@ -230,10 +231,10 @@ async def get_summary( summary[module] = { "datasources": data["datasources"], "total_records": 0, # Built-in don't track this in dashboard stats - "last_updated": datetime.utcnow().isoformat(), + "last_updated": to_iso8601_utc(datetime.now(UTC)), } - response = {"modules": summary, "last_updated": datetime.utcnow().isoformat()} + response = {"modules": summary, "last_updated": to_iso8601_utc(datetime.now(UTC))} cache.set(cache_key, response, expire_seconds=300) diff --git a/backend/app/api/v1/datasource_config.py b/backend/app/api/v1/datasource_config.py index 5995082a..edb72286 100644 --- a/backend/app/api/v1/datasource_config.py +++ b/backend/app/api/v1/datasource_config.py @@ -14,6 +14,7 @@ from app.models.user import User from app.models.datasource_config import DataSourceConfig from app.core.security import get_current_user from app.core.cache import cache +from app.core.time import to_iso8601_utc router = APIRouter() @@ -123,8 +124,8 @@ async def list_configs( "headers": c.headers, "config": c.config, "is_active": c.is_active, - "created_at": c.created_at.isoformat() if c.created_at else None, - "updated_at": c.updated_at.isoformat() if c.updated_at else None, + "created_at": to_iso8601_utc(c.created_at), + "updated_at": to_iso8601_utc(c.updated_at), } for c in configs ], @@ -155,8 +156,8 @@ async def get_config( "headers": config.headers, "config": config.config, "is_active": config.is_active, - "created_at": config.created_at.isoformat() if config.created_at else None, - "updated_at": config.updated_at.isoformat() if config.updated_at else None, + "created_at": to_iso8601_utc(config.created_at), + "updated_at": to_iso8601_utc(config.updated_at), } diff --git a/backend/app/api/v1/datasources.py b/backend/app/api/v1/datasources.py index be4e4543..92b01193 100644 --- a/backend/app/api/v1/datasources.py +++ b/backend/app/api/v1/datasources.py @@ -1,9 +1,12 @@ +import asyncio +from datetime import datetime, timedelta, timezone from typing import Optional -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession +from app.core.time import to_iso8601_utc from app.core.security import get_current_user from app.core.data_sources import get_data_sources_config from app.db.session import get_db @@ -24,6 +27,12 @@ def format_frequency_label(minutes: int) -> str: return f"{minutes}m" +def is_due_for_collection(datasource: DataSource, now: datetime) -> bool: + if datasource.last_run_at is None: + return True + return datasource.last_run_at + timedelta(minutes=datasource.frequency_minutes) <= now + + async def get_datasource_record(db: AsyncSession, source_id: str) -> Optional[DataSource]: datasource = None try: @@ -47,6 +56,7 @@ async def get_last_completed_task(db: AsyncSession, datasource_id: int) -> Optio select(CollectionTask) .where(CollectionTask.datasource_id == datasource_id) .where(CollectionTask.completed_at.isnot(None)) + .where(CollectionTask.status.in_(("success", "failed", "cancelled"))) .order_by(CollectionTask.completed_at.desc()) .limit(1) ) @@ -94,9 +104,9 @@ async def list_datasources( ) data_count = data_count_result.scalar() or 0 - last_run = None - if last_task and last_task.completed_at and data_count > 0: - last_run = last_task.completed_at.strftime("%Y-%m-%d %H:%M") + last_run_at = datasource.last_run_at or (last_task.completed_at if last_task else None) + last_run = to_iso8601_utc(last_run_at) + last_status = datasource.last_status or (last_task.status if last_task else None) collector_list.append( { @@ -110,6 +120,10 @@ async def list_datasources( "collector_class": datasource.collector_class, "endpoint": endpoint, "last_run": last_run, + "last_run_at": to_iso8601_utc(last_run_at), + "last_status": last_status, + "last_records_processed": last_task.records_processed if last_task else None, + "data_count": data_count, "is_running": running_task is not None, "task_id": running_task.id if running_task else None, "progress": running_task.progress if running_task else None, @@ -122,6 +136,105 @@ async def list_datasources( return {"total": len(collector_list), "data": collector_list} +@router.post("/trigger-all") +async def trigger_all_datasources( + force: bool = Query(False), + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + result = await db.execute( + select(DataSource) + .where(DataSource.is_active == True) + .order_by(DataSource.module, DataSource.id) + ) + datasources = result.scalars().all() + + if not datasources: + return { + "status": "noop", + "message": "No active data sources to trigger", + "triggered": [], + "skipped": [], + "failed": [], + } + + previous_task_ids: dict[int, Optional[int]] = {} + triggered_sources: list[dict] = [] + skipped_sources: list[dict] = [] + failed_sources: list[dict] = [] + now = datetime.now(timezone.utc) + + for datasource in datasources: + running_task = await get_running_task(db, datasource.id) + if running_task is not None: + skipped_sources.append( + { + "id": datasource.id, + "source": datasource.source, + "name": datasource.name, + "reason": "already_running", + "task_id": running_task.id, + } + ) + continue + + if not force and not is_due_for_collection(datasource, now): + skipped_sources.append( + { + "id": datasource.id, + "source": datasource.source, + "name": datasource.name, + "reason": "within_frequency_window", + "last_run_at": to_iso8601_utc(datasource.last_run_at), + "next_run_at": to_iso8601_utc( + datasource.last_run_at + timedelta(minutes=datasource.frequency_minutes) + ), + } + ) + continue + + previous_task_ids[datasource.id] = await get_latest_task_id_for_datasource(datasource.id) + success = run_collector_now(datasource.source) + if not success: + failed_sources.append( + { + "id": datasource.id, + "source": datasource.source, + "name": datasource.name, + "reason": "trigger_failed", + } + ) + continue + + triggered_sources.append( + { + "id": datasource.id, + "source": datasource.source, + "name": datasource.name, + "task_id": None, + } + ) + + for _ in range(20): + await asyncio.sleep(0.1) + pending = [item for item in triggered_sources if item["task_id"] is None] + if not pending: + break + for item in pending: + task_id = await get_latest_task_id_for_datasource(item["id"]) + if task_id is not None and task_id != previous_task_ids.get(item["id"]): + item["task_id"] = task_id + + return { + "status": "triggered" if triggered_sources else "partial", + "message": f"Triggered {len(triggered_sources)} data sources", + "force": force, + "triggered": triggered_sources, + "skipped": skipped_sources, + "failed": failed_sources, + } + + @router.get("/{source_id}") async def get_datasource( source_id: str, @@ -217,15 +330,19 @@ async def trigger_datasource( if not datasource.is_active: raise HTTPException(status_code=400, detail="Data source is disabled") + previous_task_id = await get_latest_task_id_for_datasource(datasource.id) success = run_collector_now(datasource.source) if not success: raise HTTPException(status_code=500, detail=f"Failed to trigger collector '{datasource.source}'") task_id = None - for _ in range(10): + for _ in range(20): + await asyncio.sleep(0.1) task_id = await get_latest_task_id_for_datasource(datasource.id) - if task_id is not None: + if task_id is not None and task_id != previous_task_id: break + if task_id == previous_task_id: + task_id = None return { "status": "triggered", diff --git a/backend/app/api/v1/settings.py b/backend/app/api/v1/settings.py index cdde6d2a..6439a859 100644 --- a/backend/app/api/v1/settings.py +++ b/backend/app/api/v1/settings.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import UTC, datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException @@ -7,6 +7,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.security import get_current_user +from app.core.time import to_iso8601_utc from app.db.session import get_db from app.models.datasource import DataSource from app.models.system_setting import SystemSetting @@ -114,9 +115,9 @@ def serialize_collector(datasource: DataSource) -> dict: "frequency_minutes": datasource.frequency_minutes, "frequency": format_frequency_label(datasource.frequency_minutes), "is_active": datasource.is_active, - "last_run_at": datasource.last_run_at.isoformat() if datasource.last_run_at else None, + "last_run_at": to_iso8601_utc(datasource.last_run_at), "last_status": datasource.last_status, - "next_run_at": datasource.next_run_at.isoformat() if datasource.next_run_at else None, + "next_run_at": to_iso8601_utc(datasource.next_run_at), } @@ -216,5 +217,5 @@ async def get_all_settings( "notifications": await get_setting_payload(db, "notifications"), "security": await get_setting_payload(db, "security"), "collectors": [serialize_collector(datasource) for datasource in datasources], - "generated_at": datetime.utcnow().isoformat() + "Z", - } + "generated_at": to_iso8601_utc(datetime.now(UTC)), + } diff --git a/backend/app/api/v1/tasks.py b/backend/app/api/v1/tasks.py index b04f816e..8dfb7af5 100644 --- a/backend/app/api/v1/tasks.py +++ b/backend/app/api/v1/tasks.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import UTC, datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status @@ -8,6 +8,7 @@ from sqlalchemy import text from app.db.session import get_db from app.models.user import User from app.core.security import get_current_user +from app.core.time import to_iso8601_utc from app.services.collectors.registry import collector_registry @@ -61,8 +62,8 @@ async def list_tasks( "datasource_id": t[1], "datasource_name": t[2], "status": t[3], - "started_at": t[4].isoformat() if t[4] else None, - "completed_at": t[5].isoformat() if t[5] else None, + "started_at": to_iso8601_utc(t[4]), + "completed_at": to_iso8601_utc(t[5]), "records_processed": t[6], "error_message": t[7], } @@ -100,8 +101,8 @@ async def get_task( "datasource_id": task[1], "datasource_name": task[2], "status": task[3], - "started_at": task[4].isoformat() if task[4] else None, - "completed_at": task[5].isoformat() if task[5] else None, + "started_at": to_iso8601_utc(task[4]), + "completed_at": to_iso8601_utc(task[5]), "records_processed": task[6], "error_message": task[7], } @@ -147,8 +148,8 @@ async def trigger_collection( "status": result.get("status", "unknown"), "records_processed": result.get("records_processed", 0), "error_message": result.get("error"), - "started_at": datetime.utcnow(), - "completed_at": datetime.utcnow(), + "started_at": datetime.now(UTC), + "completed_at": datetime.now(UTC), }, ) diff --git a/backend/app/api/v1/visualization.py b/backend/app/api/v1/visualization.py index 2d10ac7b..25de3c6f 100644 --- a/backend/app/api/v1/visualization.py +++ b/backend/app/api/v1/visualization.py @@ -4,7 +4,7 @@ Unified API for all visualization data sources. Returns GeoJSON format compatible with Three.js, CesiumJS, and Unreal Cesium. """ -from datetime import datetime +from datetime import UTC, datetime from fastapi import APIRouter, HTTPException, Depends, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func @@ -12,9 +12,12 @@ from typing import List, Dict, Any, Optional from app.core.collected_data_fields import get_record_field from app.core.satellite_tle import build_tle_lines_from_elements +from app.core.time import to_iso8601_utc from app.db.session import get_db +from app.models.bgp_anomaly import BGPAnomaly from app.models.collected_data import CollectedData from app.services.cable_graph import build_graph_from_data, CableGraph +from app.services.collectors.bgp_common import RIPE_RIS_COLLECTOR_COORDS router = APIRouter() @@ -273,6 +276,58 @@ def convert_gpu_cluster_to_geojson(records: List[CollectedData]) -> Dict[str, An return {"type": "FeatureCollection", "features": features} +def convert_bgp_anomalies_to_geojson(records: List[BGPAnomaly]) -> Dict[str, Any]: + features = [] + + for record in records: + evidence = record.evidence or {} + collectors = evidence.get("collectors") or record.peer_scope or [] + collector = collectors[0] if collectors else None + location = None + if collector: + location = RIPE_RIS_COLLECTOR_COORDS.get(str(collector)) + + if location is None: + nested = evidence.get("events") or [] + for item in nested: + collector_name = (item or {}).get("collector") + if collector_name and collector_name in RIPE_RIS_COLLECTOR_COORDS: + location = RIPE_RIS_COLLECTOR_COORDS[collector_name] + collector = collector_name + break + + if location is None: + continue + + features.append( + { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [location["longitude"], location["latitude"]], + }, + "properties": { + "id": record.id, + "collector": collector, + "city": location.get("city"), + "country": location.get("country"), + "source": record.source, + "anomaly_type": record.anomaly_type, + "severity": record.severity, + "status": record.status, + "prefix": record.prefix, + "origin_asn": record.origin_asn, + "new_origin_asn": record.new_origin_asn, + "confidence": record.confidence, + "summary": record.summary, + "created_at": to_iso8601_utc(record.created_at), + }, + } + ) + + return {"type": "FeatureCollection", "features": features} + + # ============== API Endpoints ============== @@ -479,6 +534,25 @@ async def get_gpu_clusters_geojson( } +@router.get("/geo/bgp-anomalies") +async def get_bgp_anomalies_geojson( + severity: Optional[str] = Query(None), + status: Optional[str] = Query("active"), + limit: int = Query(200, ge=1, le=1000), + db: AsyncSession = Depends(get_db), +): + stmt = select(BGPAnomaly).order_by(BGPAnomaly.created_at.desc()).limit(limit) + if severity: + stmt = stmt.where(BGPAnomaly.severity == severity) + if status: + stmt = stmt.where(BGPAnomaly.status == status) + + result = await db.execute(stmt) + records = list(result.scalars().all()) + geojson = convert_bgp_anomalies_to_geojson(records) + return {**geojson, "count": len(geojson.get("features", []))} + + @router.get("/all") async def get_all_visualization_data(db: AsyncSession = Depends(get_db)): """获取所有可视化数据的统一端点 @@ -549,7 +623,7 @@ async def get_all_visualization_data(db: AsyncSession = Depends(get_db)): ) return { - "generated_at": datetime.utcnow().isoformat() + "Z", + "generated_at": to_iso8601_utc(datetime.now(UTC)), "version": "1.0", "data": { "satellites": satellites, diff --git a/backend/app/api/v1/websocket.py b/backend/app/api/v1/websocket.py index 85bac489..23ccb3fe 100644 --- a/backend/app/api/v1/websocket.py +++ b/backend/app/api/v1/websocket.py @@ -3,13 +3,14 @@ import asyncio import json import logging -from datetime import datetime +from datetime import UTC, datetime from typing import Optional from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query from jose import jwt, JWTError from app.core.config import settings +from app.core.time import to_iso8601_utc from app.core.websocket.manager import manager logger = logging.getLogger(__name__) @@ -59,6 +60,7 @@ async def websocket_endpoint( "ixp_nodes", "alerts", "dashboard", + "datasource_tasks", ], }, } @@ -72,7 +74,7 @@ async def websocket_endpoint( await websocket.send_json( { "type": "heartbeat", - "data": {"action": "pong", "timestamp": datetime.utcnow().isoformat()}, + "data": {"action": "pong", "timestamp": to_iso8601_utc(datetime.now(UTC))}, } ) elif data.get("type") == "subscribe": diff --git a/backend/app/core/data_sources.py b/backend/app/core/data_sources.py index 13f078a0..6828771e 100644 --- a/backend/app/core/data_sources.py +++ b/backend/app/core/data_sources.py @@ -23,6 +23,8 @@ COLLECTOR_URL_KEYS = { "top500": "top500.url", "epoch_ai_gpu": "epoch_ai.gpu_clusters_url", "spacetrack_tle": "spacetrack.tle_query_url", + "ris_live_bgp": "ris_live.url", + "bgpstream_bgp": "bgpstream.url", } diff --git a/backend/app/core/data_sources.yaml b/backend/app/core/data_sources.yaml index 7e97d335..9c64adb4 100644 --- a/backend/app/core/data_sources.yaml +++ b/backend/app/core/data_sources.yaml @@ -37,3 +37,9 @@ epoch_ai: spacetrack: base_url: "https://www.space-track.org" tle_query_url: "https://www.space-track.org/basicspacedata/query/class/gp/orderby/EPOCH%20desc/limit/1000/format/json" + +ris_live: + url: "https://ris-live.ripe.net/v1/stream/?format=json&client=planet-ris-live" + +bgpstream: + url: "https://broker.bgpstream.caida.org/v2" diff --git a/backend/app/core/datasource_defaults.py b/backend/app/core/datasource_defaults.py index 7c5f9430..cf0e374a 100644 --- a/backend/app/core/datasource_defaults.py +++ b/backend/app/core/datasource_defaults.py @@ -120,6 +120,20 @@ DEFAULT_DATASOURCES = { "priority": "P2", "frequency_minutes": 1440, }, + "ris_live_bgp": { + "id": 21, + "name": "RIPE RIS Live BGP", + "module": "L3", + "priority": "P1", + "frequency_minutes": 15, + }, + "bgpstream_bgp": { + "id": 22, + "name": "CAIDA BGPStream Backfill", + "module": "L3", + "priority": "P1", + "frequency_minutes": 360, + }, } ID_TO_COLLECTOR = {info["id"]: name for name, info in DEFAULT_DATASOURCES.items()} diff --git a/backend/app/core/security.py b/backend/app/core/security.py index e0cdfdbf..4b0bdcc5 100644 --- a/backend/app/core/security.py +++ b/backend/app/core/security.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from typing import Optional import bcrypt @@ -49,9 +49,9 @@ def get_password_hash(password: str) -> str: def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() if expires_delta: - expire = datetime.utcnow() + expires_delta + expire = datetime.now(UTC) + expires_delta elif settings.ACCESS_TOKEN_EXPIRE_MINUTES > 0: - expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) + expire = datetime.now(UTC) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) else: expire = None if expire: @@ -65,7 +65,7 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) - def create_refresh_token(data: dict) -> str: to_encode = data.copy() if settings.REFRESH_TOKEN_EXPIRE_DAYS > 0: - expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) + expire = datetime.now(UTC) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire}) to_encode.update({"type": "refresh"}) if "sub" in to_encode: diff --git a/backend/app/core/time.py b/backend/app/core/time.py new file mode 100644 index 00000000..0e6a1303 --- /dev/null +++ b/backend/app/core/time.py @@ -0,0 +1,20 @@ +"""Time helpers for API serialization.""" + +from __future__ import annotations + +from datetime import UTC, datetime + + +def ensure_utc(value: datetime | None) -> datetime | None: + if value is None: + return None + if value.tzinfo is None: + return value.replace(tzinfo=UTC) + return value.astimezone(UTC) + + +def to_iso8601_utc(value: datetime | None) -> str | None: + normalized = ensure_utc(value) + if normalized is None: + return None + return normalized.isoformat().replace("+00:00", "Z") diff --git a/backend/app/core/websocket/broadcaster.py b/backend/app/core/websocket/broadcaster.py index d65fa214..0fbc2523 100644 --- a/backend/app/core/websocket/broadcaster.py +++ b/backend/app/core/websocket/broadcaster.py @@ -1,9 +1,10 @@ """Data broadcaster for WebSocket connections""" import asyncio -from datetime import datetime +from datetime import UTC, datetime from typing import Dict, Any, Optional +from app.core.time import to_iso8601_utc from app.core.websocket.manager import manager @@ -22,7 +23,7 @@ class DataBroadcaster: "active_datasources": 8, "tasks_today": 45, "success_rate": 97.8, - "last_updated": datetime.utcnow().isoformat(), + "last_updated": to_iso8601_utc(datetime.now(UTC)), "alerts": {"critical": 0, "warning": 2, "info": 5}, } @@ -35,7 +36,7 @@ class DataBroadcaster: { "type": "data_frame", "channel": "dashboard", - "timestamp": datetime.utcnow().isoformat(), + "timestamp": to_iso8601_utc(datetime.now(UTC)), "payload": {"stats": stats}, }, channel="dashboard", @@ -49,7 +50,7 @@ class DataBroadcaster: await manager.broadcast( { "type": "alert_notification", - "timestamp": datetime.utcnow().isoformat(), + "timestamp": to_iso8601_utc(datetime.now(UTC)), "data": {"alert": alert}, } ) @@ -60,7 +61,7 @@ class DataBroadcaster: { "type": "data_frame", "channel": "gpu_clusters", - "timestamp": datetime.utcnow().isoformat(), + "timestamp": to_iso8601_utc(datetime.now(UTC)), "payload": data, } ) @@ -71,12 +72,24 @@ class DataBroadcaster: { "type": "data_frame", "channel": channel, - "timestamp": datetime.utcnow().isoformat(), + "timestamp": to_iso8601_utc(datetime.now(UTC)), "payload": data, }, channel=channel if channel in manager.active_connections else "all", ) + async def broadcast_datasource_task_update(self, data: Dict[str, Any]): + """Broadcast datasource task progress updates to connected clients.""" + await manager.broadcast( + { + "type": "data_frame", + "channel": "datasource_tasks", + "timestamp": to_iso8601_utc(datetime.now(UTC)), + "payload": data, + }, + channel="all", + ) + def start(self): """Start all broadcasters""" if not self.running: diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 38e79102..0372e59c 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -5,6 +5,7 @@ from app.models.data_snapshot import DataSnapshot from app.models.datasource import DataSource from app.models.datasource_config import DataSourceConfig from app.models.alert import Alert, AlertSeverity, AlertStatus +from app.models.bgp_anomaly import BGPAnomaly from app.models.system_setting import SystemSetting __all__ = [ @@ -18,4 +19,5 @@ __all__ = [ "Alert", "AlertSeverity", "AlertStatus", -] + "BGPAnomaly", +] diff --git a/backend/app/models/alert.py b/backend/app/models/alert.py index ca5c7f1d..9c141e4c 100644 --- a/backend/app/models/alert.py +++ b/backend/app/models/alert.py @@ -5,6 +5,7 @@ from typing import Optional from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey, Enum as SQLEnum from sqlalchemy.orm import relationship +from app.core.time import to_iso8601_utc from app.db.session import Base @@ -50,8 +51,8 @@ class Alert(Base): "acknowledged_by": self.acknowledged_by, "resolved_by": self.resolved_by, "resolution_notes": self.resolution_notes, - "created_at": self.created_at.isoformat() if self.created_at else None, - "updated_at": self.updated_at.isoformat() if self.updated_at else None, - "acknowledged_at": self.acknowledged_at.isoformat() if self.acknowledged_at else None, - "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None, + "created_at": to_iso8601_utc(self.created_at), + "updated_at": to_iso8601_utc(self.updated_at), + "acknowledged_at": to_iso8601_utc(self.acknowledged_at), + "resolved_at": to_iso8601_utc(self.resolved_at), } diff --git a/backend/app/models/bgp_anomaly.py b/backend/app/models/bgp_anomaly.py new file mode 100644 index 00000000..013aa8fa --- /dev/null +++ b/backend/app/models/bgp_anomaly.py @@ -0,0 +1,58 @@ +"""BGP anomaly model for derived routing intelligence.""" + +from datetime import datetime + +from sqlalchemy import Column, DateTime, Float, ForeignKey, Index, Integer, JSON, String, Text + +from app.core.time import to_iso8601_utc +from app.db.session import Base + + +class BGPAnomaly(Base): + __tablename__ = "bgp_anomalies" + + id = Column(Integer, primary_key=True, index=True) + snapshot_id = Column(Integer, ForeignKey("data_snapshots.id"), nullable=True, index=True) + task_id = Column(Integer, ForeignKey("collection_tasks.id"), nullable=True, index=True) + source = Column(String(100), nullable=False, index=True) + anomaly_type = Column(String(50), nullable=False, index=True) + severity = Column(String(20), nullable=False, index=True) + status = Column(String(20), nullable=False, default="active", index=True) + entity_key = Column(String(255), nullable=False, index=True) + prefix = Column(String(64), nullable=True, index=True) + origin_asn = Column(Integer, nullable=True, index=True) + new_origin_asn = Column(Integer, nullable=True, index=True) + peer_scope = Column(JSON, default=list) + started_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow, index=True) + ended_at = Column(DateTime(timezone=True), nullable=True) + confidence = Column(Float, nullable=False, default=0.5) + summary = Column(Text, nullable=False) + evidence = Column(JSON, default=dict) + created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow, index=True) + + __table_args__ = ( + Index("idx_bgp_anomalies_source_created", "source", "created_at"), + Index("idx_bgp_anomalies_type_status", "anomaly_type", "status"), + ) + + def to_dict(self) -> dict: + return { + "id": self.id, + "snapshot_id": self.snapshot_id, + "task_id": self.task_id, + "source": self.source, + "anomaly_type": self.anomaly_type, + "severity": self.severity, + "status": self.status, + "entity_key": self.entity_key, + "prefix": self.prefix, + "origin_asn": self.origin_asn, + "new_origin_asn": self.new_origin_asn, + "peer_scope": self.peer_scope or [], + "started_at": to_iso8601_utc(self.started_at), + "ended_at": to_iso8601_utc(self.ended_at), + "confidence": self.confidence, + "summary": self.summary, + "evidence": self.evidence or {}, + "created_at": to_iso8601_utc(self.created_at), + } diff --git a/backend/app/models/collected_data.py b/backend/app/models/collected_data.py index 84791f15..438db389 100644 --- a/backend/app/models/collected_data.py +++ b/backend/app/models/collected_data.py @@ -4,6 +4,7 @@ from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, T from sqlalchemy.sql import func from app.core.collected_data_fields import get_record_field +from app.core.time import to_iso8601_utc from app.db.session import Base @@ -74,15 +75,11 @@ class CollectedData(Base): "value": get_record_field(self, "value"), "unit": get_record_field(self, "unit"), "metadata": self.extra_data, - "collected_at": self.collected_at.isoformat() - if self.collected_at is not None - else None, - "reference_date": self.reference_date.isoformat() - if self.reference_date is not None - else None, + "collected_at": to_iso8601_utc(self.collected_at), + "reference_date": to_iso8601_utc(self.reference_date), "is_current": self.is_current, "previous_record_id": self.previous_record_id, "change_type": self.change_type, "change_summary": self.change_summary, - "deleted_at": self.deleted_at.isoformat() if self.deleted_at is not None else None, + "deleted_at": to_iso8601_utc(self.deleted_at), } diff --git a/backend/app/services/collectors/__init__.py b/backend/app/services/collectors/__init__.py index 69a2b11b..155f150d 100644 --- a/backend/app/services/collectors/__init__.py +++ b/backend/app/services/collectors/__init__.py @@ -30,6 +30,8 @@ from app.services.collectors.arcgis_landing import ArcGISLandingPointCollector from app.services.collectors.arcgis_relation import ArcGISCableLandingRelationCollector from app.services.collectors.spacetrack import SpaceTrackTLECollector from app.services.collectors.celestrak import CelesTrakTLECollector +from app.services.collectors.ris_live import RISLiveCollector +from app.services.collectors.bgpstream import BGPStreamBackfillCollector collector_registry.register(TOP500Collector()) collector_registry.register(EpochAIGPUCollector()) @@ -51,3 +53,5 @@ collector_registry.register(ArcGISLandingPointCollector()) collector_registry.register(ArcGISCableLandingRelationCollector()) collector_registry.register(SpaceTrackTLECollector()) collector_registry.register(CelesTrakTLECollector()) +collector_registry.register(RISLiveCollector()) +collector_registry.register(BGPStreamBackfillCollector()) diff --git a/backend/app/services/collectors/arcgis_cables.py b/backend/app/services/collectors/arcgis_cables.py index ac3db539..23e5c9a0 100644 --- a/backend/app/services/collectors/arcgis_cables.py +++ b/backend/app/services/collectors/arcgis_cables.py @@ -5,7 +5,7 @@ Collects submarine cable data from ArcGIS GeoJSON API. import json from typing import Dict, Any, List -from datetime import datetime +from datetime import UTC, datetime import httpx from app.services.collectors.base import BaseCollector @@ -84,7 +84,7 @@ class ArcGISCableCollector(BaseCollector): "color": props.get("color"), "route_coordinates": route_coordinates, }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): diff --git a/backend/app/services/collectors/arcgis_landing.py b/backend/app/services/collectors/arcgis_landing.py index 93976198..5aa9c294 100644 --- a/backend/app/services/collectors/arcgis_landing.py +++ b/backend/app/services/collectors/arcgis_landing.py @@ -1,5 +1,5 @@ from typing import Dict, Any, List -from datetime import datetime +from datetime import UTC, datetime import httpx from app.services.collectors.base import BaseCollector @@ -67,7 +67,7 @@ class ArcGISLandingPointCollector(BaseCollector): "status": props.get("status"), "landing_point_id": props.get("landing_point_id"), }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): diff --git a/backend/app/services/collectors/arcgis_relation.py b/backend/app/services/collectors/arcgis_relation.py index d06a46c8..9ec45688 100644 --- a/backend/app/services/collectors/arcgis_relation.py +++ b/backend/app/services/collectors/arcgis_relation.py @@ -1,5 +1,5 @@ import asyncio -from datetime import datetime +from datetime import UTC, datetime from typing import Any, Dict, List, Optional import httpx @@ -143,7 +143,7 @@ class ArcGISCableLandingRelationCollector(BaseCollector): "facility": facility, "status": status, }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): diff --git a/backend/app/services/collectors/base.py b/backend/app/services/collectors/base.py index 0288dd61..64453974 100644 --- a/backend/app/services/collectors/base.py +++ b/backend/app/services/collectors/base.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from typing import Dict, List, Any, Optional -from datetime import datetime +from datetime import UTC, datetime import httpx from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession @@ -10,6 +10,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.collected_data_fields import build_dynamic_metadata, get_record_field from app.core.config import settings from app.core.countries import normalize_country +from app.core.time import to_iso8601_utc +from app.core.websocket.broadcaster import broadcaster class BaseCollector(ABC): @@ -20,12 +22,14 @@ class BaseCollector(ABC): module: str = "L1" frequency_hours: int = 4 data_type: str = "generic" + fail_on_empty: bool = False def __init__(self): self._current_task = None self._db_session = None self._datasource_id = 1 self._resolved_url: Optional[str] = None + self._last_broadcast_progress: Optional[int] = None async def resolve_url(self, db: AsyncSession) -> None: from app.core.data_sources import get_data_sources_config @@ -33,18 +37,53 @@ class BaseCollector(ABC): config = get_data_sources_config() self._resolved_url = await config.get_url(self.name, db) - def update_progress(self, records_processed: int): + async def _publish_task_update(self, force: bool = False): + if not self._current_task: + return + + progress = float(self._current_task.progress or 0.0) + rounded_progress = int(round(progress)) + if not force and self._last_broadcast_progress == rounded_progress: + return + + await broadcaster.broadcast_datasource_task_update( + { + "datasource_id": getattr(self, "_datasource_id", None), + "collector_name": self.name, + "task_id": self._current_task.id, + "status": self._current_task.status, + "phase": self._current_task.phase, + "progress": progress, + "records_processed": self._current_task.records_processed, + "total_records": self._current_task.total_records, + "started_at": to_iso8601_utc(self._current_task.started_at), + "completed_at": to_iso8601_utc(self._current_task.completed_at), + "error_message": self._current_task.error_message, + } + ) + self._last_broadcast_progress = rounded_progress + + async def update_progress(self, records_processed: int, *, commit: bool = False, force: bool = False): """Update task progress - call this during data processing""" - if self._current_task and self._db_session and self._current_task.total_records > 0: + if self._current_task and self._db_session: self._current_task.records_processed = records_processed - self._current_task.progress = ( - records_processed / self._current_task.total_records - ) * 100 + if self._current_task.total_records and self._current_task.total_records > 0: + self._current_task.progress = ( + records_processed / self._current_task.total_records + ) * 100 + else: + self._current_task.progress = 0.0 + + if commit: + await self._db_session.commit() + + await self._publish_task_update(force=force) async def set_phase(self, phase: str): if self._current_task and self._db_session: self._current_task.phase = phase await self._db_session.commit() + await self._publish_task_update(force=True) @abstractmethod async def fetch(self) -> List[Dict[str, Any]]: @@ -133,7 +172,7 @@ class BaseCollector(ABC): from app.models.task import CollectionTask from app.models.data_snapshot import DataSnapshot - start_time = datetime.utcnow() + start_time = datetime.now(UTC) datasource_id = getattr(self, "_datasource_id", 1) snapshot_id: Optional[int] = None @@ -152,14 +191,20 @@ class BaseCollector(ABC): self._current_task = task self._db_session = db + self._last_broadcast_progress = None await self.resolve_url(db) + await self._publish_task_update(force=True) try: await self.set_phase("fetching") raw_data = await self.fetch() task.total_records = len(raw_data) await db.commit() + await self._publish_task_update(force=True) + + if self.fail_on_empty and not raw_data: + raise RuntimeError(f"Collector {self.name} returned no data") await self.set_phase("transforming") data = self.transform(raw_data) @@ -172,33 +217,35 @@ class BaseCollector(ABC): task.phase = "completed" task.records_processed = records_count task.progress = 100.0 - task.completed_at = datetime.utcnow() + task.completed_at = datetime.now(UTC) await db.commit() + await self._publish_task_update(force=True) return { "status": "success", "task_id": task_id, "records_processed": records_count, - "execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(), + "execution_time_seconds": (datetime.now(UTC) - start_time).total_seconds(), } except Exception as e: task.status = "failed" task.phase = "failed" task.error_message = str(e) - task.completed_at = datetime.utcnow() + task.completed_at = datetime.now(UTC) if snapshot_id is not None: snapshot = await db.get(DataSnapshot, snapshot_id) if snapshot: snapshot.status = "failed" - snapshot.completed_at = datetime.utcnow() + snapshot.completed_at = datetime.now(UTC) snapshot.summary = {"error": str(e)} await db.commit() + await self._publish_task_update(force=True) return { "status": "failed", "task_id": task_id, "error": str(e), - "execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(), + "execution_time_seconds": (datetime.now(UTC) - start_time).total_seconds(), } async def _save_data( @@ -219,11 +266,11 @@ class BaseCollector(ABC): snapshot.record_count = 0 snapshot.summary = {"created": 0, "updated": 0, "unchanged": 0} snapshot.status = "success" - snapshot.completed_at = datetime.utcnow() + snapshot.completed_at = datetime.now(UTC) await db.commit() return 0 - collected_at = datetime.utcnow() + collected_at = datetime.now(UTC) records_added = 0 created_count = 0 updated_count = 0 @@ -329,8 +376,7 @@ class BaseCollector(ABC): records_added += 1 if i % 100 == 0: - self.update_progress(i + 1) - await db.commit() + await self.update_progress(i + 1, commit=True) if snapshot_id is not None: deleted_keys = previous_current_keys - seen_entity_keys @@ -350,7 +396,7 @@ class BaseCollector(ABC): if snapshot: snapshot.record_count = records_added snapshot.status = "success" - snapshot.completed_at = datetime.utcnow() + snapshot.completed_at = datetime.now(UTC) snapshot.summary = { "created": created_count, "updated": updated_count, @@ -359,7 +405,7 @@ class BaseCollector(ABC): } await db.commit() - self.update_progress(len(data)) + await self.update_progress(len(data), force=True) return records_added async def save(self, db: AsyncSession, data: List[Dict[str, Any]]) -> int: @@ -406,8 +452,8 @@ async def log_task( status=status, records_processed=records_processed, error_message=error_message, - started_at=datetime.utcnow(), - completed_at=datetime.utcnow(), + started_at=datetime.now(UTC), + completed_at=datetime.now(UTC), ) db.add(task) await db.commit() diff --git a/backend/app/services/collectors/bgp_common.py b/backend/app/services/collectors/bgp_common.py new file mode 100644 index 00000000..9195cac6 --- /dev/null +++ b/backend/app/services/collectors/bgp_common.py @@ -0,0 +1,313 @@ +"""Shared helpers for BGP collectors.""" + +from __future__ import annotations + +import hashlib +import ipaddress +from collections import Counter, defaultdict +from datetime import UTC, datetime +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.bgp_anomaly import BGPAnomaly +from app.models.collected_data import CollectedData + + +RIPE_RIS_COLLECTOR_COORDS: dict[str, dict[str, Any]] = { + "rrc00": {"city": "Amsterdam", "country": "Netherlands", "latitude": 52.3676, "longitude": 4.9041}, + "rrc01": {"city": "London", "country": "United Kingdom", "latitude": 51.5072, "longitude": -0.1276}, + "rrc03": {"city": "Amsterdam", "country": "Netherlands", "latitude": 52.3676, "longitude": 4.9041}, + "rrc04": {"city": "Geneva", "country": "Switzerland", "latitude": 46.2044, "longitude": 6.1432}, + "rrc05": {"city": "Vienna", "country": "Austria", "latitude": 48.2082, "longitude": 16.3738}, + "rrc06": {"city": "Otemachi", "country": "Japan", "latitude": 35.686, "longitude": 139.7671}, + "rrc07": {"city": "Stockholm", "country": "Sweden", "latitude": 59.3293, "longitude": 18.0686}, + "rrc10": {"city": "Milan", "country": "Italy", "latitude": 45.4642, "longitude": 9.19}, + "rrc11": {"city": "New York", "country": "United States", "latitude": 40.7128, "longitude": -74.006}, + "rrc12": {"city": "Frankfurt", "country": "Germany", "latitude": 50.1109, "longitude": 8.6821}, + "rrc13": {"city": "Moscow", "country": "Russia", "latitude": 55.7558, "longitude": 37.6173}, + "rrc14": {"city": "Palo Alto", "country": "United States", "latitude": 37.4419, "longitude": -122.143}, + "rrc15": {"city": "Sao Paulo", "country": "Brazil", "latitude": -23.5558, "longitude": -46.6396}, + "rrc16": {"city": "Miami", "country": "United States", "latitude": 25.7617, "longitude": -80.1918}, + "rrc18": {"city": "Barcelona", "country": "Spain", "latitude": 41.3874, "longitude": 2.1686}, + "rrc19": {"city": "Johannesburg", "country": "South Africa", "latitude": -26.2041, "longitude": 28.0473}, + "rrc20": {"city": "Zurich", "country": "Switzerland", "latitude": 47.3769, "longitude": 8.5417}, + "rrc21": {"city": "Paris", "country": "France", "latitude": 48.8566, "longitude": 2.3522}, + "rrc22": {"city": "Bucharest", "country": "Romania", "latitude": 44.4268, "longitude": 26.1025}, + "rrc23": {"city": "Singapore", "country": "Singapore", "latitude": 1.3521, "longitude": 103.8198}, + "rrc24": {"city": "Montevideo", "country": "Uruguay", "latitude": -34.9011, "longitude": -56.1645}, + "rrc25": {"city": "Amsterdam", "country": "Netherlands", "latitude": 52.3676, "longitude": 4.9041}, + "rrc26": {"city": "Dubai", "country": "United Arab Emirates", "latitude": 25.2048, "longitude": 55.2708}, +} + + +def _safe_int(value: Any) -> int | None: + try: + if value in (None, ""): + return None + return int(value) + except (TypeError, ValueError): + return None + + +def _parse_timestamp(value: Any) -> datetime: + if isinstance(value, datetime): + return value.astimezone(UTC) if value.tzinfo else value.replace(tzinfo=UTC) + + if isinstance(value, (int, float)): + return datetime.fromtimestamp(value, tz=UTC) + + if isinstance(value, str) and value: + normalized = value.replace("Z", "+00:00") + parsed = datetime.fromisoformat(normalized) + return parsed.astimezone(UTC) if parsed.tzinfo else parsed.replace(tzinfo=UTC) + + return datetime.now(UTC) + + +def _normalize_as_path(raw_path: Any) -> list[int]: + if raw_path in (None, ""): + return [] + if isinstance(raw_path, list): + return [asn for asn in (_safe_int(item) for item in raw_path) if asn is not None] + if isinstance(raw_path, str): + parts = raw_path.replace("{", "").replace("}", "").split() + return [asn for asn in (_safe_int(item) for item in parts) if asn is not None] + return [] + + +def normalize_bgp_event(payload: dict[str, Any], *, project: str) -> dict[str, Any]: + raw_message = payload.get("raw_message", payload) + raw_path = ( + payload.get("path") + or payload.get("as_path") + or payload.get("attrs", {}).get("path") + or payload.get("attrs", {}).get("as_path") + or [] + ) + as_path = _normalize_as_path(raw_path) + + raw_type = str(payload.get("event_type") or payload.get("type") or payload.get("msg_type") or "").lower() + if raw_type in {"a", "announce", "announcement"}: + event_type = "announcement" + elif raw_type in {"w", "withdraw", "withdrawal"}: + event_type = "withdrawal" + elif raw_type in {"r", "rib"}: + event_type = "rib" + else: + event_type = raw_type or "announcement" + + prefix = str(payload.get("prefix") or payload.get("prefixes") or payload.get("target_prefix") or "").strip() + if prefix.startswith("[") and prefix.endswith("]"): + prefix = prefix[1:-1] + + timestamp = _parse_timestamp(payload.get("timestamp") or payload.get("time") or payload.get("ts")) + collector = str(payload.get("collector") or payload.get("host") or payload.get("router") or "unknown") + peer_asn = _safe_int(payload.get("peer_asn") or payload.get("peer")) + origin_asn = _safe_int(payload.get("origin_asn")) or (as_path[-1] if as_path else None) + source_material = "|".join( + [ + collector, + str(peer_asn or ""), + prefix, + event_type, + timestamp.isoformat(), + ",".join(str(asn) for asn in as_path), + ] + ) + source_id = hashlib.sha1(source_material.encode("utf-8")).hexdigest()[:24] + + prefix_length = None + is_more_specific = False + if prefix: + try: + network = ipaddress.ip_network(prefix, strict=False) + prefix_length = int(network.prefixlen) + is_more_specific = prefix_length > (24 if network.version == 4 else 48) + except ValueError: + prefix_length = None + + collector_location = RIPE_RIS_COLLECTOR_COORDS.get(collector, {}) + metadata = { + "project": project, + "collector": collector, + "peer_asn": peer_asn, + "peer_ip": payload.get("peer_ip") or payload.get("peer_address"), + "event_type": event_type, + "prefix": prefix, + "origin_asn": origin_asn, + "as_path": as_path, + "communities": payload.get("communities") or payload.get("attrs", {}).get("communities") or [], + "next_hop": payload.get("next_hop") or payload.get("attrs", {}).get("next_hop"), + "med": payload.get("med") or payload.get("attrs", {}).get("med"), + "local_pref": payload.get("local_pref") or payload.get("attrs", {}).get("local_pref"), + "timestamp": timestamp.isoformat(), + "as_path_length": len(as_path), + "prefix_length": prefix_length, + "is_more_specific": is_more_specific, + "visibility_weight": 1, + "collector_location": collector_location, + "raw_message": raw_message, + } + + return { + "source_id": source_id, + "name": prefix or f"{collector}:{event_type}", + "title": f"{event_type} {prefix}".strip(), + "description": f"{collector} observed {event_type} for {prefix}".strip(), + "reference_date": timestamp.isoformat(), + "country": collector_location.get("country"), + "city": collector_location.get("city"), + "latitude": collector_location.get("latitude"), + "longitude": collector_location.get("longitude"), + "metadata": metadata, + } + + +async def create_bgp_anomalies_for_batch( + db: AsyncSession, + *, + source: str, + snapshot_id: int | None, + task_id: int | None, + events: list[dict[str, Any]], +) -> int: + if not events: + return 0 + + pending_anomalies: list[BGPAnomaly] = [] + prefix_to_origins: defaultdict[str, set[int]] = defaultdict(set) + prefix_to_more_specifics: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) + withdrawal_counter: Counter[tuple[str, int | None]] = Counter() + + prefixes = {event["metadata"].get("prefix") for event in events if event.get("metadata", {}).get("prefix")} + previous_origin_map: dict[str, set[int]] = defaultdict(set) + + if prefixes: + previous_query = await db.execute( + select(CollectedData).where( + CollectedData.source == source, + CollectedData.snapshot_id != snapshot_id, + CollectedData.extra_data["prefix"].as_string().in_(sorted(prefixes)), + ) + ) + for record in previous_query.scalars().all(): + metadata = record.extra_data or {} + prefix = metadata.get("prefix") + origin = _safe_int(metadata.get("origin_asn")) + if prefix and origin is not None: + previous_origin_map[prefix].add(origin) + + for event in events: + metadata = event.get("metadata", {}) + prefix = metadata.get("prefix") + origin_asn = _safe_int(metadata.get("origin_asn")) + if not prefix: + continue + + if origin_asn is not None: + prefix_to_origins[prefix].add(origin_asn) + + if metadata.get("is_more_specific"): + prefix_to_more_specifics[prefix.split("/")[0]].append(event) + + if metadata.get("event_type") == "withdrawal": + withdrawal_counter[(prefix, origin_asn)] += 1 + + for prefix, origins in prefix_to_origins.items(): + historic = previous_origin_map.get(prefix, set()) + new_origins = sorted(origin for origin in origins if origin not in historic) + if historic and new_origins: + for new_origin in new_origins: + pending_anomalies.append( + BGPAnomaly( + snapshot_id=snapshot_id, + task_id=task_id, + source=source, + anomaly_type="origin_change", + severity="critical", + status="active", + entity_key=f"origin_change:{prefix}:{new_origin}", + prefix=prefix, + origin_asn=sorted(historic)[0], + new_origin_asn=new_origin, + peer_scope=[], + started_at=datetime.now(UTC), + confidence=0.86, + summary=f"Prefix {prefix} is now originated by AS{new_origin}, outside the current baseline.", + evidence={"previous_origins": sorted(historic), "current_origins": sorted(origins)}, + ) + ) + + for root_prefix, more_specifics in prefix_to_more_specifics.items(): + if len(more_specifics) >= 2: + sample = more_specifics[0]["metadata"] + pending_anomalies.append( + BGPAnomaly( + snapshot_id=snapshot_id, + task_id=task_id, + source=source, + anomaly_type="more_specific_burst", + severity="high", + status="active", + entity_key=f"more_specific_burst:{root_prefix}:{len(more_specifics)}", + prefix=sample.get("prefix"), + origin_asn=_safe_int(sample.get("origin_asn")), + new_origin_asn=None, + peer_scope=sorted( + { + str(item.get("metadata", {}).get("collector") or "") + for item in more_specifics + if item.get("metadata", {}).get("collector") + } + ), + started_at=datetime.now(UTC), + confidence=0.72, + summary=f"{len(more_specifics)} more-specific announcements clustered around {root_prefix}.", + evidence={"events": [item.get("metadata") for item in more_specifics[:10]]}, + ) + ) + + for (prefix, origin_asn), count in withdrawal_counter.items(): + if count >= 3: + pending_anomalies.append( + BGPAnomaly( + snapshot_id=snapshot_id, + task_id=task_id, + source=source, + anomaly_type="mass_withdrawal", + severity="high" if count < 8 else "critical", + status="active", + entity_key=f"mass_withdrawal:{prefix}:{origin_asn}:{count}", + prefix=prefix, + origin_asn=origin_asn, + new_origin_asn=None, + peer_scope=[], + started_at=datetime.now(UTC), + confidence=min(0.55 + (count * 0.05), 0.95), + summary=f"{count} withdrawal events observed for {prefix} in the current ingest window.", + evidence={"withdrawal_count": count}, + ) + ) + + if not pending_anomalies: + return 0 + + existing_result = await db.execute( + select(BGPAnomaly.entity_key).where( + BGPAnomaly.entity_key.in_([item.entity_key for item in pending_anomalies]) + ) + ) + existing_keys = {row[0] for row in existing_result.fetchall()} + + created = 0 + for anomaly in pending_anomalies: + if anomaly.entity_key in existing_keys: + continue + db.add(anomaly) + created += 1 + + if created: + await db.commit() + return created diff --git a/backend/app/services/collectors/bgpstream.py b/backend/app/services/collectors/bgpstream.py new file mode 100644 index 00000000..0ca4e436 --- /dev/null +++ b/backend/app/services/collectors/bgpstream.py @@ -0,0 +1,120 @@ +"""BGPStream backfill collector.""" + +from __future__ import annotations + +import asyncio +import json +import time +import urllib.parse +import urllib.request +from typing import Any + +from app.services.collectors.base import BaseCollector +from app.services.collectors.bgp_common import create_bgp_anomalies_for_batch, normalize_bgp_event + + +class BGPStreamBackfillCollector(BaseCollector): + name = "bgpstream_bgp" + priority = "P1" + module = "L3" + frequency_hours = 6 + data_type = "bgp_rib" + fail_on_empty = True + + async def fetch(self) -> list[dict[str, Any]]: + if not self._resolved_url: + raise RuntimeError("BGPStream URL is not configured") + + return await asyncio.to_thread(self._fetch_resource_windows) + + def _fetch_resource_windows(self) -> list[dict[str, Any]]: + end = int(time.time()) - 3600 + start = end - 86400 + params = [ + ("projects[]", "routeviews"), + ("collectors[]", "route-views2"), + ("types[]", "updates"), + ("intervals[]", f"{start},{end}"), + ] + url = f"{self._resolved_url}/data?{urllib.parse.urlencode(params)}" + request = urllib.request.Request( + url, + headers={"User-Agent": "Planet-Intelligence-System/1.0 (Python/collector)"}, + ) + with urllib.request.urlopen(request, timeout=30) as response: + body = json.loads(response.read().decode()) + + if body.get("error"): + raise RuntimeError(f"BGPStream broker error: {body['error']}") + + return body.get("data", {}).get("resources", []) + + def transform(self, raw_data: list[dict[str, Any]]) -> list[dict[str, Any]]: + transformed: list[dict[str, Any]] = [] + for item in raw_data: + if not isinstance(item, dict): + continue + + is_broker_window = any(key in item for key in ("filename", "url", "startTime", "start_time")) + + if {"collector", "prefix"} <= set(item.keys()) and not is_broker_window: + transformed.append(normalize_bgp_event(item, project="bgpstream")) + continue + + # Broker responses provide file windows rather than decoded events. + collector = item.get("collector") or item.get("project") or "bgpstream" + timestamp = item.get("time") or item.get("startTime") or item.get("start_time") + name = item.get("filename") or item.get("url") or f"{collector}-window" + normalized = normalize_bgp_event( + { + "collector": collector, + "event_type": "rib", + "prefix": item.get("prefix") or "historical-window", + "timestamp": timestamp, + "origin_asn": item.get("origin_asn"), + "path": item.get("path") or [], + "raw_message": item, + }, + project="bgpstream", + ) + transformed.append( + normalized + | { + "name": name, + "title": f"BGPStream {collector}", + "description": "Historical BGPStream backfill window", + "metadata": { + **normalized["metadata"], + "broker_record": item, + }, + } + ) + self._latest_transformed_batch = transformed + return transformed + + async def run(self, db): + result = await super().run(db) + if result.get("status") != "success": + return result + + snapshot_id = await self._resolve_snapshot_id(db, result.get("task_id")) + anomaly_count = await create_bgp_anomalies_for_batch( + db, + source=self.name, + snapshot_id=snapshot_id, + task_id=result.get("task_id"), + events=getattr(self, "_latest_transformed_batch", []), + ) + result["anomalies_created"] = anomaly_count + return result + + async def _resolve_snapshot_id(self, db, task_id: int | None) -> int | None: + if task_id is None: + return None + from sqlalchemy import select + from app.models.data_snapshot import DataSnapshot + + result = await db.execute( + select(DataSnapshot.id).where(DataSnapshot.task_id == task_id).order_by(DataSnapshot.id.desc()) + ) + return result.scalar_one_or_none() diff --git a/backend/app/services/collectors/cloudflare.py b/backend/app/services/collectors/cloudflare.py index b3061642..4eb24015 100644 --- a/backend/app/services/collectors/cloudflare.py +++ b/backend/app/services/collectors/cloudflare.py @@ -10,7 +10,7 @@ Some endpoints require authentication for higher rate limits. import asyncio import os from typing import Dict, Any, List -from datetime import datetime +from datetime import UTC, datetime import httpx from app.services.collectors.base import HTTPCollector @@ -59,7 +59,7 @@ class CloudflareRadarDeviceCollector(HTTPCollector): "other_percent": float(summary.get("other", 0)), "date_range": result.get("meta", {}).get("dateRange", {}), }, - "reference_date": datetime.utcnow().isoformat(), + "reference_date": datetime.now(UTC).isoformat(), } data.append(entry) except (ValueError, TypeError, KeyError): @@ -107,7 +107,7 @@ class CloudflareRadarTrafficCollector(HTTPCollector): "requests": item.get("requests"), "visit_duration": item.get("visitDuration"), }, - "reference_date": item.get("datetime", datetime.utcnow().isoformat()), + "reference_date": item.get("datetime", datetime.now(UTC).isoformat()), } data.append(entry) except (ValueError, TypeError, KeyError): @@ -155,7 +155,7 @@ class CloudflareRadarTopASCollector(HTTPCollector): "traffic_share": item.get("trafficShare"), "country_code": item.get("location", {}).get("countryCode"), }, - "reference_date": datetime.utcnow().isoformat(), + "reference_date": datetime.now(UTC).isoformat(), } data.append(entry) except (ValueError, TypeError, KeyError): diff --git a/backend/app/services/collectors/epoch_ai.py b/backend/app/services/collectors/epoch_ai.py index 8d22ca32..1fa813d6 100644 --- a/backend/app/services/collectors/epoch_ai.py +++ b/backend/app/services/collectors/epoch_ai.py @@ -6,7 +6,7 @@ https://epoch.ai/data/gpu-clusters import re from typing import Dict, Any, List -from datetime import datetime +from datetime import UTC, datetime from bs4 import BeautifulSoup import httpx @@ -64,7 +64,7 @@ class EpochAIGPUCollector(BaseCollector): "metadata": { "raw_data": perf_cell, }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } data.append(entry) except (ValueError, IndexError, AttributeError): @@ -114,6 +114,6 @@ class EpochAIGPUCollector(BaseCollector): "metadata": { "note": "Sample data - Epoch AI page structure may vary", }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), }, ] diff --git a/backend/app/services/collectors/fao_landing.py b/backend/app/services/collectors/fao_landing.py index 5adba41a..f8c38570 100644 --- a/backend/app/services/collectors/fao_landing.py +++ b/backend/app/services/collectors/fao_landing.py @@ -4,7 +4,7 @@ Collects landing point data from FAO CSV API. """ from typing import Dict, Any, List -from datetime import datetime +from datetime import UTC, datetime import httpx from app.services.collectors.base import BaseCollector @@ -58,7 +58,7 @@ class FAOLandingPointCollector(BaseCollector): "is_tbd": is_tbd, "original_id": feature_id, }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, IndexError): diff --git a/backend/app/services/collectors/huggingface.py b/backend/app/services/collectors/huggingface.py index 149752f3..d4c0af55 100644 --- a/backend/app/services/collectors/huggingface.py +++ b/backend/app/services/collectors/huggingface.py @@ -7,7 +7,7 @@ https://huggingface.co/spaces """ from typing import Dict, Any, List -from datetime import datetime +from datetime import UTC, datetime from app.services.collectors.base import HTTPCollector @@ -46,7 +46,7 @@ class HuggingFaceModelCollector(HTTPCollector): "library_name": item.get("library_name"), "created_at": item.get("createdAt"), }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } data.append(entry) except (ValueError, TypeError, KeyError): @@ -87,7 +87,7 @@ class HuggingFaceDatasetCollector(HTTPCollector): "tags": (item.get("tags", []) or [])[:10], "created_at": item.get("createdAt"), }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } data.append(entry) except (ValueError, TypeError, KeyError): @@ -128,7 +128,7 @@ class HuggingFaceSpacesCollector(HTTPCollector): "tags": (item.get("tags", []) or [])[:10], "created_at": item.get("createdAt"), }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } data.append(entry) except (ValueError, TypeError, KeyError): diff --git a/backend/app/services/collectors/peeringdb.py b/backend/app/services/collectors/peeringdb.py index 3c2b0179..5510095c 100644 --- a/backend/app/services/collectors/peeringdb.py +++ b/backend/app/services/collectors/peeringdb.py @@ -13,7 +13,7 @@ To get higher limits, set PEERINGDB_API_KEY environment variable. import asyncio import os from typing import Dict, Any, List -from datetime import datetime +from datetime import UTC, datetime import httpx from app.services.collectors.base import HTTPCollector @@ -106,7 +106,7 @@ class PeeringDBIXPCollector(HTTPCollector): "created": item.get("created"), "updated": item.get("updated"), }, - "reference_date": datetime.utcnow().isoformat(), + "reference_date": datetime.now(UTC).isoformat(), } data.append(entry) except (ValueError, TypeError, KeyError): @@ -209,7 +209,7 @@ class PeeringDBNetworkCollector(HTTPCollector): "created": item.get("created"), "updated": item.get("updated"), }, - "reference_date": datetime.utcnow().isoformat(), + "reference_date": datetime.now(UTC).isoformat(), } data.append(entry) except (ValueError, TypeError, KeyError): @@ -311,7 +311,7 @@ class PeeringDBFacilityCollector(HTTPCollector): "created": item.get("created"), "updated": item.get("updated"), }, - "reference_date": datetime.utcnow().isoformat(), + "reference_date": datetime.now(UTC).isoformat(), } data.append(entry) except (ValueError, TypeError, KeyError): diff --git a/backend/app/services/collectors/ris_live.py b/backend/app/services/collectors/ris_live.py new file mode 100644 index 00000000..f61e808b --- /dev/null +++ b/backend/app/services/collectors/ris_live.py @@ -0,0 +1,131 @@ +"""RIPE RIS Live collector.""" + +from __future__ import annotations + +import asyncio +import json +import urllib.request +from typing import Any + +from app.services.collectors.base import BaseCollector +from app.services.collectors.bgp_common import create_bgp_anomalies_for_batch, normalize_bgp_event + + +class RISLiveCollector(BaseCollector): + name = "ris_live_bgp" + priority = "P1" + module = "L3" + frequency_hours = 1 + data_type = "bgp_update" + fail_on_empty = True + max_messages = 100 + idle_timeout_seconds = 15 + + async def fetch(self) -> list[dict[str, Any]]: + if not self._resolved_url: + raise RuntimeError("RIS Live URL is not configured") + + return await asyncio.to_thread(self._fetch_via_stream) + + def _fetch_via_stream(self) -> list[dict[str, Any]]: + events: list[dict[str, Any]] = [] + stream_url = "https://ris-live.ripe.net/v1/stream/?format=json&client=planet-ris-live" + subscribe = json.dumps( + { + "host": "rrc00", + "type": "UPDATE", + "require": "announcements", + } + ) + request = urllib.request.Request( + stream_url, + headers={"X-RIS-Subscribe": subscribe}, + ) + with urllib.request.urlopen(request, timeout=20) as response: + while len(events) < self.max_messages: + line = response.readline().decode().strip() + if not line: + break + payload = json.loads(line) + if payload.get("type") != "ris_message": + continue + data = payload.get("data", {}) + if isinstance(data, dict): + events.append(data) + return events + + def transform(self, raw_data: list[dict[str, Any]]) -> list[dict[str, Any]]: + transformed: list[dict[str, Any]] = [] + for item in raw_data: + announcements = item.get("announcements") or [] + withdrawals = item.get("withdrawals") or [] + + for announcement in announcements: + next_hop = announcement.get("next_hop") + for prefix in announcement.get("prefixes") or []: + transformed.append( + normalize_bgp_event( + { + **item, + "collector": item.get("host", "").replace(".ripe.net", ""), + "event_type": "announcement", + "prefix": prefix, + "next_hop": next_hop, + }, + project="ris-live", + ) + ) + + for prefix in withdrawals: + transformed.append( + normalize_bgp_event( + { + **item, + "collector": item.get("host", "").replace(".ripe.net", ""), + "event_type": "withdrawal", + "prefix": prefix, + }, + project="ris-live", + ) + ) + + if not announcements and not withdrawals: + transformed.append( + normalize_bgp_event( + { + **item, + "collector": item.get("host", "").replace(".ripe.net", ""), + }, + project="ris-live", + ) + ) + + self._latest_transformed_batch = transformed + return transformed + + async def run(self, db): + result = await super().run(db) + if result.get("status") != "success": + return result + + snapshot_id = await self._resolve_snapshot_id(db, result.get("task_id")) + anomaly_count = await create_bgp_anomalies_for_batch( + db, + source=self.name, + snapshot_id=snapshot_id, + task_id=result.get("task_id"), + events=getattr(self, "_latest_transformed_batch", []), + ) + result["anomalies_created"] = anomaly_count + return result + + async def _resolve_snapshot_id(self, db, task_id: int | None) -> int | None: + if task_id is None: + return None + from sqlalchemy import select + from app.models.data_snapshot import DataSnapshot + + result = await db.execute( + select(DataSnapshot.id).where(DataSnapshot.task_id == task_id).order_by(DataSnapshot.id.desc()) + ) + return result.scalar_one_or_none() diff --git a/backend/app/services/collectors/telegeography.py b/backend/app/services/collectors/telegeography.py index f01188e9..b3bd7c72 100644 --- a/backend/app/services/collectors/telegeography.py +++ b/backend/app/services/collectors/telegeography.py @@ -7,7 +7,7 @@ Uses Wayback Machine as backup data source since live data requires JavaScript r import json import re from typing import Dict, Any, List -from datetime import datetime +from datetime import UTC, datetime from bs4 import BeautifulSoup import httpx @@ -103,7 +103,7 @@ class TeleGeographyCableCollector(BaseCollector): "capacity_tbps": item.get("capacity"), "url": item.get("url"), }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): @@ -131,7 +131,7 @@ class TeleGeographyCableCollector(BaseCollector): "owner": "Meta, Orange, Vodafone, etc.", "status": "active", }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), }, { "source_id": "telegeo_sample_2", @@ -147,7 +147,7 @@ class TeleGeographyCableCollector(BaseCollector): "owner": "Alibaba, NEC", "status": "planned", }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), }, ] @@ -187,7 +187,7 @@ class TeleGeographyLandingPointCollector(BaseCollector): "cable_count": len(item.get("cables", [])), "url": item.get("url"), }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): @@ -211,7 +211,7 @@ class TeleGeographyLandingPointCollector(BaseCollector): "value": "", "unit": "", "metadata": {"note": "Sample data"}, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), }, ] @@ -258,7 +258,7 @@ class TeleGeographyCableSystemCollector(BaseCollector): "investment": item.get("investment"), "url": item.get("url"), }, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): @@ -282,6 +282,6 @@ class TeleGeographyCableSystemCollector(BaseCollector): "value": "5000", "unit": "km", "metadata": {"note": "Sample data"}, - "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + "reference_date": datetime.now(UTC).strftime("%Y-%m-%d"), }, ] diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index ce43ea66..3e7f8623 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -2,7 +2,7 @@ import asyncio import logging -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from typing import Any, Dict, Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler @@ -10,6 +10,7 @@ from apscheduler.triggers.interval import IntervalTrigger from sqlalchemy import select from app.db.session import async_session_factory +from app.core.time import to_iso8601_utc from app.models.datasource import DataSource from app.models.task import CollectionTask from app.services.collectors.registry import collector_registry @@ -79,12 +80,12 @@ async def run_collector_task(collector_name: str): collector._datasource_id = datasource.id logger.info("Running collector: %s (datasource_id=%s)", collector_name, datasource.id) task_result = await collector.run(db) - datasource.last_run_at = datetime.utcnow() + datasource.last_run_at = datetime.now(UTC) datasource.last_status = task_result.get("status") await _update_next_run_at(datasource, db) logger.info("Collector %s completed: %s", collector_name, task_result) except Exception as exc: - datasource.last_run_at = datetime.utcnow() + datasource.last_run_at = datetime.now(UTC) datasource.last_status = "failed" await db.commit() logger.exception("Collector %s failed: %s", collector_name, exc) @@ -92,7 +93,7 @@ async def run_collector_task(collector_name: str): async def cleanup_stale_running_tasks(max_age_hours: int = 2) -> int: """Mark stale running tasks as failed after restarts or collector hangs.""" - cutoff = datetime.utcnow() - timedelta(hours=max_age_hours) + cutoff = datetime.now(UTC) - timedelta(hours=max_age_hours) async with async_session_factory() as db: result = await db.execute( @@ -107,7 +108,7 @@ async def cleanup_stale_running_tasks(max_age_hours: int = 2) -> int: for task in stale_tasks: task.status = "failed" task.phase = "failed" - task.completed_at = datetime.utcnow() + task.completed_at = datetime.now(UTC) existing_error = (task.error_message or "").strip() cleanup_error = "Marked failed automatically after stale running task cleanup" task.error_message = f"{existing_error}\n{cleanup_error}".strip() if existing_error else cleanup_error @@ -167,7 +168,7 @@ def get_scheduler_jobs() -> list[Dict[str, Any]]: { "id": job.id, "name": job.name, - "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None, + "next_run_time": to_iso8601_utc(job.next_run_time), "trigger": str(job.trigger), } ) diff --git a/backend/tests/test_bgp.py b/backend/tests/test_bgp.py new file mode 100644 index 00000000..70d53112 --- /dev/null +++ b/backend/tests/test_bgp.py @@ -0,0 +1,74 @@ +"""Tests for BGP observability helpers.""" + +from app.models.bgp_anomaly import BGPAnomaly +from app.services.collectors.bgp_common import normalize_bgp_event +from app.services.collectors.bgpstream import BGPStreamBackfillCollector + + +def test_normalize_bgp_event_from_live_payload(): + event = normalize_bgp_event( + { + "collector": "rrc00", + "peer_asn": "3333", + "peer_ip": "2001:db8::1", + "type": "UPDATE", + "event_type": "announcement", + "prefix": "203.0.113.0/24", + "path": ["3333", "64500", "64496"], + "communities": ["3333:100"], + "timestamp": "2026-03-26T08:00:00Z", + }, + project="ris-live", + ) + + assert event["name"] == "203.0.113.0/24" + assert event["metadata"]["collector"] == "rrc00" + assert event["metadata"]["peer_asn"] == 3333 + assert event["metadata"]["origin_asn"] == 64496 + assert event["metadata"]["as_path_length"] == 3 + assert event["metadata"]["prefix_length"] == 24 + assert event["metadata"]["is_more_specific"] is False + + +def test_bgpstream_transform_preserves_broker_record(): + collector = BGPStreamBackfillCollector() + transformed = collector.transform( + [ + { + "project": "routeviews", + "collector": "route-views.sg", + "filename": "rib.20260326.0800.gz", + "startTime": "2026-03-26T08:00:00Z", + "prefix": "198.51.100.0/24", + "origin_asn": 64512, + } + ] + ) + + assert len(transformed) == 1 + record = transformed[0] + assert record["name"] == "rib.20260326.0800.gz" + assert record["metadata"]["project"] == "bgpstream" + assert record["metadata"]["broker_record"]["filename"] == "rib.20260326.0800.gz" + + +def test_bgp_anomaly_to_dict(): + anomaly = BGPAnomaly( + source="ris_live_bgp", + anomaly_type="origin_change", + severity="critical", + status="active", + entity_key="origin_change:203.0.113.0/24:64497", + prefix="203.0.113.0/24", + origin_asn=64496, + new_origin_asn=64497, + summary="Origin ASN changed", + confidence=0.9, + evidence={"previous_origins": [64496], "current_origins": [64497]}, + ) + + data = anomaly.to_dict() + assert data["source"] == "ris_live_bgp" + assert data["anomaly_type"] == "origin_change" + assert data["new_origin_asn"] == 64497 + assert data["evidence"]["previous_origins"] == [64496] diff --git a/docs/bgp-observability-plan.md b/docs/bgp-observability-plan.md new file mode 100644 index 00000000..450d0fe1 --- /dev/null +++ b/docs/bgp-observability-plan.md @@ -0,0 +1,487 @@ +# BGP Observability Plan + +## Goal + +Build a global routing observability capability on top of: + +- [RIPE RIS Live](https://ris-live.ripe.net/) +- [CAIDA BGPStream data access overview](https://bgpstream.caida.org/docs/overview/data-access) + +The target is to support: + +- real-time routing event ingestion +- historical replay and baseline analysis +- anomaly detection +- Earth big-screen visualization + +## Important Scope Note + +These data sources expose the BGP control plane, not user traffic itself. + +That means the system can infer: + +- route propagation direction +- prefix reachability changes +- AS path changes +- visibility changes across collectors + +But it cannot directly measure: + +- exact application traffic volume +- exact user packet path +- real bandwidth consumption between countries or operators + +Product wording should therefore use phrases like: + +- global routing propagation +- route visibility +- control-plane anomalies +- suspected path diversion + +Instead of claiming direct traffic measurement. + +## Data Source Roles + +### RIS Live + +Use RIS Live as the real-time feed. + +Recommended usage: + +- subscribe to update streams over WebSocket +- ingest announcements and withdrawals continuously +- trigger low-latency alerts + +Best suited for: + +- hijack suspicion +- withdrawal bursts +- real-time path changes +- live Earth event overlay + +### BGPStream + +Use BGPStream as the historical and replay layer. + +Recommended usage: + +- backfill time windows +- build normal baselines +- compare current events against history +- support investigations and playback + +Best suited for: + +- historical anomaly confirmation +- baseline path frequency +- visibility baselines +- postmortem analysis + +## Recommended Architecture + +```mermaid +flowchart LR + A["RIS Live WebSocket"] --> B["Realtime Collector"] + C["BGPStream Historical Access"] --> D["Backfill Collector"] + B --> E["Normalization Layer"] + D --> E + E --> F["data_snapshots"] + E --> G["collected_data"] + E --> H["bgp_anomalies"] + H --> I["Alerts API"] + G --> J["Visualization API"] + H --> J + J --> K["Earth Big Screen"] +``` + +## Storage Design + +The current project already has: + +- [data_snapshot.py](/home/ray/dev/linkong/planet/backend/app/models/data_snapshot.py) +- [collected_data.py](/home/ray/dev/linkong/planet/backend/app/models/collected_data.py) + +So the lowest-risk path is: + +1. keep raw and normalized BGP events in `collected_data` +2. use `data_snapshots` to group each ingest window +3. add a dedicated anomaly table for higher-value derived events + +## Proposed Data Types + +### `collected_data` + +Use these `source` values: + +- `ris_live_bgp` +- `bgpstream_bgp` + +Use these `data_type` values: + +- `bgp_update` +- `bgp_rib` +- `bgp_visibility` +- `bgp_path_change` + +Recommended stable fields: + +- `source` +- `source_id` +- `entity_key` +- `data_type` +- `name` +- `reference_date` +- `metadata` + +Recommended `entity_key` strategy: + +- event entity: `collector|peer|prefix|event_time` +- prefix state entity: `collector|peer|prefix` +- origin state entity: `prefix|origin_asn` + +### `metadata` schema for raw events + +Store the normalized event payload in `metadata`: + +```json +{ + "project": "ris-live", + "collector": "rrc00", + "peer_asn": 3333, + "peer_ip": "2001:db8::1", + "event_type": "announcement", + "prefix": "203.0.113.0/24", + "origin_asn": 64496, + "as_path": [3333, 64500, 64496], + "communities": ["3333:100", "64500:1"], + "next_hop": "192.0.2.1", + "med": 0, + "local_pref": null, + "timestamp": "2026-03-26T08:00:00Z", + "raw_message": {} +} +``` + +### New anomaly table + +Add a new table, recommended name: `bgp_anomalies` + +Suggested columns: + +- `id` +- `snapshot_id` +- `task_id` +- `source` +- `anomaly_type` +- `severity` +- `status` +- `entity_key` +- `prefix` +- `origin_asn` +- `new_origin_asn` +- `peer_scope` +- `started_at` +- `ended_at` +- `confidence` +- `summary` +- `evidence` +- `created_at` + +This table should represent derived intelligence, not raw updates. + +## Collector Design + +## 1. `RISLiveCollector` + +Responsibility: + +- maintain WebSocket connection +- subscribe to relevant message types +- normalize messages +- write event batches into snapshots +- optionally emit derived anomalies in near real time + +Suggested runtime mode: + +- long-running background task + +Suggested snapshot strategy: + +- one snapshot per rolling time window +- for example every 1 minute or every 5 minutes + +## 2. `BGPStreamBackfillCollector` + +Responsibility: + +- fetch historical data windows +- normalize to the same schema as real-time data +- build baselines +- re-run anomaly rules on past windows if needed + +Suggested runtime mode: + +- scheduled task +- or ad hoc task for investigations + +Suggested snapshot strategy: + +- one snapshot per historical query window + +## Normalization Rules + +Normalize both sources into the same internal event model. + +Required normalized fields: + +- `collector` +- `peer_asn` +- `peer_ip` +- `event_type` +- `prefix` +- `origin_asn` +- `as_path` +- `timestamp` + +Derived normalized fields: + +- `as_path_length` +- `country_guess` +- `prefix_length` +- `is_more_specific` +- `visibility_weight` + +## Anomaly Detection Rules + +Start with these five rules first. + +### 1. Origin ASN Change + +Trigger when: + +- the same prefix is announced by a new origin ASN not seen in the baseline window + +Use for: + +- hijack suspicion +- origin drift detection + +### 2. More-Specific Burst + +Trigger when: + +- a more-specific prefix appears suddenly +- especially from an unexpected origin ASN + +Use for: + +- subprefix hijack suspicion + +### 3. Mass Withdrawal + +Trigger when: + +- the same prefix or ASN sees many withdrawals across collectors within a short window + +Use for: + +- outage suspicion +- regional incident detection + +### 4. Path Deviation + +Trigger when: + +- AS path length jumps sharply +- or a rarely seen transit ASN appears +- or path frequency drops below baseline norms + +Use for: + +- route leak suspicion +- unusual path diversion + +### 5. Visibility Drop + +Trigger when: + +- a prefix is visible from far fewer collectors/peers than its baseline + +Use for: + +- regional reachability degradation + +## Baseline Strategy + +Use BGPStream historical data to build: + +- common origin ASN per prefix +- common AS path patterns +- collector visibility distribution +- normal withdrawal frequency + +Recommended baseline windows: + +- short baseline: last 24 hours +- medium baseline: last 7 days +- long baseline: last 30 days + +The first implementation can start with only the 7-day baseline. + +## API Design + +### Raw event API + +Add endpoints like: + +- `GET /api/v1/bgp/events` +- `GET /api/v1/bgp/events/{id}` + +Suggested filters: + +- `prefix` +- `origin_asn` +- `peer_asn` +- `collector` +- `event_type` +- `time_from` +- `time_to` +- `source` + +### Anomaly API + +Add endpoints like: + +- `GET /api/v1/bgp/anomalies` +- `GET /api/v1/bgp/anomalies/{id}` +- `GET /api/v1/bgp/anomalies/summary` + +Suggested filters: + +- `severity` +- `anomaly_type` +- `status` +- `prefix` +- `origin_asn` +- `time_from` +- `time_to` + +### Visualization API + +Add an Earth-oriented endpoint like: + +- `GET /api/v1/visualization/geo/bgp-anomalies` + +Recommended feature shapes: + +- point: collector locations +- arc: inferred propagation or suspicious path edge +- pulse point: active anomaly hotspot + +## Earth Big-Screen Design + +Recommended layers: + +### Layer 1: Collector layer + +Show known collector locations and current activity intensity. + +### Layer 2: Route propagation arcs + +Use arcs for: + +- origin ASN country to collector country +- or collector-to-collector visibility edges + +Important note: + +This is an inferred propagation view, not real packet flow. + +### Layer 3: Active anomaly overlay + +Show: + +- hijack suspicion in red +- mass withdrawal in orange +- visibility drop in yellow +- path deviation in blue + +### Layer 4: Time playback + +Use `data_snapshots` to replay: + +- minute-by-minute route changes +- anomaly expansion +- recovery timeline + +## Alerting Strategy + +Map anomaly severity to the current alert system. + +Recommended severity mapping: + +- `critical` + - likely hijack + - very large withdrawal burst +- `high` + - clear origin change + - large visibility drop +- `medium` + - unusual path change + - moderate more-specific burst +- `low` + - weak or localized anomalies + +## Delivery Plan + +### Phase 1 + +- add `RISLiveCollector` +- normalize updates into `collected_data` +- create `bgp_anomalies` +- implement 3 rules: + - origin change + - more-specific burst + - mass withdrawal + +### Phase 2 + +- add `BGPStreamBackfillCollector` +- build 7-day baseline +- implement: + - path deviation + - visibility drop + +### Phase 3 + +- add Earth visualization layer +- add time playback +- add anomaly filtering and drilldown + +## Practical Implementation Notes + +- Start with IPv4 first, then add IPv6 after the event schema is stable. +- Store the original raw payload in `metadata.raw_message` for traceability. +- Deduplicate events by a stable hash of collector, peer, prefix, type, and timestamp. +- Keep anomaly generation idempotent so replay and backfill do not create duplicate alerts. +- Expect noisy data and partial views; confidence scoring matters. + +## Recommended First Patch Set + +The first code milestone should include: + +1. `backend/app/services/collectors/ris_live.py` +2. `backend/app/services/collectors/bgpstream.py` +3. `backend/app/models/bgp_anomaly.py` +4. `backend/app/api/v1/bgp.py` +5. `backend/app/api/v1/visualization.py` + add BGP anomaly geo endpoint +6. `frontend/src/pages` + add a BGP anomaly list or summary page +7. `frontend/public/earth/js` + add BGP anomaly rendering layer + +## Sources + +- [RIPE RIS Live](https://ris-live.ripe.net/) +- [CAIDA BGPStream Data Access Overview](https://bgpstream.caida.org/docs/overview/data-access) diff --git a/frontend/package.json b/frontend/package.json index ad45e1c5..34fdc31d 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "planet-frontend", - "version": "0.21.3", + "version": "0.21.4-dev", "private": true, "dependencies": { "@ant-design/icons": "^5.2.6", diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index aeb7a8e2..8936e1f3 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -7,6 +7,7 @@ import DataSources from './pages/DataSources/DataSources' import DataList from './pages/DataList/DataList' import Earth from './pages/Earth/Earth' import Settings from './pages/Settings/Settings' +import BGP from './pages/BGP/BGP' function App() { const { token } = useAuthStore() @@ -24,6 +25,7 @@ function App() { } /> } /> } /> + } /> } /> } /> diff --git a/frontend/src/components/AppLayout/AppLayout.tsx b/frontend/src/components/AppLayout/AppLayout.tsx index e5ca1b44..2d04b860 100644 --- a/frontend/src/components/AppLayout/AppLayout.tsx +++ b/frontend/src/components/AppLayout/AppLayout.tsx @@ -6,6 +6,7 @@ import { UserOutlined, SettingOutlined, BarChartOutlined, + DeploymentUnitOutlined, MenuUnfoldOutlined, MenuFoldOutlined, } from '@ant-design/icons' @@ -31,6 +32,7 @@ function AppLayout({ children }: AppLayoutProps) { { key: '/', icon: , label: 仪表盘 }, { key: '/datasources', icon: , label: 数据源 }, { key: '/data', icon: , label: 采集数据 }, + { key: '/bgp', icon: , label: BGP观测 }, { key: '/users', icon: , label: 用户管理 }, { key: '/settings', icon: , label: 系统配置 }, ] diff --git a/frontend/src/index.css b/frontend/src/index.css index ae02ec64..9ecd85e0 100644 --- a/frontend/src/index.css +++ b/frontend/src/index.css @@ -239,12 +239,71 @@ body { gap: 12px; } +.data-source-builtin-tab { + gap: 12px; +} + .data-source-custom-toolbar { flex: 0 0 auto; display: flex; justify-content: flex-end; } +.data-source-bulk-toolbar { + flex: 0 0 auto; + display: flex; + align-items: center; + justify-content: space-between; + gap: 16px; + padding: 14px 16px; + border-radius: 14px; + background: linear-gradient(135deg, rgba(255, 255, 255, 0.96) 0%, rgba(245, 247, 250, 0.96) 100%); + border: 1px solid rgba(5, 5, 5, 0.08); + box-shadow: 0 10px 24px rgba(15, 23, 42, 0.06); +} + +.data-source-bulk-toolbar__meta { + flex: 1 1 auto; + min-width: 0; + display: flex; + flex-direction: column; + gap: 8px; +} + +.data-source-bulk-toolbar__title { + font-size: 15px; + font-weight: 600; + color: #1f1f1f; +} + +.data-source-bulk-toolbar__stats { + display: flex; + flex-wrap: wrap; + gap: 8px; +} + +.data-source-bulk-toolbar__progress { + display: flex; + flex-direction: column; + gap: 8px; + max-width: 520px; +} + +.data-source-bulk-toolbar__progress-copy { + display: flex; + align-items: baseline; + justify-content: space-between; + gap: 12px; + color: #595959; + font-size: 13px; +} + +.data-source-bulk-toolbar__progress-copy strong { + color: #1677ff; + font-size: 18px; + line-height: 1; +} + .data-source-table-region { flex: 1 1 auto; min-height: 0; diff --git a/frontend/src/pages/Alerts/Alerts.tsx b/frontend/src/pages/Alerts/Alerts.tsx index e3225336..eeb55004 100644 --- a/frontend/src/pages/Alerts/Alerts.tsx +++ b/frontend/src/pages/Alerts/Alerts.tsx @@ -3,6 +3,7 @@ import { Table, Tag, Card, Row, Col, Statistic, Button, Modal, Space, Descriptio import { AlertOutlined, InfoCircleOutlined, ReloadOutlined } from '@ant-design/icons' import { useAuthStore } from '../../stores/auth' import AppLayout from '../../components/AppLayout/AppLayout' +import { formatDateTimeZhCN } from '../../utils/datetime' interface Alert { id: number @@ -105,7 +106,7 @@ function Alerts() { title: '时间', dataIndex: 'created_at', key: 'created_at', - render: (t: string) => new Date(t).toLocaleString('zh-CN'), + render: (t: string) => formatDateTimeZhCN(t), }, { title: '操作', @@ -201,15 +202,15 @@ function Alerts() { {selectedAlert.datasource_name} {selectedAlert.message} - {new Date(selectedAlert.created_at).toLocaleString('zh-CN')} + {formatDateTimeZhCN(selectedAlert.created_at)} {selectedAlert.acknowledged_at && ( - {new Date(selectedAlert.acknowledged_at).toLocaleString('zh-CN')} + {formatDateTimeZhCN(selectedAlert.acknowledged_at)} )} {selectedAlert.resolved_at && ( - {new Date(selectedAlert.resolved_at).toLocaleString('zh-CN')} + {formatDateTimeZhCN(selectedAlert.resolved_at)} )} diff --git a/frontend/src/pages/BGP/BGP.tsx b/frontend/src/pages/BGP/BGP.tsx new file mode 100644 index 00000000..ca16d245 --- /dev/null +++ b/frontend/src/pages/BGP/BGP.tsx @@ -0,0 +1,159 @@ +import { useEffect, useState } from 'react' +import { Alert, Card, Col, Row, Space, Statistic, Table, Tag, Typography } from 'antd' +import axios from 'axios' +import AppLayout from '../../components/AppLayout/AppLayout' +import { formatDateTimeZhCN } from '../../utils/datetime' + +const { Title, Text } = Typography + +interface BGPAnomaly { + id: number + source: string + anomaly_type: string + severity: string + status: string + prefix: string | null + origin_asn: number | null + new_origin_asn: number | null + confidence: number + summary: string + created_at: string | null +} + +interface Summary { + total: number + by_type: Record + by_severity: Record + by_status: Record +} + +function severityColor(severity: string) { + if (severity === 'critical') return 'red' + if (severity === 'high') return 'orange' + if (severity === 'medium') return 'gold' + return 'blue' +} + +function BGP() { + const [loading, setLoading] = useState(false) + const [anomalies, setAnomalies] = useState([]) + const [summary, setSummary] = useState(null) + + useEffect(() => { + const load = async () => { + setLoading(true) + try { + const [anomaliesRes, summaryRes] = await Promise.all([ + axios.get('/api/v1/bgp/anomalies', { params: { page_size: 100 } }), + axios.get('/api/v1/bgp/anomalies/summary'), + ]) + setAnomalies(anomaliesRes.data.data || []) + setSummary(summaryRes.data) + } finally { + setLoading(false) + } + } + + load() + }, []) + + return ( + + +
+ BGP观测 + 查看实时与回放阶段归一化出的路由异常。 +
+ + + + + + + + + + + + + + + + + + + + + + + + rowKey="id" + loading={loading} + dataSource={anomalies} + pagination={{ pageSize: 10 }} + columns={[ + { + title: '时间', + dataIndex: 'created_at', + width: 180, + render: (value: string | null) => formatDateTimeZhCN(value), + }, + { + title: '类型', + dataIndex: 'anomaly_type', + width: 180, + }, + { + title: '严重度', + dataIndex: 'severity', + width: 120, + render: (value: string) => {value}, + }, + { + title: '前缀', + dataIndex: 'prefix', + width: 180, + render: (value: string | null) => value || '-', + }, + { + title: 'ASN', + key: 'asn', + width: 160, + render: (_, record) => { + if (record.origin_asn && record.new_origin_asn) { + return `AS${record.origin_asn} -> AS${record.new_origin_asn}` + } + if (record.origin_asn) { + return `AS${record.origin_asn}` + } + return '-' + }, + }, + { + title: '来源', + dataIndex: 'source', + width: 140, + }, + { + title: '置信度', + dataIndex: 'confidence', + width: 120, + render: (value: number) => `${Math.round((value || 0) * 100)}%`, + }, + { + title: '摘要', + dataIndex: 'summary', + }, + ]} + /> + +
+
+ ) +} + +export default BGP diff --git a/frontend/src/pages/Dashboard/Dashboard.tsx b/frontend/src/pages/Dashboard/Dashboard.tsx index 9202ddd4..f1709de1 100644 --- a/frontend/src/pages/Dashboard/Dashboard.tsx +++ b/frontend/src/pages/Dashboard/Dashboard.tsx @@ -10,6 +10,7 @@ import { } from '@ant-design/icons' import { useAuthStore } from '../../stores/auth' import AppLayout from '../../components/AppLayout/AppLayout' +import { formatDateTimeZhCN } from '../../utils/datetime' const { Title, Text } = Typography @@ -187,7 +188,7 @@ function Dashboard() { {stats?.last_updated && (
- 最后更新: {new Date(stats.last_updated).toLocaleString('zh-CN')} + 最后更新: {formatDateTimeZhCN(stats.last_updated)} {wsConnected && 实时同步中}
)} diff --git a/frontend/src/pages/DataList/DataList.tsx b/frontend/src/pages/DataList/DataList.tsx index 4f2f548c..5c380db2 100644 --- a/frontend/src/pages/DataList/DataList.tsx +++ b/frontend/src/pages/DataList/DataList.tsx @@ -11,6 +11,7 @@ import { } from '@ant-design/icons' import axios from 'axios' import AppLayout from '../../components/AppLayout/AppLayout' +import { formatDateTimeZhCN, formatDateZhCN, parseBackendDate } from '../../utils/datetime' const { Title, Text } = Typography const { useBreakpoint } = Grid @@ -18,6 +19,7 @@ const { useBreakpoint } = Grid interface CollectedData { id: number source: string + source_name: string source_id: string data_type: string name: string @@ -42,7 +44,12 @@ interface CollectedData { interface Summary { total_records: number by_source: Record> - source_totals: Array<{ source: string; count: number }> + source_totals: Array<{ source: string; source_name: string; count: number }> +} + +interface SourceOption { + source: string + source_name: string } const DETAIL_FIELD_LABELS: Record = { @@ -111,12 +118,15 @@ function formatDetailValue(key: string, value: unknown) { } if (key === 'collected_at' || key === 'reference_date') { - const date = new Date(String(value)) + const date = parseBackendDate(String(value)) + if (!date) { + return String(value) + } return Number.isNaN(date.getTime()) ? String(value) : key === 'reference_date' - ? date.toLocaleDateString('zh-CN') - : date.toLocaleString('zh-CN') + ? formatDateZhCN(String(value)) + : formatDateTimeZhCN(String(value)) } if (typeof value === 'boolean') { @@ -130,6 +140,13 @@ function formatDetailValue(key: string, value: unknown) { return String(value) } +function getDetailFieldValue(detailData: CollectedData, key: string): unknown { + if (key === 'source') { + return detailData.source_name || detailData.source + } + return detailData[key as keyof CollectedData] +} + function NameMarquee({ text }: { text: string }) { const containerRef = useRef(null) const textRef = useRef(null) @@ -249,7 +266,7 @@ function DataList() { const [sourceFilter, setSourceFilter] = useState([]) const [typeFilter, setTypeFilter] = useState([]) const [searchText, setSearchText] = useState('') - const [sources, setSources] = useState([]) + const [sources, setSources] = useState([]) const [types, setTypes] = useState([]) const [detailVisible, setDetailVisible] = useState(false) const [detailData, setDetailData] = useState(null) @@ -420,10 +437,42 @@ function DataList() { huggingface_models: 'purple', huggingface_datasets: 'cyan', huggingface_spaces: 'magenta', + peeringdb_ixp: 'gold', + peeringdb_network: 'orange', + peeringdb_facility: 'lime', telegeography_cables: 'green', + telegeography_landing: 'green', + telegeography_systems: 'emerald', + arcgis_cables: 'blue', + arcgis_landing_points: 'cyan', + arcgis_cable_landing_relations: 'volcano', + fao_landing_points: 'processing', epoch_ai_gpu: 'volcano', + ris_live_bgp: 'red', + bgpstream_bgp: 'purple', + cloudflare_radar_device: 'magenta', + cloudflare_radar_traffic: 'orange', + cloudflare_radar_top_as: 'gold', } - return colorMap[source] || 'blue' + + if (colorMap[source]) { + return colorMap[source] + } + + const fallbackPalette = [ + 'blue', + 'geekblue', + 'cyan', + 'green', + 'lime', + 'gold', + 'orange', + 'volcano', + 'magenta', + 'purple', + ] + const hash = Array.from(source).reduce((acc, char) => acc + char.charCodeAt(0), 0) + return fallbackPalette[hash % fallbackPalette.length] } const getDataTypeTagColor = (dataType: string) => { @@ -486,7 +535,7 @@ function DataList() { for (const item of (summary?.source_totals || []).slice(0, isCompact ? 3 : 5)) { items.push({ key: item.source, - label: item.source, + label: item.source_name, value: item.count, icon: getSourceIcon(item.source), }) @@ -564,7 +613,7 @@ function DataList() { return DETAIL_BASE_FIELDS.map((key) => ({ key, label: formatFieldLabel(key), - value: formatDetailValue(key, detailData[key as keyof CollectedData]), + value: formatDetailValue(key, getDetailFieldValue(detailData, key)), })).filter((item) => item.value !== '-') }, [detailData]) @@ -605,11 +654,11 @@ function DataList() { dataIndex: 'source', key: 'source', minWidth: 140, - render: (value: string) => ( - value ? ( + render: (_: string, record: CollectedData) => ( + record.source ? (
- - {value} + + {record.source_name || record.source}
) : '-' @@ -635,14 +684,14 @@ function DataList() { dataIndex: 'collected_at', key: 'collected_at', width: 180, - render: (time: string) => new Date(time).toLocaleString('zh-CN'), + render: (time: string) => formatDateTimeZhCN(time), }, { title: '参考日期', dataIndex: 'reference_date', key: 'reference_date', width: 120, - render: (time: string | null) => (time ? new Date(time).toLocaleDateString('zh-CN') : '-'), + render: (time: string | null) => formatDateZhCN(time), }, { title: '操作', @@ -756,7 +805,7 @@ function DataList() { setSourceFilter(value) setPage(1) }} - options={sources.map((source) => ({ label: source, value: source }))} + options={sources.map((source) => ({ label: source.source_name, value: source.source }))} tagRender={(tagProps) => renderFilterTag(tagProps, getSourceTagColor)} style={{ width: '100%' }} className="data-list-filter-select" diff --git a/frontend/src/pages/DataSources/DataSources.tsx b/frontend/src/pages/DataSources/DataSources.tsx index d8bb3bb3..c22a426e 100644 --- a/frontend/src/pages/DataSources/DataSources.tsx +++ b/frontend/src/pages/DataSources/DataSources.tsx @@ -1,7 +1,7 @@ -import { useEffect, useRef, useState } from 'react' +import { useCallback, useEffect, useRef, useState } from 'react' import { - Table, Tag, Space, message, Button, Form, Input, Select, - Drawer, Tabs, Empty, Tooltip, Popconfirm, Collapse, InputNumber + Table, Tag, Space, message, Button, Form, Input, Select, Progress, Checkbox, + Drawer, Tabs, Empty, Tooltip, Popconfirm, Collapse, InputNumber, Row, Col, Card } from 'antd' import { PlayCircleOutlined, PauseCircleOutlined, PlusOutlined, @@ -11,6 +11,8 @@ import { } from '@ant-design/icons' import axios from 'axios' import AppLayout from '../../components/AppLayout/AppLayout' +import { formatDateTimeZhCN } from '../../utils/datetime' +import { useWebSocket } from '../../hooks/useWebSocket' interface BuiltInDataSource { id: number @@ -22,6 +24,10 @@ interface BuiltInDataSource { is_active: boolean collector_class: string last_run: string | null + last_run_at?: string | null + last_status?: string | null + last_records_processed?: number | null + data_count?: number is_running: boolean task_id: number | null progress: number | null @@ -38,6 +44,22 @@ interface TaskTrackerState { status?: string | null records_processed?: number | null total_records?: number | null + error_message?: string | null +} + +interface WebSocketTaskMessage { + type: string + channel?: string + payload?: { + datasource_id?: number + task_id?: number | null + progress?: number | null + phase?: string | null + status?: string | null + records_processed?: number | null + total_records?: number | null + error_message?: string | null + } } interface CustomDataSource { @@ -78,6 +100,8 @@ function DataSources() { const [viewingSource, setViewingSource] = useState(null) const [recordCount, setRecordCount] = useState(0) const [testing, setTesting] = useState(false) + const [triggerAllLoading, setTriggerAllLoading] = useState(false) + const [forceTriggerAll, setForceTriggerAll] = useState(false) const [testResult, setTestResult] = useState(null) const builtinTableRegionRef = useRef(null) const customTableRegionRef = useRef(null) @@ -85,7 +109,7 @@ function DataSources() { const [customTableHeight, setCustomTableHeight] = useState(360) const [form] = Form.useForm() - const fetchData = async () => { + const fetchData = useCallback(async () => { setLoading(true) try { const [builtinRes, customRes] = await Promise.all([ @@ -99,13 +123,72 @@ function DataSources() { } finally { setLoading(false) } - } + }, []) const [taskProgress, setTaskProgress] = useState>({}) + const activeBuiltInCount = builtInSources.filter((source) => source.is_active).length + const runningBuiltInCount = builtInSources.filter((source) => { + const trackedTask = taskProgress[source.id] + return trackedTask?.is_running || source.is_running + }).length + const runningBuiltInSources = builtInSources.filter((source) => { + const trackedTask = taskProgress[source.id] + return trackedTask?.is_running || source.is_running + }) + const aggregateProgress = runningBuiltInSources.length > 0 + ? Math.round( + runningBuiltInSources.reduce((sum, source) => { + const trackedTask = taskProgress[source.id] + return sum + (trackedTask?.progress ?? source.progress ?? 0) + }, 0) / runningBuiltInSources.length + ) + : 0 + + const handleTaskSocketMessage = useCallback((message: WebSocketTaskMessage) => { + if (message.type !== 'data_frame' || message.channel !== 'datasource_tasks' || !message.payload?.datasource_id) { + return + } + + const payload = message.payload + const sourceId = payload.datasource_id + const nextState: TaskTrackerState = { + task_id: payload.task_id ?? null, + progress: payload.progress ?? 0, + is_running: payload.status === 'running', + phase: payload.phase ?? null, + status: payload.status ?? null, + records_processed: payload.records_processed ?? null, + total_records: payload.total_records ?? null, + error_message: payload.error_message ?? null, + } + + setTaskProgress((prev) => { + const next = { + ...prev, + [sourceId]: nextState, + } + + if (!nextState.is_running && nextState.status !== 'running') { + delete next[sourceId] + } + + return next + }) + + if (payload.status && payload.status !== 'running') { + void fetchData() + } + }, [fetchData]) + + const { connected: taskSocketConnected } = useWebSocket({ + autoConnect: true, + autoSubscribe: ['datasource_tasks'], + onMessage: handleTaskSocketMessage, + }) useEffect(() => { fetchData() - }, []) + }, [fetchData]) useEffect(() => { const updateHeights = () => { @@ -130,6 +213,8 @@ function DataSources() { }, [activeTab, builtInSources.length, customSources.length]) useEffect(() => { + if (taskSocketConnected) return + const trackedSources = builtInSources.filter((source) => { const trackedTask = taskProgress[source.id] return Boolean((trackedTask?.task_id ?? source.task_id) && (trackedTask?.is_running ?? source.is_running)) @@ -186,22 +271,28 @@ function DataSources() { }, 2000) return () => clearInterval(interval) - }, [builtInSources, taskProgress]) + }, [builtInSources, taskProgress, taskSocketConnected, fetchData]) const handleTrigger = async (id: number) => { try { const res = await axios.post(`/api/v1/datasources/${id}/trigger`) message.success('任务已触发') - setTaskProgress(prev => ({ - ...prev, - [id]: { - task_id: res.data.task_id ?? null, - progress: 0, - is_running: true, - phase: 'queued', - status: 'running', - }, - })) + if (res.data.task_id) { + setTaskProgress(prev => ({ + ...prev, + [id]: { + task_id: res.data.task_id, + progress: 0, + is_running: true, + phase: 'queued', + status: 'running', + }, + })) + } else { + window.setTimeout(() => { + fetchData() + }, 800) + } fetchData() } catch (error: unknown) { const err = error as { response?: { data?: { detail?: string } } } @@ -209,6 +300,52 @@ function DataSources() { } } + const handleTriggerAll = async () => { + try { + setTriggerAllLoading(true) + const res = await axios.post('/api/v1/datasources/trigger-all', null, { + params: { force: forceTriggerAll }, + }) + const triggered = res.data.triggered || [] + const skipped = res.data.skipped || [] + const failed = res.data.failed || [] + const skippedInWindow = skipped.filter((item: { reason?: string }) => item.reason === 'within_frequency_window') + const skippedOther = skipped.filter((item: { reason?: string }) => item.reason !== 'within_frequency_window') + + if (triggered.length > 0) { + setTaskProgress((prev) => { + const next = { ...prev } + for (const item of triggered) { + if (!item.task_id) continue + next[item.id] = { + task_id: item.task_id, + progress: 0, + is_running: true, + phase: 'queued', + status: 'running', + } + } + return next + }) + } + + const summaryParts = [ + `已触发 ${triggered.length} 个`, + skippedInWindow.length > 0 ? `周期内跳过 ${skippedInWindow.length} 个` : null, + skippedOther.length > 0 ? `其他跳过 ${skippedOther.length} 个` : null, + failed.length > 0 ? `失败 ${failed.length} 个` : null, + ].filter(Boolean) + + message.success(summaryParts.join(',')) + fetchData() + } catch (error: unknown) { + const err = error as { response?: { data?: { detail?: string } } } + message.error(err.response?.data?.detail || '全触发失败') + } finally { + setTriggerAllLoading(false) + } + } + const handleToggle = async (id: number, current: boolean) => { const endpoint = current ? 'disable' : 'enable' try { @@ -405,8 +542,15 @@ function DataSources() { title: '最近采集', dataIndex: 'last_run', key: 'last_run', - width: 140, - render: (lastRun: string | null) => lastRun || '-', + width: 180, + render: (_: string | null, record: BuiltInDataSource) => { + const label = formatDateTimeZhCN(record.last_run_at || record.last_run) + if (!label || label === '-') return '-' + if ((record.data_count || 0) === 0 && record.last_status === 'success') { + return `${label} (0条)` + } + return label + }, }, { title: '状态', @@ -431,7 +575,6 @@ function DataSources() { const phase = taskState?.phase || record.phase || 'queued' return ( - {record.is_active ? '运行中' : '已暂停'} {phaseLabelMap[phase] || phase} {pct > 0 ? ` ${Math.round(pct)}%` : ''} @@ -439,7 +582,26 @@ function DataSources() { ) } - return {record.is_active ? '运行中' : '已暂停'} + const lastStatusColor = + record.last_status === 'success' + ? 'success' + : record.last_status === 'failed' + ? 'error' + : 'default' + + return ( + + {record.last_status ? ( + + {record.last_status === 'success' + ? '采集成功' + : record.last_status === 'failed' + ? '采集失败' + : record.last_status} + + ) : null} + + ) }, }, { @@ -453,6 +615,7 @@ function DataSources() { type="link" size="small" icon={} + disabled={!record.is_active} onClick={() => handleTrigger(record.id)} > 触发 @@ -461,6 +624,8 @@ function DataSources() { type="link" size="small" icon={record.is_active ? : } + danger={record.is_active} + style={record.is_active ? undefined : { color: '#52c41a' }} onClick={() => handleToggle(record.id, record.is_active)} > {record.is_active ? '禁用' : '启用'} @@ -536,7 +701,47 @@ function DataSources() { key: 'builtin', label: '内置数据源', children: ( -
+
+
+
+
采集实时进度
+
+
+ 总体进度 + {aggregateProgress}% +
+ 0 ? 'active' : 'normal'} + showInfo={false} + strokeColor="#1677ff" + /> +
+
+ 内置 {builtInSources.length} + 已启用 {activeBuiltInCount} + 执行中 {runningBuiltInCount} +
+
+ + setForceTriggerAll(event.target.checked)} + > + 强制全部采集 + + + +
{viewingSource && ( - - - - + + + + +
名称
+ + + +
模块
+ + + +
优先级
+ + + +
频率
+ + + +
数据量
+ + + +
采集器
+ + + + - - - + + + + + +