"""Collection task endpoints: list tasks, fetch one task, trigger a collector run."""

from datetime import datetime
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import get_current_user
from app.db.session import get_db
from app.models.user import User
from app.services.collectors.registry import collector_registry

router = APIRouter()

# SELECT/FROM shared by the list and detail queries.
# _serialize_task depends on this exact column order.
_TASK_SELECT = """
    SELECT
        ct.id, ct.datasource_id, ds.name as datasource_name, ct.status,
        ct.started_at, ct.completed_at, ct.records_processed, ct.error_message
    FROM collection_tasks ct
    JOIN data_sources ds ON ct.datasource_id = ds.id
"""


def _serialize_task(row) -> dict:
    """Convert one row selected via ``_TASK_SELECT`` into a response dict.

    Timestamps are rendered with ``isoformat()``; NULL timestamps become None.
    """
    (
        task_id,
        datasource_id,
        datasource_name,
        task_status,
        started_at,
        completed_at,
        records_processed,
        error_message,
    ) = row
    return {
        "id": task_id,
        "datasource_id": datasource_id,
        "datasource_name": datasource_name,
        "status": task_status,
        "started_at": started_at.isoformat() if started_at else None,
        "completed_at": completed_at.isoformat() if completed_at else None,
        "records_processed": records_processed,
        "error_message": error_message,
    }


@router.get("")
async def list_tasks(
    datasource_id: Optional[int] = None,
    status: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of collection tasks, newest first.

    Args:
        datasource_id: When truthy, only tasks of this data source are listed
            (None and 0 both mean "no filter").
        status: When non-empty, only tasks with this status are listed.
            NOTE: this parameter shadows the imported ``fastapi.status`` module
            inside this function, so the module must not be used here.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.
        current_user: Resolved by the ``get_current_user`` dependency; not
            otherwise used in this handler.
        db: Async database session.

    Returns:
        A dict with ``total`` (number of tasks matching the filters),
        ``page``, ``page_size`` and ``data`` (list of serialized tasks).
    """
    # Negative LIMIT/OFFSET values are rejected by the database; clamp instead
    # of letting the request fail with a server error.
    page = max(page, 1)
    page_size = max(page_size, 0)
    offset = (page - 1) * page_size

    # Build the filter once so the page query and the count query can never
    # drift apart (the count query previously got a second, invalid WHERE).
    conditions = []
    filter_params = {}
    if datasource_id:
        conditions.append("ct.datasource_id = :datasource_id")
        filter_params["datasource_id"] = datasource_id
    if status:
        conditions.append("ct.status = :status")
        filter_params["status"] = status
    where_clause = f" WHERE {' AND '.join(conditions)}" if conditions else ""

    page_sql = (
        f"{_TASK_SELECT}{where_clause}"
        " ORDER BY ct.created_at DESC LIMIT :limit OFFSET :offset"
    )
    count_sql = f"SELECT COUNT(*) FROM collection_tasks ct{where_clause}"

    page_result = await db.execute(
        text(page_sql),
        {**filter_params, "limit": page_size, "offset": offset},
    )
    rows = page_result.fetchall()

    count_result = await db.execute(text(count_sql), filter_params)
    total = count_result.scalar()

    return {
        "total": total or 0,
        "page": page,
        "page_size": page_size,
        "data": [_serialize_task(row) for row in rows],
    }


@router.get("/{task_id}")
async def get_task(
    task_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return a single collection task by id.

    Args:
        task_id: Primary key of the task to fetch.
        current_user: Resolved by the ``get_current_user`` dependency; not
            otherwise used in this handler.
        db: Async database session.

    Returns:
        The serialized task (same shape as the items of ``list_tasks``).

    Raises:
        HTTPException: 404 when no task with ``task_id`` is found.
    """
    result = await db.execute(
        text(f"{_TASK_SELECT} WHERE ct.id = :id"),
        {"id": task_id},
    )
    task = result.fetchone()
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )
    return _serialize_task(task)


@router.post("/datasources/{source_id}/trigger")
async def trigger_collection(
    source_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Run the collector of a data source and record the run as a task.

    The collector is looked up in ``collector_registry`` under the data
    source's ``collector_class`` value, lower-cased and with the substring
    "collector" removed.

    Args:
        source_id: Primary key of the data source to collect from.
        current_user: Resolved by the ``get_current_user`` dependency; not
            otherwise used in this handler.
        db: Async database session; also passed to the collector.

    Returns:
        A dict with a fixed ``message`` and the collector's ``result``.

    Raises:
        HTTPException: 404 when the data source does not exist; 400 when it
            has no collector class or the collector is not registered.
    """
    ds_result = await db.execute(
        text("SELECT id, name, collector_class FROM data_sources WHERE id = :id"),
        {"id": source_id},
    )
    datasource = ds_result.fetchone()
    if not datasource:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Data source not found",
        )

    collector_class_name = datasource[2]
    if not collector_class_name:
        # A NULL/empty collector_class would otherwise crash on .lower().
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Data source has no collector configured",
        )

    collector_name = collector_class_name.lower().replace("collector", "")
    collector = collector_registry.get(collector_name)
    if not collector:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Collector {collector_name} not found",
        )

    # Take the timestamps around the run so started_at reflects the real
    # start instead of the moment the row is written. Naive UTC is kept to
    # match what was stored before.
    started_at = datetime.utcnow()
    run_result = await collector.run(db)
    completed_at = datetime.utcnow()

    # NOTE(review): no commit is issued here; this presumably relies on
    # get_db (or the collector) committing the session -- confirm.
    await db.execute(
        text("""
            INSERT INTO collection_tasks
                (datasource_id, status, records_processed, error_message,
                 started_at, completed_at, created_at)
            VALUES
                (:datasource_id, :status, :records_processed, :error_message,
                 :started_at, :completed_at, NOW())
        """),
        {
            "datasource_id": source_id,
            "status": run_result.get("status", "unknown"),
            "records_processed": run_result.get("records_processed", 0),
            "error_message": run_result.get("error"),
            "started_at": started_at,
            "completed_at": completed_at,
        },
    )

    return {
        "message": "Collection task executed",
        "result": run_result,
    }