183 lines
6.1 KiB
Python
183 lines
6.1 KiB
Python
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.security import get_current_user
|
|
from app.db.session import get_db
|
|
from app.models.bgp_anomaly import BGPAnomaly
|
|
from app.models.collected_data import CollectedData
|
|
from app.models.user import User
|
|
|
|
# Router on which the BGP event and anomaly endpoints in this module are registered.
router = APIRouter()
|
|
|
|
# Values of CollectedData.source that the event endpoints treat as BGP records;
# rows with any other source are excluded from listings and reported as 404.
BGP_SOURCES = ("ris_live_bgp", "bgpstream_bgp")
|
|
|
|
|
|
def _parse_dt(value: Optional[str]) -> Optional[datetime]:
    """Convert an ISO 8601 string into a ``datetime``.

    Args:
        value: Timestamp text, or ``None``/empty when no bound was supplied.

    Returns:
        ``None`` for a missing or empty value, otherwise the parsed datetime.

    Raises:
        ValueError: If the text is not accepted by ``datetime.fromisoformat``.
    """
    if value:
        # Rewrite the "Z" suffix as an explicit UTC offset before parsing.
        iso_text = value.replace("Z", "+00:00")
        return datetime.fromisoformat(iso_text)
    return None
|
|
|
|
|
|
def _matches_time(value: Optional[datetime], time_from: Optional[datetime], time_to: Optional[datetime]) -> bool:
    """Report whether ``value`` lies inside the inclusive window [time_from, time_to].

    Either bound may be ``None``, meaning "unbounded on that side". A ``None``
    value never matches.

    Naive and timezone-aware datetimes cannot be compared directly (Python
    raises ``TypeError``), so every naive operand is interpreted as UTC before
    the comparison. NOTE(review): treating naive timestamps as UTC is an
    assumption about how the stored values were written - confirm.

    Args:
        value: The timestamp being tested.
        time_from: Inclusive lower bound, or ``None``.
        time_to: Inclusive upper bound, or ``None``.

    Returns:
        ``True`` when ``value`` is present and within both supplied bounds.
    """
    # Function-scope import: the module header only imports `datetime`.
    from datetime import timezone

    def _as_utc_aware(moment: Optional[datetime]) -> Optional[datetime]:
        """Attach UTC to a naive datetime; pass aware values and None through."""
        if moment is None:
            return None
        if moment.tzinfo is None or moment.utcoffset() is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    if value is None:
        return False

    moment = _as_utc_aware(value)
    lower = _as_utc_aware(time_from)
    upper = _as_utc_aware(time_to)

    if lower is not None and moment < lower:
        return False
    if upper is not None and moment > upper:
        return False
    return True
