432 lines
12 KiB
Python
432 lines
12 KiB
Python
from typing import Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status, Response
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy import select, func, text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
import json
|
|
import csv
|
|
import io
|
|
|
|
from app.db.session import get_db
|
|
from app.models.user import User
|
|
from app.core.security import get_current_user
|
|
from app.models.collected_data import CollectedData
|
|
|
|
# Router for the collected-data API; the application mounts it under its own prefix.
router = APIRouter()
|
|
|
|
|
|
@router.get("")
async def list_collected_data(
    source: Optional[str] = Query(None, description="数据源过滤"),
    data_type: Optional[str] = Query(None, description="数据类型过滤"),
    country: Optional[str] = Query(None, description="国家过滤"),
    search: Optional[str] = Query(None, description="搜索名称"),
    page: int = Query(1, ge=1, description="页码"),
    page_size: int = Query(20, ge=1, le=100, description="每页数量"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return a paginated list of collected-data rows matching the filters.

    Filters combine with AND; ``search`` is a case-insensitive match against
    both ``name`` and ``title``. Rows are ordered by ``collected_at``
    descending. Response: ``{"total", "page", "page_size", "data"}``.
    """
    # Reuse the shared WHERE-clause builder so filter semantics stay in sync
    # with the export endpoints (this logic was previously duplicated inline).
    # where_sql contains only fixed literal fragments with named placeholders;
    # all user-supplied values are bound via params.
    where_sql, params = build_where_clause(source, data_type, country, search)

    offset = (page - 1) * page_size

    # Total matching rows, for pagination metadata.
    count_query = text(f"SELECT COUNT(*) FROM collected_data WHERE {where_sql}")
    count_result = await db.execute(count_query, params)
    total = count_result.scalar()

    query = text(f"""
        SELECT id, source, source_id, data_type, name, title, description,
               country, city, latitude, longitude, value, unit,
               metadata, collected_at, reference_date, is_valid
        FROM collected_data
        WHERE {where_sql}
        ORDER BY collected_at DESC
        LIMIT :limit OFFSET :offset
    """)
    params["limit"] = page_size
    params["offset"] = offset

    result = await db.execute(query, params)
    rows = result.fetchall()

    data = [
        {
            "id": row[0],
            "source": row[1],
            "source_id": row[2],
            "data_type": row[3],
            "name": row[4],
            "title": row[5],
            "description": row[6],
            "country": row[7],
            "city": row[8],
            "latitude": row[9],
            "longitude": row[10],
            "value": row[11],
            "unit": row[12],
            "metadata": row[13],
            # Datetime columns are serialized to ISO-8601; NULL stays None.
            "collected_at": row[14].isoformat() if row[14] else None,
            "reference_date": row[15].isoformat() if row[15] else None,
            "is_valid": row[16],
        }
        for row in rows
    ]

    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "data": data,
    }
|
|
|
|
|
|
@router.get("/summary")
async def get_data_summary(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """获取数据汇总统计

    Aggregates record counts: nested per (source, data_type), plus flat
    per-source totals ordered largest first.
    """
    # Counts grouped by (source, data_type).
    grouped = await db.execute(
        text("""
            SELECT source, data_type, COUNT(*) as count
            FROM collected_data
            GROUP BY source, data_type
            ORDER BY source, data_type
        """)
    )

    by_source = {}
    total = 0
    for src, dtype, count in grouped.fetchall():
        by_source.setdefault(src, {})[dtype] = count
        total += count

    # Flat per-source totals, largest source first.
    per_source = await db.execute(
        text("""
            SELECT source, COUNT(*) as count
            FROM collected_data
            GROUP BY source
            ORDER BY count DESC
        """)
    )
    source_totals = [
        {"source": src, "count": count} for src, count in per_source.fetchall()
    ]

    return {
        "total_records": total,
        "by_source": by_source,
        "source_totals": source_totals,
    }
|
|
|
|
|
|
@router.get("/sources")
async def get_data_sources(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """获取所有数据源列表

    Returns the distinct ``source`` values present in collected_data, sorted.
    """
    result = await db.execute(
        text("""
            SELECT DISTINCT source FROM collected_data ORDER BY source
        """)
    )
    sources = [record[0] for record in result.fetchall()]
    return {"sources": sources}
|
|
|
|
|
|
@router.get("/types")
async def get_data_types(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """获取所有数据类型列表

    Returns the distinct ``data_type`` values in collected_data, sorted.
    """
    result = await db.execute(
        text("""
            SELECT DISTINCT data_type FROM collected_data ORDER BY data_type
        """)
    )
    types = [record[0] for record in result.fetchall()]
    return {"data_types": types}
|
|
|
|
|
|
@router.get("/countries")
async def get_countries(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """获取所有国家列表

    Returns the distinct non-empty ``country`` values, sorted.
    """
    result = await db.execute(
        text("""
            SELECT DISTINCT country FROM collected_data
            WHERE country IS NOT NULL AND country != ''
            ORDER BY country
        """)
    )
    countries = [record[0] for record in result.fetchall()]
    return {"countries": countries}
|
|
|
|
|
|
@router.get("/{data_id}")
async def get_collected_data(
    data_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """获取单条采集数据详情

    Looks up a single collected_data record by primary key; raises 404 when
    no row matches.
    """
    # Response keys, in the same order as the SELECT column list below.
    columns = (
        "id", "source", "source_id", "data_type", "name", "title",
        "description", "country", "city", "latitude", "longitude",
        "value", "unit", "metadata", "collected_at", "reference_date",
        "is_valid",
    )

    result = await db.execute(
        text("""
            SELECT id, source, source_id, data_type, name, title, description,
                   country, city, latitude, longitude, value, unit,
                   metadata, collected_at, reference_date, is_valid
            FROM collected_data
            WHERE id = :id
        """),
        {"id": data_id},
    )
    record = result.fetchone()

    if record is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="数据不存在",
        )

    payload = dict(zip(columns, record))
    # Serialize datetime columns to ISO-8601 strings; NULL stays None.
    for ts_field in ("collected_at", "reference_date"):
        ts = payload[ts_field]
        payload[ts_field] = ts.isoformat() if ts else None
    return payload
|
|
|
|
|
|
def build_where_clause(
    source: Optional[str], data_type: Optional[str], country: Optional[str], search: Optional[str]
):
    """Build WHERE clause and params for queries.

    Returns ``(where_sql, params)``. The clause is assembled only from fixed
    literal fragments with named placeholders; every user-supplied value goes
    into ``params`` to be bound by the driver. With no filters the clause is
    the always-true ``"1=1"``.
    """
    # (param name, SQL fragment, value) for each optional filter; a falsy
    # value (None or "") means the filter is absent, matching the original
    # truthiness checks.
    filters = (
        ("source", "source = :source", source),
        ("data_type", "data_type = :data_type", data_type),
        ("country", "country = :country", country),
        ("search", "(name ILIKE :search OR title ILIKE :search)",
         f"%{search}%" if search else None),
    )

    params = {}
    fragments = []
    for param_name, fragment, value in filters:
        if value:
            fragments.append(fragment)
            params[param_name] = value

    where_sql = " AND ".join(fragments) if fragments else "1=1"
    return where_sql, params
|
|
|
|
|
|
@router.get("/export/json")
async def export_json(
    source: Optional[str] = Query(None, description="数据源过滤"),
    data_type: Optional[str] = Query(None, description="数据类型过滤"),
    country: Optional[str] = Query(None, description="国家过滤"),
    search: Optional[str] = Query(None, description="搜索名称"),
    limit: int = Query(10000, ge=1, le=50000, description="最大导出数量"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """导出数据为 JSON 格式

    Same filters as the list endpoint; exports at most ``limit`` rows,
    newest first, as a downloadable JSON attachment.
    """
    where_sql, params = build_where_clause(source, data_type, country, search)
    params["limit"] = limit

    query = text(f"""
        SELECT id, source, source_id, data_type, name, title, description,
               country, city, latitude, longitude, value, unit,
               metadata, collected_at, reference_date, is_valid
        FROM collected_data
        WHERE {where_sql}
        ORDER BY collected_at DESC
        LIMIT :limit
    """)

    result = await db.execute(query, params)

    # Response keys, in the same order as the SELECT column list above.
    columns = (
        "id", "source", "source_id", "data_type", "name", "title",
        "description", "country", "city", "latitude", "longitude",
        "value", "unit", "metadata", "collected_at", "reference_date",
        "is_valid",
    )

    records = []
    for row in result.fetchall():
        item = dict(zip(columns, row))
        # Datetime columns become ISO-8601 strings; NULL stays None.
        for ts_field in ("collected_at", "reference_date"):
            ts = item[ts_field]
            item[ts_field] = ts.isoformat() if ts else None
        records.append(item)

    body = json.dumps({"data": records, "total": len(records)}, ensure_ascii=False, indent=2)

    return StreamingResponse(
        io.StringIO(body),
        media_type="application/json",
        headers={
            "Content-Disposition": f"attachment; filename=collected_data_{source or 'all'}.json"
        },
    )
|
|
|
|
|
|
@router.get("/export/csv")
async def export_csv(
    source: Optional[str] = Query(None, description="数据源过滤"),
    data_type: Optional[str] = Query(None, description="数据类型过滤"),
    country: Optional[str] = Query(None, description="国家过滤"),
    search: Optional[str] = Query(None, description="搜索名称"),
    limit: int = Query(10000, ge=1, le=50000, description="最大导出数量"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """导出数据为 CSV 格式

    Same filters as the list endpoint; exports at most ``limit`` rows,
    newest first, as a downloadable CSV attachment.
    """
    where_sql, params = build_where_clause(source, data_type, country, search)
    params["limit"] = limit

    query = text(f"""
        SELECT id, source, source_id, data_type, name, title, description,
               country, city, latitude, longitude, value, unit,
               metadata, collected_at, reference_date, is_valid
        FROM collected_data
        WHERE {where_sql}
        ORDER BY collected_at DESC
        LIMIT :limit
    """)

    result = await db.execute(query, params)
    rows = result.fetchall()

    output = io.StringIO()
    writer = csv.writer(output)

    # Header row; order must match the SELECT column list above.
    writer.writerow(
        [
            "ID", "Source", "Source ID", "Type", "Name", "Title",
            "Description", "Country", "City", "Latitude", "Longitude",
            "Value", "Unit", "Metadata", "Collected At", "Reference Date",
            "Is Valid",
        ]
    )

    for row in rows:
        writer.writerow(
            [
                row[0], row[1], row[2], row[3], row[4], row[5], row[6],
                row[7], row[8], row[9], row[10], row[11], row[12],
                # metadata (column 13) is re-serialized as JSON; NULL -> "".
                json.dumps(row[13]) if row[13] else "",
                row[14].isoformat() if row[14] else "",
                row[15].isoformat() if row[15] else "",
                row[16],
            ]
        )

    # Rewind and stream the buffer we already have, instead of copying the
    # entire payload into a second StringIO (the previous version doubled
    # peak memory for large exports).
    output.seek(0)
    return StreamingResponse(
        output,
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=collected_data_{source or 'all'}.csv"
        },
    )
|