Files
planet/backend/app/api/v1/datasource_config.py
rayd1o de32552159 feat: add data sources config system and Earth API integration
- Add data_sources.yaml for configurable data source URLs
- Add data_sources.py to load config with database override support
- Add arcgis_landing_points and arcgis_cable_landing_relation collectors
- Change visualization API to query arcgis_landing_points
- Add /api/v1/datasources/configs/all endpoint
- Update Earth to fetch from API instead of static files
- Fix scheduler collector ID mappings
2026-03-13 10:54:02 +08:00

347 lines
11 KiB
Python

"""DataSourceConfig API for user-defined data sources"""
from typing import Optional
from datetime import datetime
import base64
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel, Field
import httpx
from app.db.session import get_db
from app.models.user import User
from app.models.datasource_config import DataSourceConfig
from app.core.security import get_current_user
from app.core.cache import cache
router = APIRouter()
class DataSourceConfigCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
description: Optional[str] = None
source_type: str = Field(..., description="http, api, database")
endpoint: str = Field(..., max_length=500)
auth_type: str = Field(default="none", description="none, bearer, api_key, basic")
auth_config: dict = Field(default={})
headers: dict = Field(default={})
config: dict = Field(default={"timeout": 30, "retry": 3})
class DataSourceConfigUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=100)
description: Optional[str] = None
source_type: Optional[str] = None
endpoint: Optional[str] = Field(None, max_length=500)
auth_type: Optional[str] = None
auth_config: Optional[dict] = None
headers: Optional[dict] = None
config: Optional[dict] = None
is_active: Optional[bool] = None
class DataSourceConfigResponse(BaseModel):
id: int
name: str
description: Optional[str]
source_type: str
endpoint: str
auth_type: str
headers: dict
config: dict
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
async def test_endpoint(
endpoint: str,
auth_type: str,
auth_config: dict,
headers: dict,
config: dict,
) -> dict:
"""Test an endpoint connection"""
timeout = config.get("timeout", 30)
test_headers = headers.copy()
# Add auth headers
if auth_type == "bearer" and auth_config.get("token"):
test_headers["Authorization"] = f"Bearer {auth_config['token']}"
elif auth_type == "api_key" and auth_config.get("api_key"):
key_name = auth_config.get("key_name", "X-API-Key")
test_headers[key_name] = auth_config["api_key"]
elif auth_type == "basic":
username = auth_config.get("username", "")
password = auth_config.get("password", "")
credentials = f"{username}:{password}"
encoded = base64.b64encode(credentials.encode()).decode()
test_headers["Authorization"] = f"Basic {encoded}"
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(endpoint, headers=test_headers)
response.raise_for_status()
return {
"status_code": response.status_code,
"success": True,
"response_time_ms": response.elapsed.total_seconds() * 1000,
"data_preview": str(response.json()[:3])
if response.headers.get("content-type", "").startswith("application/json")
else response.text[:200],
}
@router.get("/configs")
async def list_configs(
active_only: bool = False,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List all user-defined data source configurations"""
query = select(DataSourceConfig)
if active_only:
query = query.where(DataSourceConfig.is_active == True)
query = query.order_by(DataSourceConfig.created_at.desc())
result = await db.execute(query)
configs = result.scalars().all()
return {
"total": len(configs),
"data": [
{
"id": c.id,
"name": c.name,
"description": c.description,
"source_type": c.source_type,
"endpoint": c.endpoint,
"auth_type": c.auth_type,
"headers": c.headers,
"config": c.config,
"is_active": c.is_active,
"created_at": c.created_at.isoformat() if c.created_at else None,
"updated_at": c.updated_at.isoformat() if c.updated_at else None,
}
for c in configs
],
}
@router.get("/configs/{config_id}")
async def get_config(
config_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Get a single data source configuration"""
result = await db.execute(select(DataSourceConfig).where(DataSourceConfig.id == config_id))
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="Configuration not found")
return {
"id": config.id,
"name": config.name,
"description": config.description,
"source_type": config.source_type,
"endpoint": config.endpoint,
"auth_type": config.auth_type,
"auth_config": {}, # Don't return sensitive data
"headers": config.headers,
"config": config.config,
"is_active": config.is_active,
"created_at": config.created_at.isoformat() if config.created_at else None,
"updated_at": config.updated_at.isoformat() if config.updated_at else None,
}
@router.post("/configs")
async def create_config(
config_data: DataSourceConfigCreate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Create a new data source configuration"""
config = DataSourceConfig(
name=config_data.name,
description=config_data.description,
source_type=config_data.source_type,
endpoint=config_data.endpoint,
auth_type=config_data.auth_type,
auth_config=config_data.auth_config,
headers=config_data.headers,
config=config_data.config,
)
db.add(config)
await db.commit()
await db.refresh(config)
cache.delete_pattern("datasource_configs:*")
return {
"id": config.id,
"name": config.name,
"message": "Configuration created successfully",
}
@router.put("/configs/{config_id}")
async def update_config(
config_id: int,
config_data: DataSourceConfigUpdate,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Update a data source configuration"""
result = await db.execute(select(DataSourceConfig).where(DataSourceConfig.id == config_id))
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="Configuration not found")
update_data = config_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(config, field, value)
await db.commit()
await db.refresh(config)
cache.delete_pattern("datasource_configs:*")
return {
"id": config.id,
"name": config.name,
"message": "Configuration updated successfully",
}
@router.delete("/configs/{config_id}")
async def delete_config(
config_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Delete a data source configuration"""
result = await db.execute(select(DataSourceConfig).where(DataSourceConfig.id == config_id))
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="Configuration not found")
await db.delete(config)
await db.commit()
cache.delete_pattern("datasource_configs:*")
return {"message": "Configuration deleted successfully"}
@router.post("/configs/{config_id}/test")
async def test_config(
config_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Test a data source configuration"""
result = await db.execute(select(DataSourceConfig).where(DataSourceConfig.id == config_id))
config = result.scalar_one_or_none()
if not config:
raise HTTPException(status_code=404, detail="Configuration not found")
try:
result = await test_endpoint(
endpoint=config.endpoint,
auth_type=config.auth_type,
auth_config=config.auth_config or {},
headers=config.headers or {},
config=config.config or {},
)
return result
except httpx.HTTPStatusError as e:
return {
"success": False,
"error": f"HTTP Error: {e.response.status_code}",
"message": str(e),
}
except Exception as e:
return {
"success": False,
"error": "Connection failed",
"message": str(e),
}
@router.post("/configs/test")
async def test_new_config(
config_data: DataSourceConfigCreate,
current_user: User = Depends(get_current_user),
):
"""Test a new data source configuration without saving"""
try:
result = await test_endpoint(
endpoint=config_data.endpoint,
auth_type=config_data.auth_type,
auth_config=config_data.auth_config or {},
headers=config_data.headers or {},
config=config_data.config or {},
)
return result
except httpx.HTTPStatusError as e:
return {
"success": False,
"error": f"HTTP Error: {e.response.status_code}",
"message": str(e),
}
except Exception as e:
return {
"success": False,
"error": "Connection failed",
"message": str(e),
}
@router.get("/configs/all")
async def list_all_datasources(
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""List all data sources: YAML defaults + DB overrides"""
from app.core.data_sources import COLLECTOR_URL_KEYS, get_data_sources_config
config = get_data_sources_config()
db_query = await db.execute(select(DataSourceConfig))
db_configs = {c.name: c for c in db_query.scalars().all()}
result = []
for name, yaml_key in COLLECTOR_URL_KEYS.items():
yaml_url = config.get_yaml_url(name)
db_config = db_configs.get(name)
result.append(
{
"name": name,
"default_url": yaml_url,
"endpoint": db_config.endpoint if db_config else yaml_url,
"is_overridden": db_config is not None and db_config.endpoint != yaml_url
if yaml_url
else db_config is not None,
"is_active": db_config.is_active if db_config else True,
"source_type": db_config.source_type if db_config else "http",
"description": db_config.description
if db_config
else f"Data source from YAML: {yaml_key}",
}
)
return {"total": len(result), "data": result}