- Add data_sources.yaml for configurable data source URLs
- Add data_sources.py to load config with database override support
- Add arcgis_landing_points and arcgis_cable_landing_relation collectors
- Change visualization API to query arcgis_landing_points
- Add /api/v1/datasources/configs/all endpoint
- Update Earth to fetch from API instead of static files
- Fix scheduler collector ID mappings
407 lines
12 KiB
Python
407 lines
12 KiB
Python
from typing import List, Optional
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.db.session import get_db
|
|
from app.models.user import User
|
|
from app.models.datasource import DataSource
|
|
from app.models.task import CollectionTask
|
|
from app.models.collected_data import CollectedData
|
|
from app.core.security import get_current_user
|
|
from app.services.collectors.registry import collector_registry
|
|
|
|
router = APIRouter()
|
|
|
|
# Static registry of every known collector: stable numeric id, display name,
# pipeline module (L1/L2), scheduling priority, and collection cadence in hours.
# Kept as a compact row table so new collectors are a one-line addition.
_COLLECTOR_ROWS = [
    # (key, id, display name, module, priority, frequency_hours)
    ("top500", 1, "TOP500 Supercomputers", "L1", "P0", 4),
    ("epoch_ai_gpu", 2, "Epoch AI GPU Clusters", "L1", "P0", 6),
    ("huggingface_models", 3, "HuggingFace Models", "L2", "P1", 12),
    ("huggingface_datasets", 4, "HuggingFace Datasets", "L2", "P1", 12),
    ("huggingface_spaces", 5, "HuggingFace Spaces", "L2", "P2", 24),
    ("peeringdb_ixp", 6, "PeeringDB IXP", "L2", "P1", 24),
    ("peeringdb_network", 7, "PeeringDB Networks", "L2", "P2", 48),
    ("peeringdb_facility", 8, "PeeringDB Facilities", "L2", "P2", 48),
    ("telegeography_cables", 9, "Submarine Cables", "L2", "P1", 168),
    ("telegeography_landing", 10, "Cable Landing Points", "L2", "P2", 168),
    ("telegeography_systems", 11, "Cable Systems", "L2", "P2", 168),
    ("arcgis_cables", 15, "ArcGIS Submarine Cables", "L2", "P1", 168),
    ("arcgis_landing_points", 16, "ArcGIS Landing Points", "L2", "P1", 168),
    ("arcgis_cable_landing_relation", 17, "ArcGIS Cable-Landing Relations", "L2", "P1", 168),
    ("fao_landing_points", 18, "FAO Landing Points", "L2", "P1", 168),
]

# Same shape as before: key -> metadata dict (insertion order preserved).
COLLECTOR_INFO = {
    key: {
        "id": numeric_id,
        "name": display_name,
        "module": module,
        "priority": priority,
        "frequency_hours": frequency_hours,
    }
    for key, numeric_id, display_name, module, priority, frequency_hours in _COLLECTOR_ROWS
}

# Bidirectional lookups between numeric ids and collector keys.
ID_TO_COLLECTOR = {info["id"]: name for name, info in COLLECTOR_INFO.items()}
COLLECTOR_TO_ID = {name: info["id"] for name, info in COLLECTOR_INFO.items()}
|
|
|
|
|
|
def get_collector_name(source_id: str) -> Optional[str]:
    """Resolve *source_id* to a canonical collector key.

    Accepts either a numeric id (as a string, e.g. ``"3"``) or a collector
    key (e.g. ``"top500"``). Returns ``None`` when nothing matches.
    """
    # Numeric path: a digit string that maps to a known id wins outright.
    try:
        name = ID_TO_COLLECTOR.get(int(source_id))
        if name is not None:
            return name
    except ValueError:
        # Not an integer — fall through to the key lookup below.
        pass
    # Key path: the id may already be the collector key itself.
    return source_id if source_id in COLLECTOR_INFO else None
