"""Data-source management API.

Endpoints for the collector registry: list collectors with live run
status and record counts, enable/disable collectors, trigger a run,
read per-source statistics, clear collected data, and poll task status.

Path parameter ``source_id`` accepts either the numeric datasource id
(e.g. ``"3"``) or the collector key itself (e.g. ``"huggingface_models"``).
"""

from typing import List, Optional
from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import get_db
from app.models.user import User
from app.models.datasource import DataSource
from app.models.task import CollectionTask
from app.models.collected_data import CollectedData
from app.core.security import get_current_user
from app.services.collectors.registry import collector_registry

router = APIRouter()

# Static catalogue of known collectors.  ``id`` must match the
# ``CollectionTask.datasource_id`` values written by the scheduler.
# NOTE: ids 12-14 are intentionally absent from this table — presumably
# retired collectors; confirm before reusing those ids.
COLLECTOR_INFO = {
    "top500": {
        "id": 1,
        "name": "TOP500 Supercomputers",
        "module": "L1",
        "priority": "P0",
        "frequency_hours": 4,
    },
    "epoch_ai_gpu": {
        "id": 2,
        "name": "Epoch AI GPU Clusters",
        "module": "L1",
        "priority": "P0",
        "frequency_hours": 6,
    },
    "huggingface_models": {
        "id": 3,
        "name": "HuggingFace Models",
        "module": "L2",
        "priority": "P1",
        "frequency_hours": 12,
    },
    "huggingface_datasets": {
        "id": 4,
        "name": "HuggingFace Datasets",
        "module": "L2",
        "priority": "P1",
        "frequency_hours": 12,
    },
    "huggingface_spaces": {
        "id": 5,
        "name": "HuggingFace Spaces",
        "module": "L2",
        "priority": "P2",
        "frequency_hours": 24,
    },
    "peeringdb_ixp": {
        "id": 6,
        "name": "PeeringDB IXP",
        "module": "L2",
        "priority": "P1",
        "frequency_hours": 24,
    },
    "peeringdb_network": {
        "id": 7,
        "name": "PeeringDB Networks",
        "module": "L2",
        "priority": "P2",
        "frequency_hours": 48,
    },
    "peeringdb_facility": {
        "id": 8,
        "name": "PeeringDB Facilities",
        "module": "L2",
        "priority": "P2",
        "frequency_hours": 48,
    },
    "telegeography_cables": {
        "id": 9,
        "name": "Submarine Cables",
        "module": "L2",
        "priority": "P1",
        "frequency_hours": 168,
    },
    "telegeography_landing": {
        "id": 10,
        "name": "Cable Landing Points",
        "module": "L2",
        "priority": "P2",
        "frequency_hours": 168,
    },
    "telegeography_systems": {
        "id": 11,
        "name": "Cable Systems",
        "module": "L2",
        "priority": "P2",
        "frequency_hours": 168,
    },
    "arcgis_cables": {
        "id": 15,
        "name": "ArcGIS Submarine Cables",
        "module": "L2",
        "priority": "P1",
        "frequency_hours": 168,
    },
    "fao_landing_points": {
        "id": 16,
        "name": "FAO Landing Points",
        "module": "L2",
        "priority": "P1",
        "frequency_hours": 168,
    },
}

# Bidirectional lookup between numeric datasource id and collector key.
ID_TO_COLLECTOR = {info["id"]: name for name, info in COLLECTOR_INFO.items()}
COLLECTOR_TO_ID = {name: info["id"] for name, info in COLLECTOR_INFO.items()}


def get_collector_name(source_id: str) -> Optional[str]:
    """Resolve a ``source_id`` path parameter to a collector key.

    Accepts either the numeric id as a string (``"3"``) or the collector
    key itself (``"huggingface_models"``).

    Returns:
        The collector key, or ``None`` when the id is unknown.
    """
    try:
        numeric_id = int(source_id)
    except ValueError:
        pass
    else:
        if numeric_id in ID_TO_COLLECTOR:
            return ID_TO_COLLECTOR[numeric_id]
    if source_id in COLLECTOR_INFO:
        return source_id
    return None


