"""HTTP routes for listing, inspecting, toggling, triggering and clearing data sources."""

import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.time import to_iso8601_utc
from app.core.security import get_current_user
from app.core.data_sources import get_data_sources_config
from app.db.session import get_db
from app.models.collected_data import CollectedData
from app.models.datasource import DataSource
from app.models.task import CollectionTask
from app.models.user import User
from app.services.scheduler import (
    get_latest_task_id_for_datasource,
    run_collector_now,
    sync_datasource_job,
)

router = APIRouter()

# After triggering a collector, the handlers poll for the id of the task it
# created: at most this many polls, this far apart (about 2 seconds in total).
_TASK_ID_POLL_ATTEMPTS = 20
_TASK_ID_POLL_INTERVAL_SECONDS = 0.1


def format_frequency_label(minutes: int) -> str:
    """Render a frequency in minutes as a compact label.

    Whole days are rendered as ``"<n>d"``, whole hours as ``"<n>h"`` and
    anything else as ``"<n>m"``. Note that ``0`` is a multiple of 1440 and is
    therefore rendered as ``"0d"``.
    """
    if minutes % 1440 == 0:
        return f"{minutes // 1440}d"
    if minutes % 60 == 0:
        return f"{minutes // 60}h"
    return f"{minutes}m"


def is_due_for_collection(datasource: DataSource, now: datetime) -> bool:
    """Return True when ``datasource`` has never run or its interval has elapsed.

    Args:
        datasource: Record providing ``last_run_at`` and ``frequency_minutes``.
        now: The reference instant to compare against.
    """
    last_run_at = datasource.last_run_at
    if last_run_at is None:
        return True
    if last_run_at.tzinfo is None and now.tzinfo is not None:
        # Comparing a naive datetime with an aware one raises TypeError.
        # NOTE(review): assumes naive timestamps are stored in UTC - confirm
        # against the column definition / whoever writes last_run_at.
        last_run_at = last_run_at.replace(tzinfo=timezone.utc)
    next_run_at = last_run_at + timedelta(minutes=datasource.frequency_minutes)
    return next_run_at <= now


async def get_datasource_record(db: AsyncSession, source_id: str) -> Optional[DataSource]:
    """Resolve ``source_id`` to a DataSource, or None when nothing matches.

    A value that parses as an integer is first tried as a primary key. If that
    finds nothing (or the value is not an integer), the value is matched
    against ``DataSource.source`` or ``DataSource.collector_class``.

    ``scalar_one_or_none`` raises if more than one row matches the fallback
    query.
    """
    # Only the conversion is guarded, so a ValueError coming out of the
    # database layer is not silently swallowed.
    try:
        numeric_id: Optional[int] = int(source_id)
    except ValueError:
        numeric_id = None

    if numeric_id is not None:
        datasource = await db.get(DataSource, numeric_id)
        if datasource is not None:
            return datasource

    result = await db.execute(
        select(DataSource).where(
            (DataSource.source == source_id) | (DataSource.collector_class == source_id)
        )
    )
    return result.scalar_one_or_none()


async def get_last_completed_task(db: AsyncSession, datasource_id: int) -> Optional[CollectionTask]:
    """Return the most recently completed task of a data source, if any.

    Only tasks with a ``completed_at`` value and a status of ``success``,
    ``failed`` or ``cancelled`` are considered.
    """
    result = await db.execute(
        select(CollectionTask)
        .where(CollectionTask.datasource_id == datasource_id)
        .where(CollectionTask.completed_at.isnot(None))
        .where(CollectionTask.status.in_(("success", "failed", "cancelled")))
        .order_by(CollectionTask.completed_at.desc())
        .limit(1)
    )
    return result.scalar_one_or_none()


async def get_running_task(db: AsyncSession, datasource_id: int) -> Optional[CollectionTask]:
    """Return the most recently started task with status ``running``, if any."""
    result = await db.execute(
        select(CollectionTask)
        .where(CollectionTask.datasource_id == datasource_id)
        .where(CollectionTask.status == "running")
        .order_by(CollectionTask.started_at.desc())
        .limit(1)
    )
    return result.scalar_one_or_none()


async def _get_datasource_or_404(db: AsyncSession, source_id: str) -> DataSource:
    """Resolve ``source_id`` via get_datasource_record or raise a 404."""
    datasource = await get_datasource_record(db, source_id)
    if datasource is None:
        raise HTTPException(status_code=404, detail="Data source not found")
    return datasource