|
|
|
|
|
|
@router.get("")
async def list_datasources(
    module: Optional[str] = None,
    is_active: Optional[bool] = None,
    priority: Optional[str] = None,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List all registered collectors with their live status.

    Optional filters:
        module: keep only collectors in this pipeline module (e.g. "L1"/"L2").
        is_active: keep only collectors whose registry enabled-flag matches.
            (Previously this parameter was accepted but silently ignored;
            it is now honored.)
        priority: keep only collectors with this priority (e.g. "P1").

    For each collector the response includes static metadata plus the most
    recent running task (if any), the last completed run, and the stored
    record count.

    Note: the old implementation also executed a ``select(DataSource)``
    query whose result was never used; that dead round-trip is removed.
    """
    collector_list = []
    for name, info in COLLECTOR_INFO.items():
        # Apply all filters up front so we skip the per-collector DB
        # queries for entries the caller does not want.
        if module and info["module"] != module:
            continue
        if priority and info["priority"] != priority:
            continue

        is_active_status = collector_registry.is_active(name)
        if is_active is not None and is_active_status != is_active:
            continue

        # Most recent task still marked "running" for this collector, if any.
        running_task_query = (
            select(CollectionTask)
            .where(CollectionTask.datasource_id == info["id"])
            .where(CollectionTask.status == "running")
            .order_by(CollectionTask.started_at.desc())
            .limit(1)
        )
        running_result = await db.execute(running_task_query)
        running_task = running_result.scalar_one_or_none()

        # Most recently completed task, used for the "last run" timestamp.
        last_run_query = (
            select(CollectionTask)
            .where(CollectionTask.datasource_id == info["id"])
            .where(CollectionTask.completed_at.isnot(None))
            .order_by(CollectionTask.completed_at.desc())
            .limit(1)
        )
        last_run_result = await db.execute(last_run_query)
        last_task = last_run_result.scalar_one_or_none()

        # Stored record count, keyed by the collector key.
        data_count_query = select(func.count(CollectedData.id)).where(CollectedData.source == name)
        data_count_result = await db.execute(data_count_query)
        data_count = data_count_result.scalar() or 0

        # Only report a last run when data actually exists — a completed
        # task with zero stored records is treated as "never ran".
        last_run = None
        if last_task and last_task.completed_at and data_count > 0:
            last_run = last_task.completed_at.strftime("%Y-%m-%d %H:%M")

        collector_list.append(
            {
                "id": info["id"],
                "name": info["name"],
                "module": info["module"],
                "priority": info["priority"],
                "frequency": f"{info['frequency_hours']}h",
                "is_active": is_active_status,
                "collector_class": name,
                "last_run": last_run,
                "is_running": running_task is not None,
                "task_id": running_task.id if running_task else None,
                "progress": running_task.progress if running_task else None,
                "records_processed": running_task.records_processed if running_task else None,
                "total_records": running_task.total_records if running_task else None,
            }
        )

    return {
        "total": len(collector_list),
        "data": collector_list,
    }
|
|
|
|
|
|
@router.get("/{source_id}")
async def get_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return static metadata plus the current enabled flag for one data source."""
    collector_name = get_collector_name(source_id)
    if collector_name is None:
        raise HTTPException(status_code=404, detail="Data source not found")

    info = COLLECTOR_INFO[collector_name]
    payload = {
        "id": info["id"],
        "name": info["name"],
        "module": info["module"],
        "priority": info["priority"],
        "frequency": f"{info['frequency_hours']}h",
        "collector_class": collector_name,
        # Live flag from the in-process registry, not the static table.
        "is_active": collector_registry.is_active(collector_name),
    }
    return payload
|
|
|
|
|
|
@router.post("/{source_id}/enable")
async def enable_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
):
    """Mark the collector behind *source_id* as active in the registry."""
    name = get_collector_name(source_id)
    if name is None:
        raise HTTPException(status_code=404, detail="Data source not found")

    collector_registry.set_active(name, True)
    return {"status": "enabled", "source_id": source_id}
|
|
|
|
|
|
@router.post("/{source_id}/disable")
async def disable_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
):
    """Mark the collector behind *source_id* as inactive in the registry."""
    name = get_collector_name(source_id)
    if name is None:
        raise HTTPException(status_code=404, detail="Data source not found")

    collector_registry.set_active(name, False)
    return {"status": "disabled", "source_id": source_id}
