"""DataSourceConfig API for user-defined data sources""" from typing import Optional from datetime import datetime import base64 from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from pydantic import BaseModel, Field import httpx from app.db.session import get_db from app.models.user import User from app.models.datasource_config import DataSourceConfig from app.core.security import get_current_user from app.core.cache import cache router = APIRouter() class DataSourceConfigCreate(BaseModel): name: str = Field(..., min_length=1, max_length=100) description: Optional[str] = None source_type: str = Field(..., description="http, api, database") endpoint: str = Field(..., max_length=500) auth_type: str = Field(default="none", description="none, bearer, api_key, basic") auth_config: dict = Field(default={}) headers: dict = Field(default={}) config: dict = Field(default={"timeout": 30, "retry": 3}) class DataSourceConfigUpdate(BaseModel): name: Optional[str] = Field(None, min_length=1, max_length=100) description: Optional[str] = None source_type: Optional[str] = None endpoint: Optional[str] = Field(None, max_length=500) auth_type: Optional[str] = None auth_config: Optional[dict] = None headers: Optional[dict] = None config: Optional[dict] = None is_active: Optional[bool] = None class DataSourceConfigResponse(BaseModel): id: int name: str description: Optional[str] source_type: str endpoint: str auth_type: str headers: dict config: dict is_active: bool created_at: datetime updated_at: datetime class Config: from_attributes = True async def test_endpoint( endpoint: str, auth_type: str, auth_config: dict, headers: dict, config: dict, ) -> dict: """Test an endpoint connection""" timeout = config.get("timeout", 30) test_headers = headers.copy() # Add auth headers if auth_type == "bearer" and auth_config.get("token"): test_headers["Authorization"] = f"Bearer {auth_config['token']}" elif auth_type == "api_key" and auth_config.get("api_key"): key_name = auth_config.get("key_name", "X-API-Key") test_headers[key_name] = auth_config["api_key"] elif auth_type == "basic": username = auth_config.get("username", "") password = auth_config.get("password", "") credentials = f"{username}:{password}" encoded = base64.b64encode(credentials.encode()).decode() test_headers["Authorization"] = f"Basic {encoded}" async with httpx.AsyncClient(timeout=timeout) as client: response = await client.get(endpoint, headers=test_headers) response.raise_for_status() return { "status_code": response.status_code, "success": True, "response_time_ms": response.elapsed.total_seconds() * 1000, "data_preview": str(response.json()[:3]) if response.headers.get("content-type", "").startswith("application/json") else response.text[:200], } @router.get("/configs") async def list_configs( active_only: bool = False, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """List all user-defined data source configurations""" query = select(DataSourceConfig) if active_only: query = query.where(DataSourceConfig.is_active == True) query = query.order_by(DataSourceConfig.created_at.desc()) result = await db.execute(query) configs = result.scalars().all() return { "total": len(configs), "data": [ { "id": c.id, "name": c.name, "description": c.description, "source_type": c.source_type, "endpoint": c.endpoint, "auth_type": c.auth_type, "headers": c.headers, "config": c.config, "is_active": c.is_active, "created_at": c.created_at.isoformat() if c.created_at else None, "updated_at": c.updated_at.isoformat() if c.updated_at else None, } for c in configs ], } @router.get("/configs/{config_id}") async def get_config( config_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Get a single data source configuration""" result = await db.execute(select(DataSourceConfig).where(DataSourceConfig.id == config_id)) config = result.scalar_one_or_none() if not config: raise HTTPException(status_code=404, detail="Configuration not found") return { "id": config.id, "name": config.name, "description": config.description, "source_type": config.source_type, "endpoint": config.endpoint, "auth_type": config.auth_type, "auth_config": {}, # Don't return sensitive data "headers": config.headers, "config": config.config, "is_active": config.is_active, "created_at": config.created_at.isoformat() if config.created_at else None, "updated_at": config.updated_at.isoformat() if config.updated_at else None, } @router.post("/configs") async def create_config( config_data: DataSourceConfigCreate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Create a new data source configuration""" config = DataSourceConfig( name=config_data.name, description=config_data.description, source_type=config_data.source_type, endpoint=config_data.endpoint, auth_type=config_data.auth_type, auth_config=config_data.auth_config, headers=config_data.headers, config=config_data.config, ) db.add(config) await db.commit() await db.refresh(config) cache.delete_pattern("datasource_configs:*") return { "id": config.id, "name": config.name, "message": "Configuration created successfully", } @router.put("/configs/{config_id}") async def update_config( config_id: int, config_data: DataSourceConfigUpdate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Update a data source configuration""" result = await db.execute(select(DataSourceConfig).where(DataSourceConfig.id == config_id)) config = result.scalar_one_or_none() if not config: raise HTTPException(status_code=404, detail="Configuration not found") update_data = config_data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(config, field, value) await db.commit() await db.refresh(config) cache.delete_pattern("datasource_configs:*") return { "id": config.id, "name": config.name, "message": "Configuration updated successfully", } @router.delete("/configs/{config_id}") async def delete_config( config_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Delete a data source configuration""" result = await db.execute(select(DataSourceConfig).where(DataSourceConfig.id == config_id)) config = result.scalar_one_or_none() if not config: raise HTTPException(status_code=404, detail="Configuration not found") await db.delete(config) await db.commit() cache.delete_pattern("datasource_configs:*") return {"message": "Configuration deleted successfully"} @router.post("/configs/{config_id}/test") async def test_config( config_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """Test a data source configuration""" result = await db.execute(select(DataSourceConfig).where(DataSourceConfig.id == config_id)) config = result.scalar_one_or_none() if not config: raise HTTPException(status_code=404, detail="Configuration not found") try: result = await test_endpoint( endpoint=config.endpoint, auth_type=config.auth_type, auth_config=config.auth_config or {}, headers=config.headers or {}, config=config.config or {}, ) return result except httpx.HTTPStatusError as e: return { "success": False, "error": f"HTTP Error: {e.response.status_code}", "message": str(e), } except Exception as e: return { "success": False, "error": "Connection failed", "message": str(e), } @router.post("/configs/test") async def test_new_config( config_data: DataSourceConfigCreate, current_user: User = Depends(get_current_user), ): """Test a new data source configuration without saving""" try: result = await test_endpoint( endpoint=config_data.endpoint, auth_type=config_data.auth_type, auth_config=config_data.auth_config or {}, headers=config_data.headers or {}, config=config_data.config or {}, ) return result except httpx.HTTPStatusError as e: return { "success": False, "error": f"HTTP Error: {e.response.status_code}", "message": str(e), } except Exception as e: return { "success": False, "error": "Connection failed", "message": str(e), } @router.get("/configs/all") async def list_all_datasources( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """List all data sources: YAML defaults + DB overrides""" from app.core.data_sources import COLLECTOR_URL_KEYS, get_data_sources_config config = get_data_sources_config() db_query = await db.execute(select(DataSourceConfig)) db_configs = {c.name: c for c in db_query.scalars().all()} result = [] for name, yaml_key in COLLECTOR_URL_KEYS.items(): yaml_url = config.get_yaml_url(name) db_config = db_configs.get(name) result.append( { "name": name, "default_url": yaml_url, "endpoint": db_config.endpoint if db_config else yaml_url, "is_overridden": db_config is not None and db_config.endpoint != yaml_url if yaml_url else db_config is not None, "is_active": db_config.is_active if db_config else True, "source_type": db_config.source_type if db_config else "http", "description": db_config.description if db_config else f"Data source from YAML: {yaml_key}", } ) return {"total": len(result), "data": result}