|
|
|
|
|
|
@router.get("/events")
async def list_bgp_events(
    prefix: Optional[str] = Query(None),
    origin_asn: Optional[int] = Query(None),
    peer_asn: Optional[int] = Query(None),
    collector: Optional[str] = Query(None),
    event_type: Optional[str] = Query(None),
    source: Optional[str] = Query(None),
    time_from: Optional[str] = Query(None),
    time_to: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List BGP events with optional filters and pagination.

    Rows are selected from ``CollectedData`` where ``source`` is one of
    ``BGP_SOURCES`` (further narrowed by ``source`` when given), ordered by
    ``reference_date`` descending with NULLs last, then ``id`` descending.
    The remaining filters are applied in Python against each row's
    ``extra_data`` mapping, and the page is sliced from the filtered list,
    so every candidate row is loaded into memory.

    Args:
        prefix, origin_asn, peer_asn, collector, event_type: Exact-match
            filters against the same-named keys of ``extra_data``.
        source: Exact-match filter on ``CollectedData.source``.
        time_from, time_to: Inclusive ISO 8601 bounds on ``reference_date``.
        page: 1-based page number.
        page_size: Number of items per page (1-200).
        current_user: Result of the ``get_current_user`` dependency; not read
            in the body (presumably present to require authentication).
        db: Async database session.

    Returns:
        A dict with ``total`` (count after filtering), ``page``,
        ``page_size`` and ``data`` (the page of ``to_dict()`` results).

    Raises:
        HTTPException: 422 when ``time_from`` or ``time_to`` is not a valid
            ISO 8601 timestamp.
    """
    # Parse the bounds before querying so malformed input is rejected as a
    # client error instead of surfacing as an unhandled ValueError (HTTP 500)
    # after the rows have already been loaded.
    try:
        dt_from = _parse_dt(time_from)
        dt_to = _parse_dt(time_to)
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail="time_from and time_to must be ISO 8601 timestamps",
        ) from exc

    stmt = (
        select(CollectedData)
        .where(CollectedData.source.in_(BGP_SOURCES))
        .order_by(CollectedData.reference_date.desc().nullslast(), CollectedData.id.desc())
    )
    if source:
        stmt = stmt.where(CollectedData.source == source)

    result = await db.execute(stmt)
    records = result.scalars().all()

    has_time_bounds = bool(dt_from or dt_to)

    filtered = []
    for record in records:
        metadata = record.extra_data or {}
        if prefix and metadata.get("prefix") != prefix:
            continue
        # ASN filters compare with `is not None` so that 0 is a usable value.
        if origin_asn is not None and metadata.get("origin_asn") != origin_asn:
            continue
        if peer_asn is not None and metadata.get("peer_asn") != peer_asn:
            continue
        if collector and metadata.get("collector") != collector:
            continue
        if event_type and metadata.get("event_type") != event_type:
            continue
        if has_time_bounds and not _matches_time(record.reference_date, dt_from, dt_to):
            continue
        filtered.append(record)

    offset = (page - 1) * page_size
    return {
        "total": len(filtered),
        "page": page,
        "page_size": page_size,
        "data": [record.to_dict() for record in filtered[offset : offset + page_size]],
    }
|
|
|
|
|
|
@router.get("/events/{event_id}")
async def get_bgp_event(
    event_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Fetch a single BGP event by its ``CollectedData`` identifier.

    Args:
        event_id: Identifier passed to ``db.get`` for ``CollectedData``.
        current_user: Result of the ``get_current_user`` dependency; not read
            in the body.
        db: Async database session.

    Returns:
        The row's ``to_dict()`` representation.

    Raises:
        HTTPException: 404 when no row exists or its ``source`` is not one of
            ``BGP_SOURCES``.
    """
    event = await db.get(CollectedData, event_id)
    # Only rows that exist and come from a BGP source are exposed here.
    if event and event.source in BGP_SOURCES:
        return event.to_dict()
    raise HTTPException(status_code=404, detail="BGP event not found")
|
|
|
|
|
|
@router.get("/anomalies")
async def list_bgp_anomalies(
    severity: Optional[str] = Query(None),
    anomaly_type: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    prefix: Optional[str] = Query(None),
    origin_asn: Optional[int] = Query(None),
    time_from: Optional[str] = Query(None),
    time_to: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """List BGP anomalies with optional filters and pagination.

    Column filters are applied in SQL; rows are ordered by ``created_at``
    descending, then ``id`` descending. The time window is applied in Python
    against ``created_at`` and the page is sliced from the resulting list, so
    every row matching the column filters is loaded into memory.

    Args:
        severity, anomaly_type, status, prefix: Exact-match filters on the
            same-named ``BGPAnomaly`` columns; empty values are ignored.
        origin_asn: Exact-match filter on ``BGPAnomaly.origin_asn``.
        time_from, time_to: Inclusive ISO 8601 bounds on ``created_at``.
        page: 1-based page number.
        page_size: Number of items per page (1-200).
        current_user: Result of the ``get_current_user`` dependency; not read
            in the body (presumably present to require authentication).
        db: Async database session.

    Returns:
        A dict with ``total`` (count after filtering), ``page``,
        ``page_size`` and ``data`` (the page of ``to_dict()`` results).

    Raises:
        HTTPException: 422 when ``time_from`` or ``time_to`` is not a valid
            ISO 8601 timestamp.
    """
    # Parse the bounds before querying so malformed input is rejected as a
    # client error instead of surfacing as an unhandled ValueError (HTTP 500)
    # after the rows have already been loaded.
    try:
        dt_from = _parse_dt(time_from)
        dt_to = _parse_dt(time_to)
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail="time_from and time_to must be ISO 8601 timestamps",
        ) from exc

    stmt = select(BGPAnomaly).order_by(BGPAnomaly.created_at.desc(), BGPAnomaly.id.desc())
    if severity:
        stmt = stmt.where(BGPAnomaly.severity == severity)
    if anomaly_type:
        stmt = stmt.where(BGPAnomaly.anomaly_type == anomaly_type)
    if status:
        stmt = stmt.where(BGPAnomaly.status == status)
    if prefix:
        stmt = stmt.where(BGPAnomaly.prefix == prefix)
    # `is not None` so that 0 is a usable ASN filter value.
    if origin_asn is not None:
        stmt = stmt.where(BGPAnomaly.origin_asn == origin_asn)

    result = await db.execute(stmt)
    records = result.scalars().all()

    if dt_from or dt_to:
        records = [record for record in records if _matches_time(record.created_at, dt_from, dt_to)]

    offset = (page - 1) * page_size
    return {
        "total": len(records),
        "page": page,
        "page_size": page_size,
        "data": [record.to_dict() for record in records[offset : offset + page_size]],
    }
|
|
|
|
|
|
@router.get("/anomalies/summary")
async def get_bgp_anomaly_summary(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Summarise BGP anomalies as a total plus per-category counts.

    Args:
        current_user: Result of the ``get_current_user`` dependency; not read
            in the body.
        db: Async database session.

    Returns:
        A dict with ``total`` (row count, 0 when the count is empty) and the
        mappings ``by_type``, ``by_severity`` and ``by_status``, each keyed by
        the column value with its row count, built from queries ordered by
        count descending.
    """

    async def _counts_grouped_by(column):
        """Run one GROUP BY count query and return it as {value: count}."""
        grouped_stmt = (
            select(column, func.count(BGPAnomaly.id))
            .group_by(column)
            .order_by(func.count(BGPAnomaly.id).desc())
        )
        grouped = await db.execute(grouped_stmt)
        return {key: count for key, count in grouped.fetchall()}

    overall = await db.execute(select(func.count(BGPAnomaly.id)))
    per_type = await _counts_grouped_by(BGPAnomaly.anomaly_type)
    per_severity = await _counts_grouped_by(BGPAnomaly.severity)
    per_status = await _counts_grouped_by(BGPAnomaly.status)

    return {
        "total": overall.scalar() or 0,
        "by_type": per_type,
        "by_severity": per_severity,
        "by_status": per_status,
    }
|
|
|
|
|
|
@router.get("/anomalies/{anomaly_id}")
async def get_bgp_anomaly(
    anomaly_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Fetch a single BGP anomaly by its identifier.

    Args:
        anomaly_id: Identifier passed to ``db.get`` for ``BGPAnomaly``.
        current_user: Result of the ``get_current_user`` dependency; not read
            in the body.
        db: Async database session.

    Returns:
        The row's ``to_dict()`` representation.

    Raises:
        HTTPException: 404 when no matching row exists.
    """
    anomaly = await db.get(BGPAnomaly, anomaly_id)
    if anomaly:
        return anomaly.to_dict()
    raise HTTPException(status_code=404, detail="BGP anomaly not found")
|