|
|
|
|
|
|
@router.get("/{source_id}/stats")
async def get_datasource_stats(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the stored record count for one data source.

    Rows are normally keyed by the collector key; presumably older rows were
    keyed by the display name instead (TODO confirm), so when the collector
    key matches nothing we retry the count using the display name.
    """
    collector_name = get_collector_name(source_id)
    if collector_name is None:
        raise HTTPException(status_code=404, detail="Data source not found")

    info = COLLECTOR_INFO[collector_name]

    async def _count_by_source(value: str) -> int:
        # One COUNT(*) over CollectedData filtered by the ``source`` column.
        result = await db.execute(
            select(func.count(CollectedData.id)).where(CollectedData.source == value)
        )
        return result.scalar() or 0

    total = await _count_by_source(collector_name)
    if total == 0:
        # Fallback: count rows keyed by the human-readable display name.
        total = await _count_by_source(info["name"])

    return {
        "source_id": source_id,
        "collector_name": collector_name,
        "name": info["name"],
        "total_records": total,
    }
|
|
|
|
|
|
@router.post("/{source_id}/trigger")
async def trigger_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
):
    """Kick off an immediate collection run for the given data source.

    Raises 404 for unknown ids, 400 when the collector is disabled, and
    500 when the scheduler refuses the trigger.
    """
    collector_name = get_collector_name(source_id)
    if collector_name is None:
        raise HTTPException(status_code=404, detail="Data source not found")

    # NOTE(review): imported here rather than at module top — presumably to
    # avoid a circular import with the scheduler; confirm before moving.
    from app.services.scheduler import run_collector_now

    if not collector_registry.is_active(collector_name):
        raise HTTPException(status_code=400, detail="Data source is disabled")

    if not run_collector_now(collector_name):
        raise HTTPException(
            status_code=500,
            detail=f"Failed to trigger collector '{collector_name}'",
        )

    return {
        "status": "triggered",
        "source_id": source_id,
        "collector_name": collector_name,
        "message": f"Collector '{collector_name}' has been triggered",
    }
|
|
|
|
|
|
@router.delete("/{source_id}/data")
async def clear_datasource_data(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete all collected records belonging to one data source.

    Rows may be keyed either by the collector key or (presumably legacy —
    TODO confirm) by the display name; whichever key actually has rows is
    the one that gets deleted.
    """
    collector_name = get_collector_name(source_id)
    if collector_name is None:
        raise HTTPException(status_code=404, detail="Data source not found")

    info = COLLECTOR_INFO[collector_name]

    async def _count_rows(value: str) -> int:
        # COUNT(*) over CollectedData filtered by the ``source`` column.
        result = await db.execute(
            select(func.count(CollectedData.id)).where(CollectedData.source == value)
        )
        return result.scalar() or 0

    # Prefer rows keyed by the collector key; fall back to the display name.
    delete_source = collector_name
    count = await _count_rows(collector_name)
    if count == 0:
        delete_source = info["name"]
        count = await _count_rows(delete_source)

    if count == 0:
        return {
            "status": "success",
            "message": "No data to clear",
            "deleted_count": 0,
        }

    await db.execute(
        CollectedData.__table__.delete().where(CollectedData.source == delete_source)
    )
    await db.commit()

    return {
        "status": "success",
        "message": f"Cleared {count} records for data source '{info['name']}'",
        "deleted_count": count,
    }
|
|
|
|
|
|
@router.get("/{source_id}/task-status")
async def get_task_status(
    source_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Report whether a collection task is currently running for this source."""
    collector_name = get_collector_name(source_id)
    if collector_name is None:
        raise HTTPException(status_code=404, detail="Data source not found")

    datasource_id = COLLECTOR_INFO[collector_name]["id"]

    # Most recently started task still in the "running" state, if any.
    stmt = (
        select(CollectionTask)
        .where(
            CollectionTask.datasource_id == datasource_id,
            CollectionTask.status == "running",
        )
        .order_by(CollectionTask.started_at.desc())
        .limit(1)
    )
    task = (await db.execute(stmt)).scalar_one_or_none()

    if task is None:
        return {"is_running": False, "task_id": None, "progress": None}

    return {
        "is_running": True,
        "task_id": task.id,
        "progress": task.progress,
        "records_processed": task.records_processed,
        "total_records": task.total_records,
        "status": task.status,
    }
|