Files
planet/backend/app/api/v1/tasks.py
rayd1o de32552159 feat: add data sources config system and Earth API integration
- Add data_sources.yaml for configurable data source URLs
- Add data_sources.py to load config with database override support
- Add arcgis_landing_points and arcgis_cable_landing_relation collectors
- Change visualization API to query arcgis_landing_points
- Add /api/v1/datasources/configs/all endpoint
- Update Earth to fetch from API instead of static files
- Fix scheduler collector ID mappings
2026-03-13 10:54:02 +08:00

159 lines
4.8 KiB
Python

from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from app.db.session import get_db
from app.models.user import User
from app.core.security import get_current_user
from app.services.collectors.registry import collector_registry
# Router holding the collection-task endpoints defined below. Their paths
# ("", "/{task_id}", "/datasources/{source_id}/trigger") are relative, so the
# mount prefix is applied where this router is included — presumably
# /api/v1/tasks; verify in the application setup.
router = APIRouter()
@router.get("")
async def list_tasks(
    datasource_id: Optional[int] = None,
    status: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List collection tasks, newest first, with optional filters and paging.

    Args:
        datasource_id: When given, only tasks of this data source are listed.
        status: When non-empty, only tasks whose ``status`` equals this value.
        page: 1-based page number.
        page_size: Maximum number of tasks returned on one page.
        current_user: Resolved through ``get_current_user``; not otherwise
            used in the body (presumably enforces authentication — verify).
        db: Async database session.

    Returns:
        A dict with ``total`` (number of tasks matching the filters),
        ``page``, ``page_size`` and ``data`` (the serialized tasks on this
        page, each joined with its data source name).

    Raises:
        HTTPException: 400 when ``page`` or ``page_size`` is less than 1.
    """
    # The ``status`` query parameter shadows the fastapi ``status`` module in
    # this function, so use the stdlib enum for the error code instead.
    from http import HTTPStatus

    if page < 1 or page_size < 1:
        # Previously page < 1 produced a negative OFFSET and a database error.
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="page and page_size must be >= 1",
        )

    # Build ONE filter clause and share it between the page query and the
    # count query so they always agree. (The old code appended a second
    # "WHERE" to the count query when filtering by datasource_id, which is
    # invalid SQL.)
    conditions = ["1=1"]
    params = {}
    if datasource_id is not None:
        conditions.append("ct.datasource_id = :datasource_id")
        params["datasource_id"] = datasource_id
    if status:
        conditions.append("ct.status = :status")
        params["status"] = status
    where_clause = " AND ".join(conditions)

    # Both queries use the same inner JOIN so ``total`` counts exactly the
    # rows the listing query can return. Only these fixed fragments are
    # interpolated below; every caller-supplied value is a bound parameter.
    from_clause = (
        "FROM collection_tasks ct "
        "JOIN data_sources ds ON ct.datasource_id = ds.id"
    )
    query = f"""
        SELECT ct.id, ct.datasource_id, ds.name as datasource_name, ct.status,
               ct.started_at, ct.completed_at, ct.records_processed, ct.error_message
        {from_clause}
        WHERE {where_clause}
        ORDER BY ct.created_at DESC
        LIMIT :limit OFFSET :offset
    """
    count_query = f"SELECT COUNT(*) {from_clause} WHERE {where_clause}"

    page_params = {**params, "limit": page_size, "offset": (page - 1) * page_size}
    result = await db.execute(text(query), page_params)
    tasks = result.fetchall()

    count_result = await db.execute(text(count_query), params)
    total = count_result.scalar()

    def _iso(value):
        # Nullable timestamp column -> ISO-8601 string, or None.
        return value.isoformat() if value else None

    return {
        "total": total or 0,
        "page": page,
        "page_size": page_size,
        "data": [
            {
                "id": row[0],
                "datasource_id": row[1],
                "datasource_name": row[2],
                "status": row[3],
                "started_at": _iso(row[4]),
                "completed_at": _iso(row[5]),
                "records_processed": row[6],
                "error_message": row[7],
            }
            for row in tasks
        ],
    }
@router.get("/{task_id}")
async def get_task(
    task_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Fetch one collection task together with its data source name.

    Args:
        task_id: Primary key of the ``collection_tasks`` row.
        current_user: Resolved through ``get_current_user``; not otherwise
            used in the body.
        db: Async database session.

    Returns:
        A dict with the task's id, data source id and name, status,
        ISO-8601 timestamps (or None), records_processed and error_message.

    Raises:
        HTTPException: 404 when no row matches. Because of the inner JOIN,
            this includes a task whose data source row is missing.
    """
    lookup = text("""
        SELECT ct.id, ct.datasource_id, ds.name as datasource_name, ct.status,
               ct.started_at, ct.completed_at, ct.records_processed, ct.error_message
        FROM collection_tasks ct
        JOIN data_sources ds ON ct.datasource_id = ds.id
        WHERE ct.id = :id
    """)
    row = (await db.execute(lookup, {"id": task_id})).fetchone()
    if row is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found",
        )

    (
        row_id,
        source_id,
        source_name,
        task_status,
        started,
        completed,
        processed,
        error_text,
    ) = row

    def _iso(moment):
        # Nullable timestamp column -> ISO-8601 string, or None.
        return moment.isoformat() if moment else None

    return {
        "id": row_id,
        "datasource_id": source_id,
        "datasource_name": source_name,
        "status": task_status,
        "started_at": _iso(started),
        "completed_at": _iso(completed),
        "records_processed": processed,
        "error_message": error_text,
    }