async def _latest_running_task(
    db: AsyncSession, datasource_id: int
) -> Optional[CollectionTask]:
    """Return the most recently started task with status 'running', if any."""
    query = (
        select(CollectionTask)
        .where(CollectionTask.datasource_id == datasource_id)
        .where(CollectionTask.status == "running")
        .order_by(CollectionTask.started_at.desc())
        .limit(1)
    )
    result = await db.execute(query)
    return result.scalar_one_or_none()


async def _latest_completed_task(
    db: AsyncSession, datasource_id: int
) -> Optional[CollectionTask]:
    """Return the most recently completed task, if any."""
    query = (
        select(CollectionTask)
        .where(CollectionTask.datasource_id == datasource_id)
        .where(CollectionTask.completed_at.isnot(None))
        .order_by(CollectionTask.completed_at.desc())
        .limit(1)
    )
    result = await db.execute(query)
    return result.scalar_one_or_none()


async def _count_records(db: AsyncSession, source: str) -> int:
    """Count CollectedData rows whose ``source`` column equals *source*."""
    query = select(func.count(CollectedData.id)).where(CollectedData.source == source)
    result = await db.execute(query)
    return result.scalar() or 0


@router.get("")
async def list_datasources(
    module: Optional[str] = None,
    is_active: Optional[bool] = None,
    priority: Optional[str] = None,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List all known collectors with live run status and record counts.

    Optional filters: ``module`` (e.g. "L1"/"L2"), ``priority``
    (e.g. "P0"), and ``is_active`` (registry enable flag).
    """
    # FIX: the original version executed a select(DataSource) query whose
    # result was never used (dead DB roundtrip), and the ``is_active``
    # filter was built into that dead query and therefore had no effect.
    # Filters are now applied up front so filtered-out collectors cost
    # no per-collector queries.
    collector_list = []
    for name, info in COLLECTOR_INFO.items():
        if module and info["module"] != module:
            continue
        if priority and info["priority"] != priority:
            continue
        is_active_status = collector_registry.is_active(name)
        if is_active is not None and is_active_status != is_active:
            continue

        running_task = await _latest_running_task(db, info["id"])
        last_task = await _latest_completed_task(db, info["id"])
        data_count = await _count_records(db, name)

        # Only report a last run when data actually exists, so a source
        # whose data was cleared shows up as never-run.
        last_run = None
        if last_task and last_task.completed_at and data_count > 0:
            last_run = last_task.completed_at.strftime("%Y-%m-%d %H:%M")

        collector_list.append(
            {
                "id": info["id"],
                "name": info["name"],
                "module": info["module"],
                "priority": info["priority"],
                "frequency": f"{info['frequency_hours']}h",
                "is_active": is_active_status,
                "collector_class": name,
                "last_run": last_run,
                "is_running": running_task is not None,
                "task_id": running_task.id if running_task else None,
                "progress": running_task.progress if running_task else None,
                "records_processed": running_task.records_processed if running_task else None,
                "total_records": running_task.total_records if running_task else None,
            }
        )

    return {
        "total": len(collector_list),
        "data": collector_list,
    }


@router.get("/{source_id}")
async def get_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return static metadata and the enable flag for one data source."""
    collector_name = get_collector_name(source_id)
    if not collector_name:
        raise HTTPException(status_code=404, detail="Data source not found")
    info = COLLECTOR_INFO[collector_name]
    return {
        "id": info["id"],
        "name": info["name"],
        "module": info["module"],
        "priority": info["priority"],
        "frequency": f"{info['frequency_hours']}h",
        "collector_class": collector_name,
        "is_active": collector_registry.is_active(collector_name),
    }


@router.post("/{source_id}/enable")
async def enable_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
):
    """Mark a collector as active in the registry."""
    collector_name = get_collector_name(source_id)
    if not collector_name:
        raise HTTPException(status_code=404, detail="Data source not found")
    collector_registry.set_active(collector_name, True)
    return {"status": "enabled", "source_id": source_id}


