From aaae6a53c3fa221b28e1bd9f8bd83a6f9f5c4990 Mon Sep 17 00:00:00 2001 From: rayd1o Date: Wed, 11 Mar 2026 16:38:49 +0800 Subject: [PATCH] feat(backend): Add cable graph service and data collectors ## Changelog ### New Features #### Cable Graph Service - Add cable_graph.py for finding shortest path between landing points - Implement haversine distance calculation for great circle distances - Support for dateline crossing (longitude normalization) - NetworkX-based graph for optimal path finding #### Data Collectors - Add ArcGISCableCollector for fetching submarine cable data from ArcGIS GeoJSON API - Add FAOLandingPointCollector for fetching landing point data from FAO CSV API ### Backend Changes #### API Updates - auth.py: Update authentication logic - datasources.py: Add datasource endpoints and management - visualization.py: Add visualization API endpoints - config.py: Update configuration settings - security.py: Improve security settings #### Models & Schemas - task.py: Update task model with new fields - token.py: Update token schema #### Services - collectors/base.py: Improve base collector with better error handling - collectors/__init__.py: Register new collectors - scheduler.py: Update scheduler logic - tasks/scheduler.py: Add task scheduling ### Frontend Changes - AppLayout.tsx: Improve layout component - index.css: Add global styles - DataSources.tsx: Enhance data sources management page - vite.config.ts: Add Vite configuration for earth module --- backend/app/api/v1/auth.py | 12 +- backend/app/api/v1/datasources.py | 140 +++++++++- backend/app/api/v1/visualization.py | 257 ++++++++++++++---- backend/app/core/config.py | 4 +- backend/app/core/security.py | 14 +- backend/app/models/task.py | 4 +- backend/app/schemas/token.py | 4 +- backend/app/services/cable_graph.py | 239 ++++++++++++++++ backend/app/services/collectors/__init__.py | 4 + .../app/services/collectors/arcgis_cables.py | 84 ++++++ backend/app/services/collectors/base.py | 36 ++- .../app/services/collectors/fao_landing.py | 66 +++++ backend/app/services/scheduler.py | 2 + backend/app/tasks/scheduler.py | 28 +- .../src/components/AppLayout/AppLayout.tsx | 45 +-- frontend/src/index.css | 4 + .../src/pages/DataSources/DataSources.tsx | 183 +++++++++++-- frontend/vite.config.ts | 10 + 18 files changed, 990 insertions(+), 146 deletions(-) create mode 100644 backend/app/services/cable_graph.py create mode 100644 backend/app/services/collectors/arcgis_cables.py create mode 100644 backend/app/services/collectors/fao_landing.py diff --git a/backend/app/api/v1/auth.py b/backend/app/api/v1/auth.py index a8f98150..804b244c 100644 --- a/backend/app/api/v1/auth.py +++ b/backend/app/api/v1/auth.py @@ -61,10 +61,14 @@ async def login( access_token = create_access_token(data={"sub": user.id}) refresh_token = create_refresh_token(data={"sub": user.id}) + expires_in = None + if settings.ACCESS_TOKEN_EXPIRE_MINUTES > 0: + expires_in = settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 + return { "access_token": access_token, "token_type": "bearer", - "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60, + "expires_in": expires_in, "user": { "id": user.id, "username": user.username, @@ -79,10 +83,14 @@ async def refresh_token( ): access_token = create_access_token(data={"sub": current_user.id}) + expires_in = None + if settings.ACCESS_TOKEN_EXPIRE_MINUTES > 0: + expires_in = settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 + return { "access_token": access_token, "token_type": "bearer", - "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60, + "expires_in": expires_in, "user": { "id": current_user.id, "username": current_user.username, diff --git a/backend/app/api/v1/datasources.py b/backend/app/api/v1/datasources.py index 8d4b65c1..b2ed6829 100644 --- a/backend/app/api/v1/datasources.py +++ b/backend/app/api/v1/datasources.py @@ -7,6 +7,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db.session import get_db from app.models.user import User from app.models.datasource import DataSource +from app.models.task import CollectionTask +from app.models.collected_data import CollectedData from app.core.security import get_current_user from app.services.collectors.registry import collector_registry @@ -90,6 +92,20 @@ COLLECTOR_INFO = { "priority": "P2", "frequency_hours": 168, }, + "arcgis_cables": { + "id": 15, + "name": "ArcGIS Submarine Cables", + "module": "L2", + "priority": "P1", + "frequency_hours": 168, + }, + "fao_landing_points": { + "id": 16, + "name": "FAO Landing Points", + "module": "L2", + "priority": "P1", + "frequency_hours": 168, + }, } ID_TO_COLLECTOR = {info["id"]: name for name, info in COLLECTOR_INFO.items()} @@ -135,6 +151,35 @@ async def list_datasources( collector_list = [] for name, info in COLLECTOR_INFO.items(): is_active_status = collector_registry.is_active(name) + + running_task_query = ( + select(CollectionTask) + .where(CollectionTask.datasource_id == info["id"]) + .where(CollectionTask.status == "running") + .order_by(CollectionTask.started_at.desc()) + .limit(1) + ) + running_result = await db.execute(running_task_query) + running_task = running_result.scalar_one_or_none() + + last_run_query = ( + select(CollectionTask) + .where(CollectionTask.datasource_id == info["id"]) + .where(CollectionTask.completed_at.isnot(None)) + .order_by(CollectionTask.completed_at.desc()) + .limit(1) + ) + last_run_result = await db.execute(last_run_query) + last_task = last_run_result.scalar_one_or_none() + + data_count_query = select(func.count(CollectedData.id)).where(CollectedData.source == name) + data_count_result = await db.execute(data_count_query) + data_count = data_count_result.scalar() or 0 + + last_run = None + if last_task and last_task.completed_at and data_count > 0: + last_run = last_task.completed_at.strftime("%Y-%m-%d %H:%M") + collector_list.append( { "id": info["id"], @@ -144,6 +189,12 @@ async def list_datasources( "frequency": f"{info['frequency_hours']}h", "is_active": is_active_status, "collector_class": name, + "last_run": last_run, + "is_running": running_task is not None, + "task_id": running_task.id if running_task else None, + "progress": running_task.progress if running_task else None, + "records_processed": running_task.records_processed if running_task else None, + "total_records": running_task.total_records if running_task else None, } ) @@ -215,16 +266,22 @@ async def get_datasource_stats( raise HTTPException(status_code=404, detail="Data source not found") info = COLLECTOR_INFO[collector_name] - total_query = select(func.count(DataSource.id)).where(DataSource.source == info["name"]) - result = await db.execute(total_query) + source_name = info["name"] + + query = select(func.count(CollectedData.id)).where(CollectedData.source == collector_name) + result = await db.execute(query) total = result.scalar() or 0 + if total == 0: + query = select(func.count(CollectedData.id)).where(CollectedData.source == source_name) + result = await db.execute(query) + total = result.scalar() or 0 + return { "source_id": source_id, "collector_name": collector_name, "name": info["name"], "total_records": total, - "last_updated": datetime.utcnow().isoformat(), } @@ -256,3 +313,80 @@ async def trigger_datasource( status_code=500, detail=f"Failed to trigger collector '{collector_name}'", ) + + +@router.delete("/{source_id}/data") +async def clear_datasource_data( + source_id: str, + current_user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + collector_name = get_collector_name(source_id) + if not collector_name: + raise HTTPException(status_code=404, detail="Data source not found") + + info = COLLECTOR_INFO[collector_name] + source_name = info["name"] + + query = select(func.count(CollectedData.id)).where(CollectedData.source == collector_name) + result = await db.execute(query) + count = result.scalar() or 0 + + if count == 0: + query = select(func.count(CollectedData.id)).where(CollectedData.source == source_name) + result = await db.execute(query) + count = result.scalar() or 0 + delete_source = source_name + else: + delete_source = collector_name + + if count == 0: + return { + "status": "success", + "message": "No data to clear", + "deleted_count": 0, + } + + delete_query = CollectedData.__table__.delete().where(CollectedData.source == delete_source) + await db.execute(delete_query) + await db.commit() + + return { + "status": "success", + "message": f"Cleared {count} records for data source '{info['name']}'", + "deleted_count": count, + } + + +@router.get("/{source_id}/task-status") +async def get_task_status( + source_id: str, + db: AsyncSession = Depends(get_db), +): + collector_name = get_collector_name(source_id) + if not collector_name: + raise HTTPException(status_code=404, detail="Data source not found") + + info = COLLECTOR_INFO[collector_name] + + running_task_query = ( + select(CollectionTask) + .where(CollectionTask.datasource_id == info["id"]) + .where(CollectionTask.status == "running") + .order_by(CollectionTask.started_at.desc()) + .limit(1) + ) + running_result = await db.execute(running_task_query) + running_task = running_result.scalar_one_or_none() + + if not running_task: + return {"is_running": False, "task_id": None, "progress": None} + + return { + "is_running": True, + "task_id": running_task.id, + "progress": running_task.progress, + "records_processed": running_task.records_processed, + "total_records": running_task.total_records, + "status": running_task.status, + } diff --git a/backend/app/api/v1/visualization.py b/backend/app/api/v1/visualization.py index 5e77a2d8..f966b1d3 100644 --- a/backend/app/api/v1/visualization.py +++ b/backend/app/api/v1/visualization.py @@ -1,75 +1,189 @@ """Visualization API - GeoJSON endpoints for 3D Earth display""" -from fastapi import APIRouter, HTTPException -import httpx +from fastapi import APIRouter, HTTPException, Depends +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from typing import List, Dict, Any, Optional + +from app.db.session import get_db +from app.models.collected_data import CollectedData +from app.services.cable_graph import build_graph_from_data, CableGraph router = APIRouter() -CABLE_DATA_URL = "https://services.arcgis.com/6DIQcwlPy8knb6sg/arcgis/rest/services/SubmarineCables/FeatureServer/2/query?where=1%3D1&outFields=*&returnGeometry=true&f=geojson" -LANDING_POINT_CSV_URL = "https://data.apps.fao.org/catalog/dataset/1b75ff21-92f2-4b96-9b7b-98e8aa65ad5d/resource/b6071077-d1d4-4e97-aa00-42e902847c87/download/landing-point-geo.csv" + +def convert_cable_to_geojson(records: List[CollectedData]) -> Dict[str, Any]: + """Convert cable records to GeoJSON FeatureCollection""" + features = [] + + for record in records: + metadata = record.extra_data or {} + route_coords = metadata.get("route_coordinates", []) + + if not route_coords: + continue + + all_lines = [] + + # Handle both old format (flat array) and new format (array of arrays) + if route_coords and isinstance(route_coords[0], list): + # New format: array of arrays (MultiLineString structure) + if route_coords and isinstance(route_coords[0][0], list): + # Array of arrays of arrays - multiple lines + for line in route_coords: + line_coords = [] + for point in line: + if len(point) >= 2: + try: + lon = float(point[0]) + lat = float(point[1]) + line_coords.append([lon, lat]) + except (ValueError, TypeError): + continue + if len(line_coords) >= 2: + all_lines.append(line_coords) + else: + # Old format: flat array of points - treat as single line + line_coords = [] + for point in route_coords: + if len(point) >= 2: + try: + lon = float(point[0]) + lat = float(point[1]) + line_coords.append([lon, lat]) + except (ValueError, TypeError): + continue + if len(line_coords) >= 2: + all_lines.append(line_coords) + + if not all_lines: + continue + + # Use MultiLineString format to preserve cable segments + features.append( + { + "type": "Feature", + "geometry": {"type": "MultiLineString", "coordinates": all_lines}, + "properties": { + "id": record.id, + "source_id": record.source_id, + "Name": record.name, + "name": record.name, + "owner": metadata.get("owners"), + "owners": metadata.get("owners"), + "rfs": metadata.get("rfs"), + "RFS": metadata.get("rfs"), + "status": metadata.get("status", "active"), + "length": record.value, + "length_km": record.value, + "SHAPE__Length": record.value, + "url": metadata.get("url"), + "color": metadata.get("color"), + "year": metadata.get("year"), + }, + } + ) + + return {"type": "FeatureCollection", "features": features} + + +def convert_landing_point_to_geojson(records: List[CollectedData]) -> Dict[str, Any]: + """Convert landing point records to GeoJSON FeatureCollection""" + features = [] + + for record in records: + try: + lat = float(record.latitude) if record.latitude else None + lon = float(record.longitude) if record.longitude else None + except (ValueError, TypeError): + continue + + if lat is None or lon is None: + continue + + metadata = record.extra_data or {} + + features.append( + { + "type": "Feature", + "geometry": {"type": "Point", "coordinates": [lon, lat]}, + "properties": { + "id": record.id, + "source_id": record.source_id, + "name": record.name, + "country": record.country, + "city": record.city, + "is_tbd": metadata.get("is_tbd", False), + }, + } + ) + + return {"type": "FeatureCollection", "features": features} @router.get("/geo/cables") -async def get_cables_geojson(): +async def get_cables_geojson(db: AsyncSession = Depends(get_db)): """获取海底电缆 GeoJSON 数据 (LineString)""" try: - async with httpx.AsyncClient(timeout=60.0) as client: - response = await client.get(CABLE_DATA_URL) - response.raise_for_status() - return response.json() - except httpx.HTTPError as e: - raise HTTPException(status_code=502, detail=f"Failed to fetch cable data: {str(e)}") + stmt = select(CollectedData).where(CollectedData.source == "arcgis_cables") + result = await db.execute(stmt) + records = result.scalars().all() + + if not records: + raise HTTPException( + status_code=404, + detail="No cable data found. Please run the arcgis_cables collector first.", + ) + + return convert_cable_to_geojson(records) + except HTTPException: + raise except Exception as e: raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}") @router.get("/geo/landing-points") -async def get_landing_points_geojson(): +async def get_landing_points_geojson(db: AsyncSession = Depends(get_db)): """获取登陆点 GeoJSON 数据 (Point)""" try: - async with httpx.AsyncClient(timeout=60.0) as client: - response = await client.get(LANDING_POINT_CSV_URL) - response.raise_for_status() + stmt = select(CollectedData).where(CollectedData.source == "fao_landing_points") + result = await db.execute(stmt) + records = result.scalars().all() - lines = response.text.strip().split("\n") - if not lines: - raise HTTPException(status_code=500, detail="Empty CSV data") + if not records: + raise HTTPException( + status_code=404, + detail="No landing point data found. Please run the fao_landing_points collector first.", + ) - features = [] - for line in lines[1:]: - if not line.strip(): - continue - parts = line.split(",") - if len(parts) >= 4: - try: - lon = float(parts[0]) - lat = float(parts[1]) - feature_id = parts[2] - name = parts[3].strip('"') - is_tbd = parts[4].strip() == "true" if len(parts) > 4 else False - - features.append( - { - "type": "Feature", - "geometry": {"type": "Point", "coordinates": [lon, lat]}, - "properties": {"id": feature_id, "name": name, "is_tbd": is_tbd}, - } - ) - except (ValueError, IndexError): - continue - - return {"type": "FeatureCollection", "features": features} - except httpx.HTTPError as e: - raise HTTPException(status_code=502, detail=f"Failed to fetch landing point data: {str(e)}") + return convert_landing_point_to_geojson(records) + except HTTPException: + raise except Exception as e: raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}") @router.get("/geo/all") -async def get_all_geojson(): +async def get_all_geojson(db: AsyncSession = Depends(get_db)): """获取所有可视化数据 (电缆 + 登陆点)""" - cables = await get_cables_geojson() - points = await get_landing_points_geojson() + cables_stmt = select(CollectedData).where(CollectedData.source == "arcgis_cables") + cables_result = await db.execute(cables_stmt) + cables_records = cables_result.scalars().all() + + points_stmt = select(CollectedData).where(CollectedData.source == "fao_landing_points") + points_result = await db.execute(points_stmt) + points_records = points_result.scalars().all() + + cables = ( + convert_cable_to_geojson(cables_records) + if cables_records + else {"type": "FeatureCollection", "features": []} + ) + points = ( + convert_landing_point_to_geojson(points_records) + if points_records + else {"type": "FeatureCollection", "features": []} + ) return { "cables": cables, @@ -79,3 +193,52 @@ async def get_all_geojson(): "landing_point_count": len(points.get("features", [])) if points else 0, }, } + + +# Cache for cable graph +_cable_graph: Optional[CableGraph] = None + + +async def get_cable_graph(db: AsyncSession) -> CableGraph: + """Get or build cable graph (cached)""" + global _cable_graph + + if _cable_graph is None: + cables_stmt = select(CollectedData).where(CollectedData.source == "arcgis_cables") + cables_result = await db.execute(cables_stmt) + cables_records = list(cables_result.scalars().all()) + + points_stmt = select(CollectedData).where(CollectedData.source == "fao_landing_points") + points_result = await db.execute(points_stmt) + points_records = list(points_result.scalars().all()) + + cables_data = convert_cable_to_geojson(cables_records) + points_data = convert_landing_point_to_geojson(points_records) + + _cable_graph = build_graph_from_data(cables_data, points_data) + + return _cable_graph + + +@router.post("/geo/path") +async def find_path( + start: List[float], + end: List[float], + db: AsyncSession = Depends(get_db), +): + """Find shortest path between two coordinates via cable network""" + if not start or len(start) != 2: + raise HTTPException(status_code=400, detail="Start must be [lon, lat]") + if not end or len(end) != 2: + raise HTTPException(status_code=400, detail="End must be [lon, lat]") + + graph = await get_cable_graph(db) + result = graph.find_shortest_path(start, end) + + if not result: + raise HTTPException( + status_code=404, + detail="No path found between these points. They may be too far from any landing point.", + ) + + return result diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 207150e5..71c194a3 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -12,8 +12,8 @@ class Settings(BaseSettings): API_V1_STR: str = "/api/v1" SECRET_KEY: str = "your-secret-key-change-in-production" ALGORITHM: str = "HS256" - ACCESS_TOKEN_EXPIRE_MINUTES: int = 15 - REFRESH_TOKEN_EXPIRE_DAYS: int = 7 + ACCESS_TOKEN_EXPIRE_MINUTES: int = 0 + REFRESH_TOKEN_EXPIRE_DAYS: int = 0 POSTGRES_SERVER: str = "localhost" POSTGRES_USER: str = "postgres" diff --git a/backend/app/core/security.py b/backend/app/core/security.py index 89ec99a9..e0cdfdbf 100644 --- a/backend/app/core/security.py +++ b/backend/app/core/security.py @@ -50,9 +50,13 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) - to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta - else: + elif settings.ACCESS_TOKEN_EXPIRE_MINUTES > 0: expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) - to_encode.update({"exp": expire, "type": "access"}) + else: + expire = None + if expire: + to_encode.update({"exp": expire}) + to_encode.update({"type": "access"}) if "sub" in to_encode: to_encode["sub"] = str(to_encode["sub"]) return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) @@ -60,8 +64,10 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) - def create_refresh_token(data: dict) -> str: to_encode = data.copy() - expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) - to_encode.update({"exp": expire, "type": "refresh"}) + if settings.REFRESH_TOKEN_EXPIRE_DAYS > 0: + expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) + to_encode.update({"exp": expire}) + to_encode.update({"type": "refresh"}) if "sub" in to_encode: to_encode["sub"] = str(to_encode["sub"]) return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) diff --git a/backend/app/models/task.py b/backend/app/models/task.py index ce46d78b..c5509027 100644 --- a/backend/app/models/task.py +++ b/backend/app/models/task.py @@ -1,6 +1,6 @@ """Collection Task model""" -from sqlalchemy import Column, DateTime, Integer, String, Text +from sqlalchemy import Column, DateTime, Integer, String, Text, Float from sqlalchemy.sql import func from app.db.session import Base @@ -15,6 +15,8 @@ class CollectionTask(Base): started_at = Column(DateTime(timezone=True)) completed_at = Column(DateTime(timezone=True)) records_processed = Column(Integer, default=0) + total_records = Column(Integer, default=0) # Total records to process + progress = Column(Float, default=0.0) # Progress percentage (0-100) error_message = Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) diff --git a/backend/app/schemas/token.py b/backend/app/schemas/token.py index 79b00da4..63120556 100644 --- a/backend/app/schemas/token.py +++ b/backend/app/schemas/token.py @@ -7,7 +7,7 @@ from pydantic import BaseModel class Token(BaseModel): access_token: str token_type: str = "bearer" - expires_in: int + expires_in: Optional[int] = None user: dict @@ -19,4 +19,4 @@ class TokenPayload(BaseModel): class TokenRefresh(BaseModel): access_token: str - expires_in: int + expires_in: Optional[int] = None diff --git a/backend/app/services/cable_graph.py b/backend/app/services/cable_graph.py new file mode 100644 index 00000000..216a538f --- /dev/null +++ b/backend/app/services/cable_graph.py @@ -0,0 +1,239 @@ +"""Cable graph service for finding shortest path between landing points""" + +import math +from typing import List, Dict, Any, Optional, Tuple +import networkx as nx + + +def normalize_longitude(lon: float) -> float: + """Normalize longitude to -180 to 180 range""" + while lon > 180: + lon -= 360 + while lon < -180: + lon += 360 + return lon + + +def haversine_distance(coord1: Tuple[float, float], coord2: Tuple[float, float]) -> float: + """Calculate great circle distance between two points in km, handling dateline crossing""" + lon1, lat1 = normalize_longitude(coord1[0]), coord1[1] + lon2, lat2 = normalize_longitude(coord2[0]), coord2[1] + + R = 6371 + + lat1_rad = math.radians(lat1) + lat2_rad = math.radians(lat2) + delta_lat = math.radians(lat2 - lat1) + delta_lon = math.radians(lon2 - lon1) + + a = ( + math.sin(delta_lat / 2) ** 2 + + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2 + ) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + + return R * c + + +class CableGraph: + def __init__(self, cables: List[Dict], landing_points: List[Dict]): + self.graph = nx.Graph() + self.landing_points = {lp["id"]: lp for lp in landing_points} + self.point_coords = {lp["id"]: (lp["lon"], lp["lat"]) for lp in landing_points} + self._build_graph(cables) + + def _build_graph(self, cables: List[Dict]): + """Build graph from cables and landing points""" + for cable in cables: + coords = cable.get("coordinates", []) + if len(coords) < 2: + continue + + # Find nearest landing points for start and end (search more points) + start_point = self._find_nearest_landing_point_multi(coords[:3]) # First 3 points + end_point = self._find_nearest_landing_point_multi(coords[-3:]) # Last 3 points + + if start_point and end_point and start_point != end_point: + # Calculate distance via cable route + distance = self._calculate_cable_distance(coords) + + # Add edge with cable info + edge_data = { + "distance": distance, + "cable_name": cable.get("name", "Unknown"), + "cable_id": cable.get("id"), + "coordinates": coords, + } + + # If edge exists, keep the shorter one + if self.graph.has_edge(start_point, end_point): + existing_dist = self.graph[start_point][end_point]["distance"] + if distance < existing_dist: + self.graph[start_point][end_point].update(edge_data) + else: + self.graph.add_edge(start_point, end_point, **edge_data) + + def _find_nearest_landing_point_multi(self, coords_subset: List[List[float]]) -> Optional[int]: + """Find nearest landing point from multiple coordinates (e.g., first/last N points)""" + best_point = None + best_dist = float("inf") + + for coord in coords_subset: + point = self._find_nearest_landing_point(coord) + if point: + dist = haversine_distance( + (normalize_longitude(coord[0]), coord[1]), self.point_coords[point] + ) + if dist < best_dist: + best_dist = dist + best_point = point + + return best_point + + def _find_nearest_landing_point(self, coord: List[float]) -> Optional[int]: + """Find nearest landing point to given coordinate""" + if not self.point_coords: + return None + + min_dist = float("inf") + nearest_id = None + + target_lon = normalize_longitude(coord[0]) + target_lat = coord[1] + + for lp_id, (lon, lat) in self.point_coords.items(): + dist = haversine_distance((target_lon, target_lat), (lon, lat)) + if dist < min_dist: + min_dist = dist + nearest_id = lp_id + + return nearest_id if min_dist < 500 else None + + def _find_nearest_connected_landing_point(self, coord: List[float]) -> Optional[int]: + """Find nearest landing point that's connected to the graph, handling dateline""" + if not self.point_coords or not self.graph.nodes(): + return None + + connected_nodes = set(self.graph.nodes()) + min_dist = float("inf") + nearest_id = None + + target_lon, target_lat = normalize_longitude(coord[0]), coord[1] + + for lp_id in connected_nodes: + lp_lon, lp_lat = self.point_coords[lp_id] + # Try both normalized versions (for points near dateline) + dist = haversine_distance((target_lon, target_lat), (lp_lon, lp_lat)) + if dist < min_dist: + min_dist = dist + nearest_id = lp_id + + return nearest_id if min_dist < 500 else None + + def _calculate_cable_distance(self, coordinates: List[List[float]]) -> float: + """Calculate total distance along cable route""" + total = 0 + for i in range(len(coordinates) - 1): + total += haversine_distance( + (coordinates[i][0], coordinates[i][1]), + (coordinates[i + 1][0], coordinates[i + 1][1]), + ) + return total + + def find_shortest_path( + self, start_coords: List[float], end_coords: List[float] + ) -> Optional[Dict[str, Any]]: + """Find shortest path between two coordinates""" + start_point = self._find_nearest_connected_landing_point(start_coords) + end_point = self._find_nearest_connected_landing_point(end_coords) + + if not start_point or not end_point: + return None + + if not nx.has_path(self.graph, start_point, end_point): + return None + + try: + path = nx.shortest_path(self.graph, start_point, end_point, weight="distance") + except nx.NetworkXNoPath: + return None + + if not nx.has_path(self.graph, start_point, end_point): + return None + + try: + path = nx.shortest_path(self.graph, start_point, end_point, weight="distance") + except nx.NetworkXNoPath: + return None + + # Build result + total_distance = 0 + path_segments = [] + + for i in range(len(path) - 1): + u, v = path[i], path[i + 1] + edge_data = self.graph[u][v] + total_distance += edge_data["distance"] + + path_segments.append( + { + "from": self.landing_points[u], + "to": self.landing_points[v], + "cable_name": edge_data["cable_name"], + "cable_id": edge_data["cable_id"], + "distance_km": round(edge_data["distance"], 2), + "coordinates": edge_data["coordinates"], + } + ) + + return { + "start": { + "id": start_point, + "name": self.landing_points[start_point].get("name", "Unknown"), + "coords": list(self.point_coords[start_point]), + }, + "end": { + "id": end_point, + "name": self.landing_points[end_point].get("name", "Unknown"), + "coords": list(self.point_coords[end_point]), + }, + "total_distance_km": round(total_distance, 2), + "segments": path_segments, + "segment_count": len(path_segments), + } + + +def build_graph_from_data(cables_data: Dict, points_data: Dict) -> CableGraph: + """Build cable graph from GeoJSON data""" + cables = [] + for feature in cables_data.get("features", []): + props = feature.get("properties", {}) + coords = feature.get("geometry", {}).get("coordinates", []) + if coords and isinstance(coords[0], list): + coords = coords[0] # MultiLineString - take first line + + cables.append( + { + "id": props.get("id"), + "name": props.get("name", props.get("Name", "Unknown")), + "coordinates": coords, + } + ) + + points = [] + for feature in points_data.get("features", []): + geom = feature.get("geometry", {}) + props = feature.get("properties", {}) + coords = geom.get("coordinates", []) + + if coords and len(coords) >= 2: + points.append( + { + "id": props.get("id"), + "name": props.get("name", "Unknown"), + "lon": coords[0], + "lat": coords[1], + } + ) + + return CableGraph(cables, points) diff --git a/backend/app/services/collectors/__init__.py b/backend/app/services/collectors/__init__.py index 334697db..36456bd7 100644 --- a/backend/app/services/collectors/__init__.py +++ b/backend/app/services/collectors/__init__.py @@ -24,6 +24,8 @@ from app.services.collectors.cloudflare import ( CloudflareRadarTrafficCollector, CloudflareRadarTopASCollector, ) +from app.services.collectors.arcgis_cables import ArcGISCableCollector +from app.services.collectors.fao_landing import FAOLandingPointCollector collector_registry.register(TOP500Collector()) collector_registry.register(EpochAIGPUCollector()) @@ -39,3 +41,5 @@ collector_registry.register(TeleGeographyCableSystemCollector()) collector_registry.register(CloudflareRadarDeviceCollector()) collector_registry.register(CloudflareRadarTrafficCollector()) collector_registry.register(CloudflareRadarTopASCollector()) +collector_registry.register(ArcGISCableCollector()) +collector_registry.register(FAOLandingPointCollector()) diff --git a/backend/app/services/collectors/arcgis_cables.py b/backend/app/services/collectors/arcgis_cables.py new file mode 100644 index 00000000..7fea2e9a --- /dev/null +++ b/backend/app/services/collectors/arcgis_cables.py @@ -0,0 +1,84 @@ +"""ArcGIS Submarine Cables Collector + +Collects submarine cable data from ArcGIS GeoJSON API. +""" + +import json +from typing import Dict, Any, List +from datetime import datetime +import httpx + +from app.services.collectors.base import BaseCollector + + +class ArcGISCableCollector(BaseCollector): + name = "arcgis_cables" + priority = "P1" + module = "L2" + frequency_hours = 168 + data_type = "submarine_cable" + + base_url = "https://services.arcgis.com/6DIQcwlPy8knb6sg/arcgis/rest/services/SubmarineCables/FeatureServer/2/query" + + async def fetch(self) -> List[Dict[str, Any]]: + params = {"where": "1=1", "outFields": "*", "returnGeometry": "true", "f": "geojson"} + + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.get(self.base_url, params=params) + response.raise_for_status() + return self.parse_response(response.json()) + + def parse_response(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: + result = [] + + features = data.get("features", []) + for feature in features: + props = feature.get("properties", {}) + geometry = feature.get("geometry", {}) + + route_coordinates = [] + if geometry.get("type") == "MultiLineString": + coords = geometry.get("coordinates", []) + for line in coords: + line_coords = [] + for point in line: + if len(point) >= 2: + line_coords.append(point) + if line_coords: + route_coordinates.append(line_coords) + elif geometry.get("type") == "LineString": + coords = geometry.get("coordinates", []) + line_coords = [] + for point in coords: + if len(point) >= 2: + line_coords.append(point) + if line_coords: + route_coordinates.append(line_coords) + + try: + entry = { + "source_id": f"arcgis_cable_{props.get('cable_id', props.get('OBJECTID', ''))}", + "name": props.get("Name", "Unknown"), + "country": "", + "city": "", + "latitude": "", + "longitude": "", + "value": str(props.get("length", "")).replace(",", ""), + "unit": "km", + "metadata": { + "cable_id": props.get("cable_id"), + "owners": props.get("owners"), + "rfs": props.get("rfs"), + "status": "active", + "year": props.get("year"), + "url": props.get("url"), + "color": props.get("color"), + "route_coordinates": route_coordinates, + }, + "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + } + result.append(entry) + except (ValueError, TypeError, KeyError): + continue + + return result diff --git a/backend/app/services/collectors/base.py b/backend/app/services/collectors/base.py index b95c5dd9..c8348439 100644 --- a/backend/app/services/collectors/base.py +++ b/backend/app/services/collectors/base.py @@ -17,7 +17,20 @@ class BaseCollector(ABC): priority: str = "P1" module: str = "L1" frequency_hours: int = 4 - data_type: str = "generic" # Override in subclass: "supercomputer", "model", "dataset", etc. + data_type: str = "generic" + + def __init__(self): + self._current_task = None + self._db_session = None + self._datasource_id = 1 + + def update_progress(self, records_processed: int): + """Update task progress - call this during data processing""" + if self._current_task and self._db_session and self._current_task.total_records > 0: + self._current_task.records_processed = records_processed + self._current_task.progress = ( + records_processed / self._current_task.total_records + ) * 100 @abstractmethod async def fetch(self) -> List[Dict[str, Any]]: @@ -35,13 +48,11 @@ class BaseCollector(ABC): from app.models.collected_data import CollectedData start_time = datetime.utcnow() - datasource_id = getattr(self, "_datasource_id", 1) # Default to 1 for built-in collectors + datasource_id = getattr(self, "_datasource_id", 1) - # Check if collector is active if not collector_registry.is_active(self.name): return {"status": "skipped", "reason": "Collector is disabled"} - # Log task start task = CollectionTask( datasource_id=datasource_id, status="running", @@ -51,16 +62,21 @@ class BaseCollector(ABC): await db.commit() task_id = task.id + self._current_task = task + self._db_session = db + try: raw_data = await self.fetch() + task.total_records = len(raw_data) + await db.commit() + data = self.transform(raw_data) - # Save data to database records_count = await self._save_data(db, data) - # Log task success task.status = "success" task.records_processed = records_count + task.progress = 100.0 task.completed_at = datetime.utcnow() await db.commit() @@ -94,8 +110,7 @@ class BaseCollector(ABC): collected_at = datetime.utcnow() records_added = 0 - for item in data: - # Create CollectedData entry + for i, item in enumerate(data): record = CollectedData( source=self.name, source_id=item.get("source_id") or item.get("id"), @@ -125,7 +140,12 @@ class BaseCollector(ABC): db.add(record) records_added += 1 + if i % 100 == 0: + self.update_progress(i + 1) + await db.commit() + await db.commit() + self.update_progress(len(data)) return records_added async def save(self, db: AsyncSession, data: List[Dict[str, Any]]) -> int: diff --git a/backend/app/services/collectors/fao_landing.py b/backend/app/services/collectors/fao_landing.py new file mode 100644 index 00000000..1c760fd7 --- /dev/null +++ b/backend/app/services/collectors/fao_landing.py @@ -0,0 +1,66 @@ +"""FAO Landing Points Collector + +Collects landing point data from FAO CSV API. +""" + +from typing import Dict, Any, List +from datetime import datetime +import httpx + +from app.services.collectors.base import BaseCollector + + +class FAOLandingPointCollector(BaseCollector): + name = "fao_landing_points" + priority = "P1" + module = "L2" + frequency_hours = 168 + data_type = "landing_point" + + csv_url = "https://data.apps.fao.org/catalog/dataset/1b75ff21-92f2-4b96-9b7b-98e8aa65ad5d/resource/b6071077-d1d4-4e97-aa00-42e902847c87/download/landing-point-geo.csv" + + async def fetch(self) -> List[Dict[str, Any]]: + async with httpx.AsyncClient(timeout=60.0) as client: + response = await client.get(self.csv_url) + response.raise_for_status() + return self.parse_csv(response.text) + + def parse_csv(self, csv_text: str) -> List[Dict[str, Any]]: + result = [] + + lines = csv_text.strip().split("\n") + if not lines: + return result + + for line in lines[1:]: + if not line.strip(): + continue + parts = line.split(",") + if len(parts) >= 4: + try: + lon = float(parts[0]) + lat = float(parts[1]) + feature_id = parts[2] + name = parts[3].strip('"') + is_tbd = parts[4].strip() == "true" if len(parts) > 4 else False + + entry = { + "source_id": f"fao_lp_{feature_id}", + "name": name, + "country": "", + "city": "", + "latitude": str(lat), + "longitude": str(lon), + "value": "", + "unit": "", + "metadata": { + "is_tbd": is_tbd, + "original_id": feature_id, + }, + "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), + } + result.append(entry) + except (ValueError, IndexError): + continue + + return result diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index db09593c..7fb886da 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -29,6 +29,8 @@ COLLECTOR_TO_ID = { "telegeography_cables": 9, "telegeography_landing": 10, "telegeography_systems": 11, + "arcgis_cables": 15, + "fao_landing_points": 16, } diff --git a/backend/app/tasks/scheduler.py b/backend/app/tasks/scheduler.py index a860054d..0225018e 100644 --- a/backend/app/tasks/scheduler.py +++ b/backend/app/tasks/scheduler.py @@ -10,6 +10,9 @@ from app.services.collectors.registry import collector_registry async def run_collector_task(collector_name: str) -> Dict[str, Any]: """Run a single collector task""" + from sqlalchemy import select + from app.models.datasource import DataSource + collector = collector_registry.get(collector_name) if not collector: return {"status": "failed", "error": f"Collector {collector_name} not found"} @@ -18,32 +21,15 @@ async def run_collector_task(collector_name: str) -> Dict[str, Any]: return {"status": "skipped", "reason": "Collector is disabled"} async with async_session_factory() as db: - from app.models.task import CollectionTask - from app.models.datasource import DataSource - - # Find datasource result = await db.execute( - "SELECT id FROM data_sources WHERE collector_class = :class_name", - {"class_name": f"{collector.__class__.__name__}"}, + select(DataSource.id).where(DataSource.collector_class == collector_name) ) - datasource = result.fetchone() + datasource = result.scalar_one_or_none() - task = CollectionTask( - datasource_id=datasource[0] if datasource else 0, - status="running", - started_at=datetime.utcnow(), - ) - db.add(task) - await db.commit() + if datasource: + collector._datasource_id = datasource result = await collector.run(db) - - task.status = result["status"] - task.completed_at = datetime.utcnow() - task.records_processed = result.get("records_processed", 0) - task.error_message = result.get("error") - await db.commit() - return result diff --git a/frontend/src/components/AppLayout/AppLayout.tsx b/frontend/src/components/AppLayout/AppLayout.tsx index a1abb13d..5ee28271 100644 --- a/frontend/src/components/AppLayout/AppLayout.tsx +++ b/frontend/src/components/AppLayout/AppLayout.tsx @@ -1,4 +1,4 @@ -import { ReactNode } from 'react' +import { ReactNode, useState } from 'react' import { Layout, Menu, Typography, Button } from 'antd' import { DashboardOutlined, @@ -7,6 +7,7 @@ import { SettingOutlined, BarChartOutlined, MenuUnfoldOutlined, + MenuFoldOutlined, } from '@ant-design/icons' import { Link, useLocation } from 'react-router-dom' import { useAuthStore } from '../../stores/auth' @@ -21,6 +22,7 @@ interface AppLayoutProps { function AppLayout({ children }: AppLayoutProps) { const location = useLocation() const { user, logout } = useAuthStore() + const [collapsed, setCollapsed] = useState(false) const menuItems = [ { key: '/', icon: , label: 仪表盘 }, @@ -34,30 +36,18 @@ function AppLayout({ children }: AppLayoutProps) { { - const sider = document.querySelector('.dashboard-sider') as HTMLElement - if (sider) { - sider.style.width = collapsed ? '80px' : '240px' - sider.style.minWidth = collapsed ? '80px' : '240px' - sider.style.maxWidth = collapsed ? '80px' : '240px' - } - }} + collapsed={collapsed} + onCollapse={setCollapsed} className="dashboard-sider" - trigger={null} - breakpoint="lg" - onBreakpoint={(broken) => { - const sider = document.querySelector('.dashboard-sider') as HTMLElement - if (sider) { - sider.style.width = broken ? '80px' : '240px' - sider.style.minWidth = broken ? '80px' : '240px' - sider.style.maxWidth = broken ? '80px' : '240px' - } - }} >
- 智能星球 + {collapsed ? ( + 🌏 + ) : ( + 智能星球 + )}
), }, - { title: '模块', dataIndex: 'module', key: 'module' }, + { title: '模块', dataIndex: 'module', key: 'module', width: 80 }, { title: '优先级', dataIndex: 'priority', key: 'priority', + width: 80, render: (p: string) => {p}, }, - { title: '频率', dataIndex: 'frequency', key: 'frequency' }, + { title: '频率', dataIndex: 'frequency', key: 'frequency', width: 80 }, + { + title: '最近采集', + dataIndex: 'last_run', + key: 'last_run', + width: 140, + render: (lastRun: string | null) => lastRun || '-', + }, { title: '状态', dataIndex: 'is_active', key: 'is_active', - render: (active: boolean) => ( - {active ? '运行中' : '已暂停'} - ), + width: 100, + render: (_: unknown, record: BuiltInDataSource) => { + const progress = taskProgress[record.id] + if (progress?.is_running || record.is_running) { + const pct = progress?.progress ?? record.progress ?? 0 + return ( + + 采集中 {Math.round(pct)}% + + ) + } + return {record.is_active ? '运行中' : '已暂停'} + }, }, { title: '操作', key: 'action', + width: 200, + fixed: 'right' as const, render: (_: unknown, record: BuiltInDataSource) => ( - + + + +