@router.post("/datasources/{source_id}/trigger")
async def trigger_collection(
    source_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Run a data source's collector now and record the run as a task.

    The steps are:

    1. Look up the data source.
    2. Derive the registry key from its ``collector_class``.
    3. Await ``collector.run(db)``.
    4. Insert a ``collection_tasks`` row describing the outcome.

    Args:
        source_id: Primary key of the ``data_sources`` row to collect.
        current_user: Resolved through ``get_current_user``; not otherwise
            used in the body.
        db: Async database session, also handed to the collector.

    Returns:
        ``{"message": "Collection task executed", "result": <value returned
        by collector.run>}``.

    Raises:
        HTTPException:
            - 404 if the data source does not exist.
            - 400 if the source has no ``collector_class``.
            - 400 if the derived collector name is not registered.
    """
    lookup = await db.execute(
        text("SELECT id, name, collector_class FROM data_sources WHERE id = :id"),
        {"id": source_id},
    )
    datasource = lookup.fetchone()
    if not datasource:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Data source not found",
        )

    collector_class_name = datasource[2]
    if not collector_class_name:
        # A NULL/empty collector_class previously crashed on ``.lower()``
        # with an AttributeError (HTTP 500).
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Data source {source_id} has no collector configured",
        )

    # The registry key is the lower-cased class name with "collector" removed,
    # e.g. "FooCollector" -> "foo".
    # NOTE(review): str.replace drops EVERY occurrence of "collector", not
    # only a trailing suffix. Confirm this matches the registry's naming.
    collector_name = collector_class_name.lower().replace("collector", "")
    collector = collector_registry.get(collector_name)
    if not collector:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Collector {collector_name} not found",
        )

    # Take the start time BEFORE running. Previously both timestamps were
    # taken after the run finished, so started_at was always ~= completed_at.
    # Naive UTC datetimes are kept to match what the original stored.
    started_at = datetime.utcnow()
    run_result = await collector.run(db)
    completed_at = datetime.utcnow()

    await db.execute(
        text("""
            INSERT INTO collection_tasks (datasource_id, status, records_processed, error_message, started_at, completed_at, created_at)
            VALUES (:datasource_id, :status, :records_processed, :error_message, :started_at, :completed_at, NOW())
        """),
        {
            "datasource_id": source_id,
            "status": run_result.get("status", "unknown"),
            "records_processed": run_result.get("records_processed", 0),
            "error_message": run_result.get("error"),
            "started_at": started_at,
            "completed_at": completed_at,
        },
    )
    # Commit explicitly so the task record does not depend on how the
    # get_db dependency ends the session. A later commit by that dependency
    # on an already-committed session is harmless.
    await db.commit()

    return {
        "message": "Collection task executed",
        "result": run_result,
    }