@router.post("/{source_id}/disable")
async def disable_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
):
    """Mark a collector as inactive in the registry."""
    collector_name = get_collector_name(source_id)
    if not collector_name:
        raise HTTPException(status_code=404, detail="Data source not found")
    collector_registry.set_active(collector_name, False)
    return {"status": "disabled", "source_id": source_id}


@router.get("/{source_id}/stats")
async def get_datasource_stats(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the total number of collected records for one data source."""
    collector_name = get_collector_name(source_id)
    if not collector_name:
        raise HTTPException(status_code=404, detail="Data source not found")
    info = COLLECTOR_INFO[collector_name]

    # Rows are normally keyed by the collector key; fall back to the
    # display name, which presumably older rows used — verify once the
    # legacy data is migrated.
    total = await _count_records(db, collector_name)
    if total == 0:
        total = await _count_records(db, info["name"])

    return {
        "source_id": source_id,
        "collector_name": collector_name,
        "name": info["name"],
        "total_records": total,
    }


@router.post("/{source_id}/trigger")
async def trigger_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
):
    """Trigger an immediate collection run for one data source.

    Raises 400 when the collector is disabled and 500 when the
    scheduler refuses the run.
    """
    collector_name = get_collector_name(source_id)
    if not collector_name:
        raise HTTPException(status_code=404, detail="Data source not found")

    # Imported lazily to avoid a circular import with the scheduler module.
    from app.services.scheduler import run_collector_now

    if not collector_registry.is_active(collector_name):
        raise HTTPException(status_code=400, detail="Data source is disabled")

    success = run_collector_now(collector_name)
    if not success:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to trigger collector '{collector_name}'",
        )
    return {
        "status": "triggered",
        "source_id": source_id,
        "collector_name": collector_name,
        "message": f"Collector '{collector_name}' has been triggered",
    }


@router.delete("/{source_id}/data")
async def clear_datasource_data(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete all collected records for one data source."""
    collector_name = get_collector_name(source_id)
    if not collector_name:
        raise HTTPException(status_code=404, detail="Data source not found")
    info = COLLECTOR_INFO[collector_name]

    # Same dual-key scheme as the stats endpoint: prefer rows keyed by
    # the collector key, fall back to the display name.
    delete_source = collector_name
    count = await _count_records(db, collector_name)
    if count == 0:
        delete_source = info["name"]
        count = await _count_records(db, info["name"])

    if count == 0:
        return {
            "status": "success",
            "message": "No data to clear",
            "deleted_count": 0,
        }

    delete_query = CollectedData.__table__.delete().where(
        CollectedData.source == delete_source
    )
    await db.execute(delete_query)
    await db.commit()
    return {
        "status": "success",
        "message": f"Cleared {count} records for data source '{info['name']}'",
        "deleted_count": count,
    }


@router.get("/{source_id}/task-status")
async def get_task_status(
    source_id: str,
    db: AsyncSession = Depends(get_db),
):
    """Poll the currently running collection task for one data source.

    NOTE(review): unlike every other endpoint in this router, this one
    has no ``get_current_user`` dependency and is reachable without
    authentication — confirm whether that is intentional (e.g. for a
    polling UI) before adding the guard.
    """
    collector_name = get_collector_name(source_id)
    if not collector_name:
        raise HTTPException(status_code=404, detail="Data source not found")
    info = COLLECTOR_INFO[collector_name]

    running_task = await _latest_running_task(db, info["id"])
    if not running_task:
        return {"is_running": False, "task_id": None, "progress": None}
    return {
        "is_running": True,
        "task_id": running_task.id,
        "progress": running_task.progress,
        "records_processed": running_task.records_processed,
        "total_records": running_task.total_records,
        "status": running_task.status,
    }