feat: add BGP observability and admin UI improvements

linkong
2026-03-27 14:27:07 +08:00
parent bf2c4a172d
commit b0058edf17
51 changed files with 2473 additions and 245 deletions

View File

@@ -30,6 +30,8 @@ from app.services.collectors.arcgis_landing import ArcGISLandingPointCollector
from app.services.collectors.arcgis_relation import ArcGISCableLandingRelationCollector
from app.services.collectors.spacetrack import SpaceTrackTLECollector
from app.services.collectors.celestrak import CelesTrakTLECollector
from app.services.collectors.ris_live import RISLiveCollector
from app.services.collectors.bgpstream import BGPStreamBackfillCollector
collector_registry.register(TOP500Collector())
collector_registry.register(EpochAIGPUCollector())
@@ -51,3 +53,5 @@ collector_registry.register(ArcGISLandingPointCollector())
collector_registry.register(ArcGISCableLandingRelationCollector())
collector_registry.register(SpaceTrackTLECollector())
collector_registry.register(CelesTrakTLECollector())
collector_registry.register(RISLiveCollector())
collector_registry.register(BGPStreamBackfillCollector())

View File

@@ -5,7 +5,7 @@ Collects submarine cable data from ArcGIS GeoJSON API.
import json
from typing import Dict, Any, List
from datetime import datetime
from datetime import UTC, datetime
import httpx
from app.services.collectors.base import BaseCollector
@@ -84,7 +84,7 @@ class ArcGISCableCollector(BaseCollector):
"color": props.get("color"),
"route_coordinates": route_coordinates,
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
result.append(entry)
except (ValueError, TypeError, KeyError):
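
The datetime.utcnow() -> datetime.now(UTC) change in this hunk repeats across every collector touched below: utcnow() returns a naive datetime and is deprecated as of Python 3.12, while now(UTC) returns an aware one. A minimal sketch of the difference, assuming Python 3.11+ (where datetime.UTC was introduced):

from datetime import UTC, datetime

naive = datetime.utcnow()   # tzinfo is None; deprecated since Python 3.12
aware = datetime.now(UTC)   # tzinfo is timezone.utc; the preferred replacement

assert naive.tzinfo is None
assert aware.tzinfo is UTC
# Date-only formatting is unchanged, so reference_date values stay identical:
print(aware.strftime("%Y-%m-%d"))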

View File

@@ -1,5 +1,5 @@
from typing import Dict, Any, List
from datetime import datetime
from datetime import UTC, datetime
import httpx
from app.services.collectors.base import BaseCollector
@@ -67,7 +67,7 @@ class ArcGISLandingPointCollector(BaseCollector):
"status": props.get("status"),
"landing_point_id": props.get("landing_point_id"),
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
result.append(entry)
except (ValueError, TypeError, KeyError):

View File

@@ -1,5 +1,5 @@
import asyncio
from datetime import datetime
from datetime import UTC, datetime
from typing import Any, Dict, List, Optional
import httpx
@@ -143,7 +143,7 @@ class ArcGISCableLandingRelationCollector(BaseCollector):
"facility": facility,
"status": status,
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
result.append(entry)
except (ValueError, TypeError, KeyError):

View File

@@ -2,7 +2,7 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from datetime import datetime
from datetime import UTC, datetime
import httpx
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
@@ -10,6 +10,8 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.core.collected_data_fields import build_dynamic_metadata, get_record_field
from app.core.config import settings
from app.core.countries import normalize_country
from app.core.time import to_iso8601_utc
from app.core.websocket.broadcaster import broadcaster
class BaseCollector(ABC):
@@ -20,12 +22,14 @@ class BaseCollector(ABC):
module: str = "L1"
frequency_hours: int = 4
data_type: str = "generic"
fail_on_empty: bool = False
def __init__(self):
self._current_task = None
self._db_session = None
self._datasource_id = 1
self._resolved_url: Optional[str] = None
self._last_broadcast_progress: Optional[int] = None
async def resolve_url(self, db: AsyncSession) -> None:
from app.core.data_sources import get_data_sources_config
@@ -33,18 +37,53 @@ class BaseCollector(ABC):
config = get_data_sources_config()
self._resolved_url = await config.get_url(self.name, db)
def update_progress(self, records_processed: int):
async def _publish_task_update(self, force: bool = False):
if not self._current_task:
return
progress = float(self._current_task.progress or 0.0)
rounded_progress = int(round(progress))
if not force and self._last_broadcast_progress == rounded_progress:
return
await broadcaster.broadcast_datasource_task_update(
{
"datasource_id": getattr(self, "_datasource_id", None),
"collector_name": self.name,
"task_id": self._current_task.id,
"status": self._current_task.status,
"phase": self._current_task.phase,
"progress": progress,
"records_processed": self._current_task.records_processed,
"total_records": self._current_task.total_records,
"started_at": to_iso8601_utc(self._current_task.started_at),
"completed_at": to_iso8601_utc(self._current_task.completed_at),
"error_message": self._current_task.error_message,
}
)
self._last_broadcast_progress = rounded_progress
async def update_progress(self, records_processed: int, *, commit: bool = False, force: bool = False):
"""Update task progress - call this during data processing"""
if self._current_task and self._db_session and self._current_task.total_records > 0:
if self._current_task and self._db_session:
self._current_task.records_processed = records_processed
self._current_task.progress = (
records_processed / self._current_task.total_records
) * 100
if self._current_task.total_records and self._current_task.total_records > 0:
self._current_task.progress = (
records_processed / self._current_task.total_records
) * 100
else:
self._current_task.progress = 0.0
if commit:
await self._db_session.commit()
await self._publish_task_update(force=force)
async def set_phase(self, phase: str):
if self._current_task and self._db_session:
self._current_task.phase = phase
await self._db_session.commit()
await self._publish_task_update(force=True)
@abstractmethod
async def fetch(self) -> List[Dict[str, Any]]:
@@ -133,7 +172,7 @@ class BaseCollector(ABC):
from app.models.task import CollectionTask
from app.models.data_snapshot import DataSnapshot
start_time = datetime.utcnow()
start_time = datetime.now(UTC)
datasource_id = getattr(self, "_datasource_id", 1)
snapshot_id: Optional[int] = None
@@ -152,14 +191,20 @@ class BaseCollector(ABC):
self._current_task = task
self._db_session = db
self._last_broadcast_progress = None
await self.resolve_url(db)
await self._publish_task_update(force=True)
try:
await self.set_phase("fetching")
raw_data = await self.fetch()
task.total_records = len(raw_data)
await db.commit()
await self._publish_task_update(force=True)
if self.fail_on_empty and not raw_data:
raise RuntimeError(f"Collector {self.name} returned no data")
await self.set_phase("transforming")
data = self.transform(raw_data)
@@ -172,33 +217,35 @@ class BaseCollector(ABC):
task.phase = "completed"
task.records_processed = records_count
task.progress = 100.0
task.completed_at = datetime.utcnow()
task.completed_at = datetime.now(UTC)
await db.commit()
await self._publish_task_update(force=True)
return {
"status": "success",
"task_id": task_id,
"records_processed": records_count,
"execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(),
"execution_time_seconds": (datetime.now(UTC) - start_time).total_seconds(),
}
except Exception as e:
task.status = "failed"
task.phase = "failed"
task.error_message = str(e)
task.completed_at = datetime.utcnow()
task.completed_at = datetime.now(UTC)
if snapshot_id is not None:
snapshot = await db.get(DataSnapshot, snapshot_id)
if snapshot:
snapshot.status = "failed"
snapshot.completed_at = datetime.utcnow()
snapshot.completed_at = datetime.now(UTC)
snapshot.summary = {"error": str(e)}
await db.commit()
await self._publish_task_update(force=True)
return {
"status": "failed",
"task_id": task_id,
"error": str(e),
"execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(),
"execution_time_seconds": (datetime.now(UTC) - start_time).total_seconds(),
}
async def _save_data(
@@ -219,11 +266,11 @@ class BaseCollector(ABC):
snapshot.record_count = 0
snapshot.summary = {"created": 0, "updated": 0, "unchanged": 0}
snapshot.status = "success"
snapshot.completed_at = datetime.utcnow()
snapshot.completed_at = datetime.now(UTC)
await db.commit()
return 0
collected_at = datetime.utcnow()
collected_at = datetime.now(UTC)
records_added = 0
created_count = 0
updated_count = 0
@@ -329,8 +376,7 @@ class BaseCollector(ABC):
records_added += 1
if i % 100 == 0:
self.update_progress(i + 1)
await db.commit()
await self.update_progress(i + 1, commit=True)
if snapshot_id is not None:
deleted_keys = previous_current_keys - seen_entity_keys
@@ -350,7 +396,7 @@ class BaseCollector(ABC):
if snapshot:
snapshot.record_count = records_added
snapshot.status = "success"
snapshot.completed_at = datetime.utcnow()
snapshot.completed_at = datetime.now(UTC)
snapshot.summary = {
"created": created_count,
"updated": updated_count,
@@ -359,7 +405,7 @@ class BaseCollector(ABC):
}
await db.commit()
self.update_progress(len(data))
await self.update_progress(len(data), force=True)
return records_added
async def save(self, db: AsyncSession, data: List[Dict[str, Any]]) -> int:
@@ -406,8 +452,8 @@ async def log_task(
status=status,
records_processed=records_processed,
error_message=error_message,
started_at=datetime.utcnow(),
completed_at=datetime.utcnow(),
started_at=datetime.now(UTC),
completed_at=datetime.now(UTC),
)
db.add(task)
await db.commit()
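
The new _publish_task_update suppresses duplicate websocket broadcasts by comparing the integer-rounded progress against the last value sent. A standalone sketch of that throttle (the class name is mine, not from the diff):

class ProgressThrottle:
    """Publish only when the rounded progress value changes, unless forced."""

    def __init__(self) -> None:
        self.last_broadcast: int | None = None

    def should_publish(self, progress: float, *, force: bool = False) -> bool:
        rounded = int(round(progress))
        if not force and self.last_broadcast == rounded:
            return False
        self.last_broadcast = rounded
        return True

throttle = ProgressThrottle()
assert throttle.should_publish(0.2) is True    # first update always goes out
assert throttle.should_publish(0.4) is False   # still rounds to 0 -> suppressed
assert throttle.should_publish(1.6) is True    # rounds to 2 -> published
assert throttle.should_publish(1.7, force=True) is True  # force bypasses the check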

View File

@@ -0,0 +1,313 @@
"""Shared helpers for BGP collectors."""
from __future__ import annotations
import hashlib
import ipaddress
from collections import Counter, defaultdict
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.bgp_anomaly import BGPAnomaly
from app.models.collected_data import CollectedData
RIPE_RIS_COLLECTOR_COORDS: dict[str, dict[str, Any]] = {
"rrc00": {"city": "Amsterdam", "country": "Netherlands", "latitude": 52.3676, "longitude": 4.9041},
"rrc01": {"city": "London", "country": "United Kingdom", "latitude": 51.5072, "longitude": -0.1276},
"rrc03": {"city": "Amsterdam", "country": "Netherlands", "latitude": 52.3676, "longitude": 4.9041},
"rrc04": {"city": "Geneva", "country": "Switzerland", "latitude": 46.2044, "longitude": 6.1432},
"rrc05": {"city": "Vienna", "country": "Austria", "latitude": 48.2082, "longitude": 16.3738},
"rrc06": {"city": "Otemachi", "country": "Japan", "latitude": 35.686, "longitude": 139.7671},
"rrc07": {"city": "Stockholm", "country": "Sweden", "latitude": 59.3293, "longitude": 18.0686},
"rrc10": {"city": "Milan", "country": "Italy", "latitude": 45.4642, "longitude": 9.19},
"rrc11": {"city": "New York", "country": "United States", "latitude": 40.7128, "longitude": -74.006},
"rrc12": {"city": "Frankfurt", "country": "Germany", "latitude": 50.1109, "longitude": 8.6821},
"rrc13": {"city": "Moscow", "country": "Russia", "latitude": 55.7558, "longitude": 37.6173},
"rrc14": {"city": "Palo Alto", "country": "United States", "latitude": 37.4419, "longitude": -122.143},
"rrc15": {"city": "Sao Paulo", "country": "Brazil", "latitude": -23.5558, "longitude": -46.6396},
"rrc16": {"city": "Miami", "country": "United States", "latitude": 25.7617, "longitude": -80.1918},
"rrc18": {"city": "Barcelona", "country": "Spain", "latitude": 41.3874, "longitude": 2.1686},
"rrc19": {"city": "Johannesburg", "country": "South Africa", "latitude": -26.2041, "longitude": 28.0473},
"rrc20": {"city": "Zurich", "country": "Switzerland", "latitude": 47.3769, "longitude": 8.5417},
"rrc21": {"city": "Paris", "country": "France", "latitude": 48.8566, "longitude": 2.3522},
"rrc22": {"city": "Bucharest", "country": "Romania", "latitude": 44.4268, "longitude": 26.1025},
"rrc23": {"city": "Singapore", "country": "Singapore", "latitude": 1.3521, "longitude": 103.8198},
"rrc24": {"city": "Montevideo", "country": "Uruguay", "latitude": -34.9011, "longitude": -56.1645},
"rrc25": {"city": "Amsterdam", "country": "Netherlands", "latitude": 52.3676, "longitude": 4.9041},
"rrc26": {"city": "Dubai", "country": "United Arab Emirates", "latitude": 25.2048, "longitude": 55.2708},
}
def _safe_int(value: Any) -> int | None:
try:
if value in (None, ""):
return None
return int(value)
except (TypeError, ValueError):
return None
def _parse_timestamp(value: Any) -> datetime:
if isinstance(value, datetime):
return value.astimezone(UTC) if value.tzinfo else value.replace(tzinfo=UTC)
if isinstance(value, (int, float)):
return datetime.fromtimestamp(value, tz=UTC)
if isinstance(value, str) and value:
normalized = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
return parsed.astimezone(UTC) if parsed.tzinfo else parsed.replace(tzinfo=UTC)
return datetime.now(UTC)
def _normalize_as_path(raw_path: Any) -> list[int]:
if raw_path in (None, ""):
return []
if isinstance(raw_path, list):
return [asn for asn in (_safe_int(item) for item in raw_path) if asn is not None]
if isinstance(raw_path, str):
parts = raw_path.replace("{", "").replace("}", "").split()
return [asn for asn in (_safe_int(item) for item in parts) if asn is not None]
return []
def normalize_bgp_event(payload: dict[str, Any], *, project: str) -> dict[str, Any]:
raw_message = payload.get("raw_message", payload)
raw_path = (
payload.get("path")
or payload.get("as_path")
or payload.get("attrs", {}).get("path")
or payload.get("attrs", {}).get("as_path")
or []
)
as_path = _normalize_as_path(raw_path)
raw_type = str(payload.get("event_type") or payload.get("type") or payload.get("msg_type") or "").lower()
if raw_type in {"a", "announce", "announcement"}:
event_type = "announcement"
elif raw_type in {"w", "withdraw", "withdrawal"}:
event_type = "withdrawal"
elif raw_type in {"r", "rib"}:
event_type = "rib"
else:
event_type = raw_type or "announcement"
prefix = str(payload.get("prefix") or payload.get("prefixes") or payload.get("target_prefix") or "").strip()
if prefix.startswith("[") and prefix.endswith("]"):
prefix = prefix[1:-1]
timestamp = _parse_timestamp(payload.get("timestamp") or payload.get("time") or payload.get("ts"))
collector = str(payload.get("collector") or payload.get("host") or payload.get("router") or "unknown")
peer_asn = _safe_int(payload.get("peer_asn") or payload.get("peer"))
origin_asn = _safe_int(payload.get("origin_asn")) or (as_path[-1] if as_path else None)
source_material = "|".join(
[
collector,
str(peer_asn or ""),
prefix,
event_type,
timestamp.isoformat(),
",".join(str(asn) for asn in as_path),
]
)
source_id = hashlib.sha1(source_material.encode("utf-8")).hexdigest()[:24]
prefix_length = None
is_more_specific = False
if prefix:
try:
network = ipaddress.ip_network(prefix, strict=False)
prefix_length = int(network.prefixlen)
is_more_specific = prefix_length > (24 if network.version == 4 else 48)
except ValueError:
prefix_length = None
collector_location = RIPE_RIS_COLLECTOR_COORDS.get(collector, {})
metadata = {
"project": project,
"collector": collector,
"peer_asn": peer_asn,
"peer_ip": payload.get("peer_ip") or payload.get("peer_address"),
"event_type": event_type,
"prefix": prefix,
"origin_asn": origin_asn,
"as_path": as_path,
"communities": payload.get("communities") or payload.get("attrs", {}).get("communities") or [],
"next_hop": payload.get("next_hop") or payload.get("attrs", {}).get("next_hop"),
"med": payload.get("med") or payload.get("attrs", {}).get("med"),
"local_pref": payload.get("local_pref") or payload.get("attrs", {}).get("local_pref"),
"timestamp": timestamp.isoformat(),
"as_path_length": len(as_path),
"prefix_length": prefix_length,
"is_more_specific": is_more_specific,
"visibility_weight": 1,
"collector_location": collector_location,
"raw_message": raw_message,
}
return {
"source_id": source_id,
"name": prefix or f"{collector}:{event_type}",
"title": f"{event_type} {prefix}".strip(),
"description": f"{collector} observed {event_type} for {prefix}".strip(),
"reference_date": timestamp.isoformat(),
"country": collector_location.get("country"),
"city": collector_location.get("city"),
"latitude": collector_location.get("latitude"),
"longitude": collector_location.get("longitude"),
"metadata": metadata,
}
async def create_bgp_anomalies_for_batch(
db: AsyncSession,
*,
source: str,
snapshot_id: int | None,
task_id: int | None,
events: list[dict[str, Any]],
) -> int:
if not events:
return 0
pending_anomalies: list[BGPAnomaly] = []
prefix_to_origins: defaultdict[str, set[int]] = defaultdict(set)
prefix_to_more_specifics: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
withdrawal_counter: Counter[tuple[str, int | None]] = Counter()
prefixes = {event["metadata"].get("prefix") for event in events if event.get("metadata", {}).get("prefix")}
previous_origin_map: dict[str, set[int]] = defaultdict(set)
if prefixes:
previous_query = await db.execute(
select(CollectedData).where(
CollectedData.source == source,
CollectedData.snapshot_id != snapshot_id,
CollectedData.extra_data["prefix"].as_string().in_(sorted(prefixes)),
)
)
for record in previous_query.scalars().all():
metadata = record.extra_data or {}
prefix = metadata.get("prefix")
origin = _safe_int(metadata.get("origin_asn"))
if prefix and origin is not None:
previous_origin_map[prefix].add(origin)
for event in events:
metadata = event.get("metadata", {})
prefix = metadata.get("prefix")
origin_asn = _safe_int(metadata.get("origin_asn"))
if not prefix:
continue
if origin_asn is not None:
prefix_to_origins[prefix].add(origin_asn)
if metadata.get("is_more_specific"):
prefix_to_more_specifics[prefix.split("/")[0]].append(event)
if metadata.get("event_type") == "withdrawal":
withdrawal_counter[(prefix, origin_asn)] += 1
for prefix, origins in prefix_to_origins.items():
historic = previous_origin_map.get(prefix, set())
new_origins = sorted(origin for origin in origins if origin not in historic)
if historic and new_origins:
for new_origin in new_origins:
pending_anomalies.append(
BGPAnomaly(
snapshot_id=snapshot_id,
task_id=task_id,
source=source,
anomaly_type="origin_change",
severity="critical",
status="active",
entity_key=f"origin_change:{prefix}:{new_origin}",
prefix=prefix,
origin_asn=sorted(historic)[0],
new_origin_asn=new_origin,
peer_scope=[],
started_at=datetime.now(UTC),
confidence=0.86,
summary=f"Prefix {prefix} is now originated by AS{new_origin}, outside the current baseline.",
evidence={"previous_origins": sorted(historic), "current_origins": sorted(origins)},
)
)
for root_prefix, more_specifics in prefix_to_more_specifics.items():
if len(more_specifics) >= 2:
sample = more_specifics[0]["metadata"]
pending_anomalies.append(
BGPAnomaly(
snapshot_id=snapshot_id,
task_id=task_id,
source=source,
anomaly_type="more_specific_burst",
severity="high",
status="active",
entity_key=f"more_specific_burst:{root_prefix}:{len(more_specifics)}",
prefix=sample.get("prefix"),
origin_asn=_safe_int(sample.get("origin_asn")),
new_origin_asn=None,
peer_scope=sorted(
{
str(item.get("metadata", {}).get("collector") or "")
for item in more_specifics
if item.get("metadata", {}).get("collector")
}
),
started_at=datetime.now(UTC),
confidence=0.72,
summary=f"{len(more_specifics)} more-specific announcements clustered around {root_prefix}.",
evidence={"events": [item.get("metadata") for item in more_specifics[:10]]},
)
)
for (prefix, origin_asn), count in withdrawal_counter.items():
if count >= 3:
pending_anomalies.append(
BGPAnomaly(
snapshot_id=snapshot_id,
task_id=task_id,
source=source,
anomaly_type="mass_withdrawal",
severity="high" if count < 8 else "critical",
status="active",
entity_key=f"mass_withdrawal:{prefix}:{origin_asn}:{count}",
prefix=prefix,
origin_asn=origin_asn,
new_origin_asn=None,
peer_scope=[],
started_at=datetime.now(UTC),
confidence=min(0.55 + (count * 0.05), 0.95),
summary=f"{count} withdrawal events observed for {prefix} in the current ingest window.",
evidence={"withdrawal_count": count},
)
)
if not pending_anomalies:
return 0
existing_result = await db.execute(
select(BGPAnomaly.entity_key).where(
BGPAnomaly.entity_key.in_([item.entity_key for item in pending_anomalies])
)
)
existing_keys = {row[0] for row in existing_result.fetchall()}
created = 0
for anomaly in pending_anomalies:
if anomaly.entity_key in existing_keys:
continue
db.add(anomaly)
created += 1
if created:
await db.commit()
return created
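
A hypothetical call showing what normalize_bgp_event produces for a RIS-style announcement (payload values invented for illustration):

event = normalize_bgp_event(
    {
        "collector": "rrc00",
        "type": "A",                      # short codes map to "announcement"
        "prefix": "203.0.113.0/25",
        "peer_asn": "64496",
        "path": [64496, 64500, 65001],
        "timestamp": 1711500000,          # epoch seconds -> aware UTC datetime
    },
    project="ris-live",
)
assert event["metadata"]["event_type"] == "announcement"
assert event["metadata"]["origin_asn"] == 65001       # last ASN on the path
assert event["metadata"]["is_more_specific"] is True  # /25 is longer than /24
assert event["country"] == "Netherlands"              # from the rrc00 coords table

# Anomaly thresholds above, for reference: origin_change fires on any origin
# outside the historic baseline; more_specific_burst needs >= 2 more-specifics
# per root prefix; mass_withdrawal needs >= 3 withdrawals, with confidence
# min(0.55 + 0.05 * count, 0.95) -- about 0.70 at count=3, capped at 0.95
# (and severity escalates from high to critical at count >= 8).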

View File

@@ -0,0 +1,120 @@
"""BGPStream backfill collector."""
from __future__ import annotations
import asyncio
import json
import time
import urllib.parse
import urllib.request
from typing import Any
from app.services.collectors.base import BaseCollector
from app.services.collectors.bgp_common import create_bgp_anomalies_for_batch, normalize_bgp_event
class BGPStreamBackfillCollector(BaseCollector):
name = "bgpstream_bgp"
priority = "P1"
module = "L3"
frequency_hours = 6
data_type = "bgp_rib"
fail_on_empty = True
async def fetch(self) -> list[dict[str, Any]]:
if not self._resolved_url:
raise RuntimeError("BGPStream URL is not configured")
return await asyncio.to_thread(self._fetch_resource_windows)
def _fetch_resource_windows(self) -> list[dict[str, Any]]:
end = int(time.time()) - 3600
start = end - 86400
params = [
("projects[]", "routeviews"),
("collectors[]", "route-views2"),
("types[]", "updates"),
("intervals[]", f"{start},{end}"),
]
url = f"{self._resolved_url}/data?{urllib.parse.urlencode(params)}"
request = urllib.request.Request(
url,
headers={"User-Agent": "Planet-Intelligence-System/1.0 (Python/collector)"},
)
with urllib.request.urlopen(request, timeout=30) as response:
body = json.loads(response.read().decode())
if body.get("error"):
raise RuntimeError(f"BGPStream broker error: {body['error']}")
return body.get("data", {}).get("resources", [])
def transform(self, raw_data: list[dict[str, Any]]) -> list[dict[str, Any]]:
transformed: list[dict[str, Any]] = []
for item in raw_data:
if not isinstance(item, dict):
continue
is_broker_window = any(key in item for key in ("filename", "url", "startTime", "start_time"))
if {"collector", "prefix"} <= set(item.keys()) and not is_broker_window:
transformed.append(normalize_bgp_event(item, project="bgpstream"))
continue
# Broker responses provide file windows rather than decoded events.
collector = item.get("collector") or item.get("project") or "bgpstream"
timestamp = item.get("time") or item.get("startTime") or item.get("start_time")
name = item.get("filename") or item.get("url") or f"{collector}-window"
normalized = normalize_bgp_event(
{
"collector": collector,
"event_type": "rib",
"prefix": item.get("prefix") or "historical-window",
"timestamp": timestamp,
"origin_asn": item.get("origin_asn"),
"path": item.get("path") or [],
"raw_message": item,
},
project="bgpstream",
)
transformed.append(
normalized
| {
"name": name,
"title": f"BGPStream {collector}",
"description": "Historical BGPStream backfill window",
"metadata": {
**normalized["metadata"],
"broker_record": item,
},
}
)
self._latest_transformed_batch = transformed
return transformed
async def run(self, db):
result = await super().run(db)
if result.get("status") != "success":
return result
snapshot_id = await self._resolve_snapshot_id(db, result.get("task_id"))
anomaly_count = await create_bgp_anomalies_for_batch(
db,
source=self.name,
snapshot_id=snapshot_id,
task_id=result.get("task_id"),
events=getattr(self, "_latest_transformed_batch", []),
)
result["anomalies_created"] = anomaly_count
return result
async def _resolve_snapshot_id(self, db, task_id: int | None) -> int | None:
if task_id is None:
return None
from sqlalchemy import select
from app.models.data_snapshot import DataSnapshot
result = await db.execute(
select(DataSnapshot.id).where(DataSnapshot.task_id == task_id).order_by(DataSnapshot.id.desc())
)
return result.scalar_one_or_none()
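
A sketch of the broker query _fetch_resource_windows builds; the base URL shown is an assumption, since the real one is resolved from the datasource config at runtime:

import time
import urllib.parse

end = int(time.time()) - 3600   # stop one hour behind real time
start = end - 86400             # 24-hour backfill window
params = [
    ("projects[]", "routeviews"),
    ("collectors[]", "route-views2"),
    ("types[]", "updates"),
    ("intervals[]", f"{start},{end}"),
]
# e.g. https://broker.bgpstream.caida.org/v2/data?projects%5B%5D=routeviews&...
print(f"https://broker.bgpstream.caida.org/v2/data?{urllib.parse.urlencode(params)}")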

View File

@@ -10,7 +10,7 @@ Some endpoints require authentication for higher rate limits.
import asyncio
import os
from typing import Dict, Any, List
from datetime import datetime
from datetime import UTC, datetime
import httpx
from app.services.collectors.base import HTTPCollector
@@ -59,7 +59,7 @@ class CloudflareRadarDeviceCollector(HTTPCollector):
"other_percent": float(summary.get("other", 0)),
"date_range": result.get("meta", {}).get("dateRange", {}),
},
"reference_date": datetime.utcnow().isoformat(),
"reference_date": datetime.now(UTC).isoformat(),
}
data.append(entry)
except (ValueError, TypeError, KeyError):
@@ -107,7 +107,7 @@ class CloudflareRadarTrafficCollector(HTTPCollector):
"requests": item.get("requests"),
"visit_duration": item.get("visitDuration"),
},
"reference_date": item.get("datetime", datetime.utcnow().isoformat()),
"reference_date": item.get("datetime", datetime.now(UTC).isoformat()),
}
data.append(entry)
except (ValueError, TypeError, KeyError):
@@ -155,7 +155,7 @@ class CloudflareRadarTopASCollector(HTTPCollector):
"traffic_share": item.get("trafficShare"),
"country_code": item.get("location", {}).get("countryCode"),
},
"reference_date": datetime.utcnow().isoformat(),
"reference_date": datetime.now(UTC).isoformat(),
}
data.append(entry)
except (ValueError, TypeError, KeyError):

View File

@@ -6,7 +6,7 @@ https://epoch.ai/data/gpu-clusters
import re
from typing import Dict, Any, List
from datetime import datetime
from datetime import UTC, datetime
from bs4 import BeautifulSoup
import httpx
@@ -64,7 +64,7 @@ class EpochAIGPUCollector(BaseCollector):
"metadata": {
"raw_data": perf_cell,
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
data.append(entry)
except (ValueError, IndexError, AttributeError):
@@ -114,6 +114,6 @@ class EpochAIGPUCollector(BaseCollector):
"metadata": {
"note": "Sample data - Epoch AI page structure may vary",
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
},
]

View File

@@ -4,7 +4,7 @@ Collects landing point data from FAO CSV API.
"""
from typing import Dict, Any, List
from datetime import datetime
from datetime import UTC, datetime
import httpx
from app.services.collectors.base import BaseCollector
@@ -58,7 +58,7 @@ class FAOLandingPointCollector(BaseCollector):
"is_tbd": is_tbd,
"original_id": feature_id,
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
result.append(entry)
except (ValueError, IndexError):

View File

@@ -7,7 +7,7 @@ https://huggingface.co/spaces
"""
from typing import Dict, Any, List
from datetime import datetime
from datetime import UTC, datetime
from app.services.collectors.base import HTTPCollector
@@ -46,7 +46,7 @@ class HuggingFaceModelCollector(HTTPCollector):
"library_name": item.get("library_name"),
"created_at": item.get("createdAt"),
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
data.append(entry)
except (ValueError, TypeError, KeyError):
@@ -87,7 +87,7 @@ class HuggingFaceDatasetCollector(HTTPCollector):
"tags": (item.get("tags", []) or [])[:10],
"created_at": item.get("createdAt"),
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
data.append(entry)
except (ValueError, TypeError, KeyError):
@@ -128,7 +128,7 @@ class HuggingFaceSpacesCollector(HTTPCollector):
"tags": (item.get("tags", []) or [])[:10],
"created_at": item.get("createdAt"),
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
data.append(entry)
except (ValueError, TypeError, KeyError):

View File

@@ -13,7 +13,7 @@ To get higher limits, set PEERINGDB_API_KEY environment variable.
import asyncio
import os
from typing import Dict, Any, List
from datetime import datetime
from datetime import UTC, datetime
import httpx
from app.services.collectors.base import HTTPCollector
@@ -106,7 +106,7 @@ class PeeringDBIXPCollector(HTTPCollector):
"created": item.get("created"),
"updated": item.get("updated"),
},
"reference_date": datetime.utcnow().isoformat(),
"reference_date": datetime.now(UTC).isoformat(),
}
data.append(entry)
except (ValueError, TypeError, KeyError):
@@ -209,7 +209,7 @@ class PeeringDBNetworkCollector(HTTPCollector):
"created": item.get("created"),
"updated": item.get("updated"),
},
"reference_date": datetime.utcnow().isoformat(),
"reference_date": datetime.now(UTC).isoformat(),
}
data.append(entry)
except (ValueError, TypeError, KeyError):
@@ -311,7 +311,7 @@ class PeeringDBFacilityCollector(HTTPCollector):
"created": item.get("created"),
"updated": item.get("updated"),
},
"reference_date": datetime.utcnow().isoformat(),
"reference_date": datetime.now(UTC).isoformat(),
}
data.append(entry)
except (ValueError, TypeError, KeyError):

View File

@@ -0,0 +1,131 @@
"""RIPE RIS Live collector."""
from __future__ import annotations
import asyncio
import json
import urllib.request
from typing import Any
from app.services.collectors.base import BaseCollector
from app.services.collectors.bgp_common import create_bgp_anomalies_for_batch, normalize_bgp_event
class RISLiveCollector(BaseCollector):
name = "ris_live_bgp"
priority = "P1"
module = "L3"
frequency_hours = 1
data_type = "bgp_update"
fail_on_empty = True
max_messages = 100
idle_timeout_seconds = 15
async def fetch(self) -> list[dict[str, Any]]:
if not self._resolved_url:
raise RuntimeError("RIS Live URL is not configured")
return await asyncio.to_thread(self._fetch_via_stream)
def _fetch_via_stream(self) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = []
stream_url = "https://ris-live.ripe.net/v1/stream/?format=json&client=planet-ris-live"
subscribe = json.dumps(
{
"host": "rrc00",
"type": "UPDATE",
"require": "announcements",
}
)
request = urllib.request.Request(
stream_url,
headers={"X-RIS-Subscribe": subscribe},
)
with urllib.request.urlopen(request, timeout=20) as response:
while len(events) < self.max_messages:
line = response.readline().decode().strip()
if not line:
break
payload = json.loads(line)
if payload.get("type") != "ris_message":
continue
data = payload.get("data", {})
if isinstance(data, dict):
events.append(data)
return events
def transform(self, raw_data: list[dict[str, Any]]) -> list[dict[str, Any]]:
transformed: list[dict[str, Any]] = []
for item in raw_data:
announcements = item.get("announcements") or []
withdrawals = item.get("withdrawals") or []
for announcement in announcements:
next_hop = announcement.get("next_hop")
for prefix in announcement.get("prefixes") or []:
transformed.append(
normalize_bgp_event(
{
**item,
"collector": item.get("host", "").replace(".ripe.net", ""),
"event_type": "announcement",
"prefix": prefix,
"next_hop": next_hop,
},
project="ris-live",
)
)
for prefix in withdrawals:
transformed.append(
normalize_bgp_event(
{
**item,
"collector": item.get("host", "").replace(".ripe.net", ""),
"event_type": "withdrawal",
"prefix": prefix,
},
project="ris-live",
)
)
if not announcements and not withdrawals:
transformed.append(
normalize_bgp_event(
{
**item,
"collector": item.get("host", "").replace(".ripe.net", ""),
},
project="ris-live",
)
)
self._latest_transformed_batch = transformed
return transformed
async def run(self, db):
result = await super().run(db)
if result.get("status") != "success":
return result
snapshot_id = await self._resolve_snapshot_id(db, result.get("task_id"))
anomaly_count = await create_bgp_anomalies_for_batch(
db,
source=self.name,
snapshot_id=snapshot_id,
task_id=result.get("task_id"),
events=getattr(self, "_latest_transformed_batch", []),
)
result["anomalies_created"] = anomaly_count
return result
async def _resolve_snapshot_id(self, db, task_id: int | None) -> int | None:
if task_id is None:
return None
from sqlalchemy import select
from app.models.data_snapshot import DataSnapshot
result = await db.execute(
select(DataSnapshot.id).where(DataSnapshot.task_id == task_id).order_by(DataSnapshot.id.desc())
)
return result.scalar_one_or_none()
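
A hypothetical ris_message payload (field names follow the RIS Live stream format; values are invented) showing how transform() fans one message out into per-prefix events:

sample = {
    "host": "rrc00.ripe.net",
    "peer_asn": "64496",
    "path": [64496, 64500],
    "announcements": [
        {"next_hop": "192.0.2.1", "prefixes": ["198.51.100.0/24", "203.0.113.0/24"]}
    ],
    "withdrawals": ["192.0.2.0/24"],
}
events = RISLiveCollector().transform([sample])
assert len(events) == 3   # two announcements plus one withdrawal
assert {e["metadata"]["event_type"] for e in events} == {"announcement", "withdrawal"}
assert events[0]["metadata"]["collector"] == "rrc00"   # ".ripe.net" suffix stripped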

View File

@@ -7,7 +7,7 @@ Uses Wayback Machine as backup data source since live data requires JavaScript r
import json
import re
from typing import Dict, Any, List
from datetime import datetime
from datetime import UTC, datetime
from bs4 import BeautifulSoup
import httpx
@@ -103,7 +103,7 @@ class TeleGeographyCableCollector(BaseCollector):
"capacity_tbps": item.get("capacity"),
"url": item.get("url"),
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
result.append(entry)
except (ValueError, TypeError, KeyError):
@@ -131,7 +131,7 @@ class TeleGeographyCableCollector(BaseCollector):
"owner": "Meta, Orange, Vodafone, etc.",
"status": "active",
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
},
{
"source_id": "telegeo_sample_2",
@@ -147,7 +147,7 @@ class TeleGeographyCableCollector(BaseCollector):
"owner": "Alibaba, NEC",
"status": "planned",
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
},
]
@@ -187,7 +187,7 @@ class TeleGeographyLandingPointCollector(BaseCollector):
"cable_count": len(item.get("cables", [])),
"url": item.get("url"),
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
result.append(entry)
except (ValueError, TypeError, KeyError):
@@ -211,7 +211,7 @@ class TeleGeographyLandingPointCollector(BaseCollector):
"value": "",
"unit": "",
"metadata": {"note": "Sample data"},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
},
]
@@ -258,7 +258,7 @@ class TeleGeographyCableSystemCollector(BaseCollector):
"investment": item.get("investment"),
"url": item.get("url"),
},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
}
result.append(entry)
except (ValueError, TypeError, KeyError):
@@ -282,6 +282,6 @@ class TeleGeographyCableSystemCollector(BaseCollector):
"value": "5000",
"unit": "km",
"metadata": {"note": "Sample data"},
"reference_date": datetime.utcnow().strftime("%Y-%m-%d"),
"reference_date": datetime.now(UTC).strftime("%Y-%m-%d"),
},
]