from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query, status, Response from fastapi.responses import StreamingResponse from sqlalchemy import select, func, text from sqlalchemy.ext.asyncio import AsyncSession import json import csv import io from app.db.session import get_db from app.models.user import User from app.core.security import get_current_user from app.models.collected_data import CollectedData router = APIRouter() @router.get("") async def list_collected_data( source: Optional[str] = Query(None, description="数据源过滤"), data_type: Optional[str] = Query(None, description="数据类型过滤"), country: Optional[str] = Query(None, description="国家过滤"), search: Optional[str] = Query(None, description="搜索名称"), page: int = Query(1, ge=1, description="页码"), page_size: int = Query(20, ge=1, le=100, description="每页数量"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """查询采集的数据列表""" # Build WHERE clause conditions = [] params = {} if source: conditions.append("source = :source") params["source"] = source if data_type: conditions.append("data_type = :data_type") params["data_type"] = data_type if country: conditions.append("country = :country") params["country"] = country if search: conditions.append("(name ILIKE :search OR title ILIKE :search)") params["search"] = f"%{search}%" where_sql = " AND ".join(conditions) if conditions else "1=1" # Calculate offset offset = (page - 1) * page_size # Query total count count_query = text(f"SELECT COUNT(*) FROM collected_data WHERE {where_sql}") count_result = await db.execute(count_query, params) total = count_result.scalar() # Query data query = text(f""" SELECT id, source, source_id, data_type, name, title, description, country, city, latitude, longitude, value, unit, metadata, collected_at, reference_date, is_valid FROM collected_data WHERE {where_sql} ORDER BY collected_at DESC LIMIT :limit OFFSET :offset """) params["limit"] = page_size params["offset"] = offset result = await db.execute(query, params) rows = result.fetchall() data = [] for row in rows: data.append( { "id": row[0], "source": row[1], "source_id": row[2], "data_type": row[3], "name": row[4], "title": row[5], "description": row[6], "country": row[7], "city": row[8], "latitude": row[9], "longitude": row[10], "value": row[11], "unit": row[12], "metadata": row[13], "collected_at": row[14].isoformat() if row[14] else None, "reference_date": row[15].isoformat() if row[15] else None, "is_valid": row[16], } ) return { "total": total, "page": page, "page_size": page_size, "data": data, } @router.get("/summary") async def get_data_summary( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """获取数据汇总统计""" # By source and data_type result = await db.execute( text(""" SELECT source, data_type, COUNT(*) as count FROM collected_data GROUP BY source, data_type ORDER BY source, data_type """) ) rows = result.fetchall() by_source = {} total = 0 for row in rows: source = row[0] data_type = row[1] count = row[2] if source not in by_source: by_source[source] = {} by_source[source][data_type] = count total += count # Total by source source_totals = await db.execute( text(""" SELECT source, COUNT(*) as count FROM collected_data GROUP BY source ORDER BY count DESC """) ) source_rows = source_totals.fetchall() return { "total_records": total, "by_source": by_source, "source_totals": [{"source": row[0], "count": row[1]} for row in source_rows], } @router.get("/sources") async def get_data_sources( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """获取所有数据源列表""" result = await db.execute( text(""" SELECT DISTINCT source FROM collected_data ORDER BY source """) ) rows = result.fetchall() return { "sources": [row[0] for row in rows], } @router.get("/types") async def get_data_types( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """获取所有数据类型列表""" result = await db.execute( text(""" SELECT DISTINCT data_type FROM collected_data ORDER BY data_type """) ) rows = result.fetchall() return { "data_types": [row[0] for row in rows], } @router.get("/countries") async def get_countries( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """获取所有国家列表""" result = await db.execute( text(""" SELECT DISTINCT country FROM collected_data WHERE country IS NOT NULL AND country != '' ORDER BY country """) ) rows = result.fetchall() return { "countries": [row[0] for row in rows], } @router.get("/{data_id}") async def get_collected_data( data_id: int, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """获取单条采集数据详情""" result = await db.execute( text(""" SELECT id, source, source_id, data_type, name, title, description, country, city, latitude, longitude, value, unit, metadata, collected_at, reference_date, is_valid FROM collected_data WHERE id = :id """), {"id": data_id}, ) row = result.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="数据不存在", ) return { "id": row[0], "source": row[1], "source_id": row[2], "data_type": row[3], "name": row[4], "title": row[5], "description": row[6], "country": row[7], "city": row[8], "latitude": row[9], "longitude": row[10], "value": row[11], "unit": row[12], "metadata": row[13], "collected_at": row[14].isoformat() if row[14] else None, "reference_date": row[15].isoformat() if row[15] else None, "is_valid": row[16], } def build_where_clause( source: Optional[str], data_type: Optional[str], country: Optional[str], search: Optional[str] ): """Build WHERE clause and params for queries""" conditions = [] params = {} if source: conditions.append("source = :source") params["source"] = source if data_type: conditions.append("data_type = :data_type") params["data_type"] = data_type if country: conditions.append("country = :country") params["country"] = country if search: conditions.append("(name ILIKE :search OR title ILIKE :search)") params["search"] = f"%{search}%" where_sql = " AND ".join(conditions) if conditions else "1=1" return where_sql, params @router.get("/export/json") async def export_json( source: Optional[str] = Query(None, description="数据源过滤"), data_type: Optional[str] = Query(None, description="数据类型过滤"), country: Optional[str] = Query(None, description="国家过滤"), search: Optional[str] = Query(None, description="搜索名称"), limit: int = Query(10000, ge=1, le=50000, description="最大导出数量"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """导出数据为 JSON 格式""" where_sql, params = build_where_clause(source, data_type, country, search) params["limit"] = limit query = text(f""" SELECT id, source, source_id, data_type, name, title, description, country, city, latitude, longitude, value, unit, metadata, collected_at, reference_date, is_valid FROM collected_data WHERE {where_sql} ORDER BY collected_at DESC LIMIT :limit """) result = await db.execute(query, params) rows = result.fetchall() data = [] for row in rows: data.append( { "id": row[0], "source": row[1], "source_id": row[2], "data_type": row[3], "name": row[4], "title": row[5], "description": row[6], "country": row[7], "city": row[8], "latitude": row[9], "longitude": row[10], "value": row[11], "unit": row[12], "metadata": row[13], "collected_at": row[14].isoformat() if row[14] else None, "reference_date": row[15].isoformat() if row[15] else None, "is_valid": row[16], } ) json_str = json.dumps({"data": data, "total": len(data)}, ensure_ascii=False, indent=2) return StreamingResponse( io.StringIO(json_str), media_type="application/json", headers={ "Content-Disposition": f"attachment; filename=collected_data_{source or 'all'}.json" }, ) @router.get("/export/csv") async def export_csv( source: Optional[str] = Query(None, description="数据源过滤"), data_type: Optional[str] = Query(None, description="数据类型过滤"), country: Optional[str] = Query(None, description="国家过滤"), search: Optional[str] = Query(None, description="搜索名称"), limit: int = Query(10000, ge=1, le=50000, description="最大导出数量"), current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """导出数据为 CSV 格式""" where_sql, params = build_where_clause(source, data_type, country, search) params["limit"] = limit query = text(f""" SELECT id, source, source_id, data_type, name, title, description, country, city, latitude, longitude, value, unit, metadata, collected_at, reference_date, is_valid FROM collected_data WHERE {where_sql} ORDER BY collected_at DESC LIMIT :limit """) result = await db.execute(query, params) rows = result.fetchall() output = io.StringIO() writer = csv.writer(output) # Write header writer.writerow( [ "ID", "Source", "Source ID", "Type", "Name", "Title", "Description", "Country", "City", "Latitude", "Longitude", "Value", "Unit", "Metadata", "Collected At", "Reference Date", "Is Valid", ] ) # Write data for row in rows: writer.writerow( [ row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], json.dumps(row[13]) if row[13] else "", row[14].isoformat() if row[14] else "", row[15].isoformat() if row[15] else "", row[16], ] ) return StreamingResponse( io.StringIO(output.getvalue()), media_type="text/csv", headers={ "Content-Disposition": f"attachment; filename=collected_data_{source or 'all'}.csv" }, )