async def _count_collected_records(db: AsyncSession, source: str) -> int:
    """Count the CollectedData rows whose ``source`` column equals ``source``."""
    result = await db.execute(
        select(func.count(CollectedData.id)).where(CollectedData.source == source)
    )
    return result.scalar() or 0


async def _wait_for_new_task_id(
    datasource_id: int, previous_task_id: Optional[int]
) -> Optional[int]:
    """Poll until the latest task id of a data source differs from the old one.

    Returns the new task id, or None when no new id showed up within the
    polling budget.
    """
    for _ in range(_TASK_ID_POLL_ATTEMPTS):
        await asyncio.sleep(_TASK_ID_POLL_INTERVAL_SECONDS)
        latest_task_id = await get_latest_task_id_for_datasource(datasource_id)
        if latest_task_id is not None and latest_task_id != previous_task_id:
            return latest_task_id
    return None


@router.get("")
async def list_datasources(
    module: Optional[str] = None,
    is_active: Optional[bool] = None,
    priority: Optional[str] = None,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List data sources with their latest run information and live task state.

    Args:
        module: Optional filter on ``DataSource.module``.
        is_active: Optional filter on ``DataSource.is_active``.
        priority: Optional filter on ``DataSource.priority``.
        current_user: Declared only for the ``get_current_user`` dependency.
        db: Database session.

    Returns:
        ``{"total": <count>, "data": [<one dict per data source>]}``.
    """
    query = select(DataSource).order_by(DataSource.module, DataSource.id)
    if module:
        query = query.where(DataSource.module == module)
    if is_active is not None:
        query = query.where(DataSource.is_active == is_active)
    if priority:
        query = query.where(DataSource.priority == priority)

    result = await db.execute(query)
    datasources = result.scalars().all()

    config = get_data_sources_config()
    collector_list = []
    # NOTE: several awaited lookups are issued per data source.
    for datasource in datasources:
        running_task = await get_running_task(db, datasource.id)
        last_task = await get_last_completed_task(db, datasource.id)
        endpoint = await config.get_url(datasource.source, db)
        data_count = await _count_collected_records(db, datasource.source)

        # Values stored on the data source win; the last completed task is
        # only used as a fallback.
        last_run_at = datasource.last_run_at or (last_task.completed_at if last_task else None)
        last_run_iso = to_iso8601_utc(last_run_at)
        last_status = datasource.last_status or (last_task.status if last_task else None)

        collector_list.append(
            {
                "id": datasource.id,
                "name": datasource.name,
                "module": datasource.module,
                "priority": datasource.priority,
                "frequency": format_frequency_label(datasource.frequency_minutes),
                "frequency_minutes": datasource.frequency_minutes,
                "is_active": datasource.is_active,
                "collector_class": datasource.collector_class,
                "endpoint": endpoint,
                # Both keys carry the same value.
                "last_run": last_run_iso,
                "last_run_at": last_run_iso,
                "last_status": last_status,
                "last_records_processed": last_task.records_processed if last_task else None,
                "data_count": data_count,
                "is_running": running_task is not None,
                "task_id": running_task.id if running_task else None,
                "progress": running_task.progress if running_task else None,
                "phase": running_task.phase if running_task else None,
                "records_processed": running_task.records_processed if running_task else None,
                "total_records": running_task.total_records if running_task else None,
            }
        )

    return {"total": len(collector_list), "data": collector_list}


@router.post("/trigger-all")
async def trigger_all_datasources(
    force: bool = Query(False),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Trigger collection for every active data source.

    A source is skipped when it already has a running task, or (unless
    ``force`` is set) when its frequency window has not elapsed yet. After
    triggering, the handler polls briefly for the id of each new task; a
    ``task_id`` that could not be determined in time stays ``None``.

    Args:
        force: Trigger sources even when they are not due yet.
        current_user: Declared only for the ``get_current_user`` dependency.
        db: Database session.

    Returns:
        A dict with ``status``, ``message`` and the ``triggered``, ``skipped``
        and ``failed`` lists (plus ``force`` when any active source exists).
    """
    result = await db.execute(
        select(DataSource)
        .where(DataSource.is_active == True)  # noqa: E712 - SQL expression, not a Python comparison
        .order_by(DataSource.module, DataSource.id)
    )
    datasources = result.scalars().all()

    if not datasources:
        return {
            "status": "noop",
            "message": "No active data sources to trigger",
            "triggered": [],
            "skipped": [],
            "failed": [],
        }

    previous_task_ids: dict[int, Optional[int]] = {}
    triggered_sources: list[dict] = []
    skipped_sources: list[dict] = []
    failed_sources: list[dict] = []
    now = datetime.now(timezone.utc)

    for datasource in datasources:
        running_task = await get_running_task(db, datasource.id)
        if running_task is not None:
            skipped_sources.append(
                {
                    "id": datasource.id,
                    "source": datasource.source,
                    "name": datasource.name,
                    "reason": "already_running",
                    "task_id": running_task.id,
                }
            )
            continue

        if not force and not is_due_for_collection(datasource, now):
            # last_run_at cannot be None here: a source that never ran is
            # always reported as due.
            skipped_sources.append(
                {
                    "id": datasource.id,
                    "source": datasource.source,
                    "name": datasource.name,
                    "reason": "within_frequency_window",
                    "last_run_at": to_iso8601_utc(datasource.last_run_at),
                    "next_run_at": to_iso8601_utc(
                        datasource.last_run_at + timedelta(minutes=datasource.frequency_minutes)
                    ),
                }
            )
            continue

        # Remember the latest task id before triggering so the task created by
        # this trigger can be told apart from an older one.
        previous_task_ids[datasource.id] = await get_latest_task_id_for_datasource(datasource.id)
        success = run_collector_now(datasource.source)
        if not success:
            failed_sources.append(
                {
                    "id": datasource.id,
                    "source": datasource.source,
                    "name": datasource.name,
                    "reason": "trigger_failed",
                }
            )
            continue

        triggered_sources.append(
            {
                "id": datasource.id,
                "source": datasource.source,
                "name": datasource.name,
                "task_id": None,
            }
        )

    # Poll for the new task ids. The pending check comes before the sleep so
    # that no time is wasted when nothing was triggered or everything has
    # already been resolved.
    pending = list(triggered_sources)
    for _ in range(_TASK_ID_POLL_ATTEMPTS):
        if not pending:
            break
        await asyncio.sleep(_TASK_ID_POLL_INTERVAL_SECONDS)
        for item in pending:
            task_id = await get_latest_task_id_for_datasource(item["id"])
            if task_id is not None and task_id != previous_task_ids.get(item["id"]):
                item["task_id"] = task_id
        pending = [item for item in pending if item["task_id"] is None]

    return {
        "status": "triggered" if triggered_sources else "partial",
        "message": f"Triggered {len(triggered_sources)} data sources",
        "force": force,
        "triggered": triggered_sources,
        "skipped": skipped_sources,
        "failed": failed_sources,
    }


@router.get("/{source_id}")
async def get_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the configuration of a single data source.

    Raises:
        HTTPException: 404 when ``source_id`` matches no data source.
    """
    datasource = await _get_datasource_or_404(db, source_id)

    config = get_data_sources_config()
    endpoint = await config.get_url(datasource.source, db)

    return {
        "id": datasource.id,
        "name": datasource.name,
        "module": datasource.module,
        "priority": datasource.priority,
        "frequency": format_frequency_label(datasource.frequency_minutes),
        "frequency_minutes": datasource.frequency_minutes,
        "collector_class": datasource.collector_class,
        "source": datasource.source,
        "endpoint": endpoint,
        "is_active": datasource.is_active,
    }


@router.post("/{source_id}/enable")
async def enable_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Mark a data source as active and call ``sync_datasource_job`` for it.

    Raises:
        HTTPException: 404 when ``source_id`` matches no data source.
    """
    datasource = await _get_datasource_or_404(db, source_id)

    # Read the id before committing: with SQLAlchemy's default
    # expire_on_commit=True the instance is expired by commit(), and reading
    # an expired attribute needs implicit I/O that an AsyncSession cannot do.
    datasource_id = datasource.id
    datasource.is_active = True
    await db.commit()
    await sync_datasource_job(datasource_id)
    return {"status": "enabled", "source_id": datasource_id}


@router.post("/{source_id}/disable")
async def disable_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Mark a data source as inactive and call ``sync_datasource_job`` for it.

    Raises:
        HTTPException: 404 when ``source_id`` matches no data source.
    """
    datasource = await _get_datasource_or_404(db, source_id)

    # Read the id before committing: with SQLAlchemy's default
    # expire_on_commit=True the instance is expired by commit(), and reading
    # an expired attribute needs implicit I/O that an AsyncSession cannot do.
    datasource_id = datasource.id
    datasource.is_active = False
    await db.commit()
    await sync_datasource_job(datasource_id)
    return {"status": "disabled", "source_id": datasource_id}


@router.get("/{source_id}/stats")
async def get_datasource_stats(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the number of collected records stored for a data source.

    Raises:
        HTTPException: 404 when ``source_id`` matches no data source.
    """
    datasource = await _get_datasource_or_404(db, source_id)
    total = await _count_collected_records(db, datasource.source)

    return {
        "source_id": datasource.id,
        "collector_name": datasource.collector_class,
        "name": datasource.name,
        "total_records": total,
    }


@router.post("/{source_id}/trigger")
async def trigger_datasource(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Trigger collection for one data source and report the new task id.

    ``task_id`` in the response is ``None`` when no new task id appeared
    within the polling budget.

    Raises:
        HTTPException: 404 when the data source does not exist, 400 when it is
            disabled, 500 when ``run_collector_now`` reports failure.
    """
    datasource = await _get_datasource_or_404(db, source_id)

    if not datasource.is_active:
        raise HTTPException(status_code=400, detail="Data source is disabled")

    # Snapshot the latest task id first so a pre-existing task is not mistaken
    # for the one created by this trigger.
    previous_task_id = await get_latest_task_id_for_datasource(datasource.id)
    success = run_collector_now(datasource.source)
    if not success:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to trigger collector '{datasource.source}'",
        )

    task_id = await _wait_for_new_task_id(datasource.id, previous_task_id)

    return {
        "status": "triggered",
        "source_id": datasource.id,
        "task_id": task_id,
        "collector_name": datasource.source,
        "message": f"Collector '{datasource.source}' has been triggered",
    }


@router.delete("/{source_id}/data")
async def clear_datasource_data(
    source_id: str,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete every collected record that belongs to a data source.

    ``deleted_count`` is the row count measured just before the delete
    statement runs.

    Raises:
        HTTPException: 404 when ``source_id`` matches no data source.
    """
    datasource = await _get_datasource_or_404(db, source_id)

    # Read everything needed from the instance before committing; see the
    # expire_on_commit note in enable_datasource for the reason.
    datasource_source = datasource.source
    datasource_name = datasource.name

    count = await _count_collected_records(db, datasource_source)
    if count == 0:
        return {"status": "success", "message": "No data to clear", "deleted_count": 0}

    delete_query = CollectedData.__table__.delete().where(
        CollectedData.source == datasource_source
    )
    await db.execute(delete_query)
    await db.commit()

    return {
        "status": "success",
        "message": f"Cleared {count} records for data source '{datasource_name}'",
        "deleted_count": count,
    }


# NOTE(review): this is the only route in the module without the
# get_current_user dependency - confirm that unauthenticated access to task
# status is intentional.
@router.get("/{source_id}/task-status")
async def get_task_status(
    source_id: str,
    task_id: Optional[int] = None,
    db: AsyncSession = Depends(get_db),
):
    """Report the state of one collection task of a data source.

    With ``task_id`` the specific task is returned, provided it belongs to the
    data source. Without it the currently running task is reported, or an
    ``idle`` placeholder when nothing is running.

    Raises:
        HTTPException: 404 when the data source does not exist, or when
            ``task_id`` is given but unknown or owned by another data source.
    """
    datasource = await _get_datasource_or_404(db, source_id)

    if task_id is not None:
        task = await db.get(CollectionTask, task_id)
        if not task or task.datasource_id != datasource.id:
            raise HTTPException(status_code=404, detail="Task not found")
    else:
        task = await get_running_task(db, datasource.id)

    if not task:
        return {"is_running": False, "task_id": None, "progress": None, "phase": None, "status": "idle"}

    return {
        "is_running": task.status == "running",
        "task_id": task.id,
        "progress": task.progress,
        "phase": task.phase,
        "records_processed": task.records_processed,
        "total_records": task.total_records,
        "status": task.status,
    }