Files
planet/backend/app/models/bgp_anomaly.py

59 lines
2.5 KiB
Python

"""BGP anomaly model for derived routing intelligence."""
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Float, ForeignKey, Index, Integer, JSON, String, Text

from app.core.time import to_iso8601_utc
from app.db.session import Base
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime.

    Used as the Python-side default for the timestamp columns below.
    ``datetime.utcnow`` is avoided because it returns a *naive* value, which
    does not match columns declared with ``DateTime(timezone=True)`` and is
    deprecated since Python 3.12.
    """
    return datetime.now(timezone.utc)


class BGPAnomaly(Base):
    """ORM model for one row of the ``bgp_anomalies`` table.

    Each row stores a single anomaly record: where it came from (``source``,
    optional ``snapshot_id`` / ``task_id`` foreign keys), how it is classified
    (``anomaly_type``, ``severity``, ``status``), what it concerns
    (``entity_key``, ``prefix``, ASN columns), its time window, and free-form
    JSON ``evidence``.
    """

    __tablename__ = "bgp_anomalies"

    id = Column(Integer, primary_key=True, index=True)

    # Optional links to the snapshot / collection task rows this record
    # references.
    snapshot_id = Column(Integer, ForeignKey("data_snapshots.id"), nullable=True, index=True)
    task_id = Column(Integer, ForeignKey("collection_tasks.id"), nullable=True, index=True)

    # Classification fields; all required and individually indexed.
    source = Column(String(100), nullable=False, index=True)
    anomaly_type = Column(String(50), nullable=False, index=True)
    severity = Column(String(20), nullable=False, index=True)
    status = Column(String(20), nullable=False, default="active", index=True)

    # Subject of the anomaly. Only entity_key is mandatory.
    entity_key = Column(String(255), nullable=False, index=True)
    prefix = Column(String(64), nullable=True, index=True)
    origin_asn = Column(Integer, nullable=True, index=True)
    new_origin_asn = Column(Integer, nullable=True, index=True)

    # JSON payload; `list` is passed as a factory so every row gets its own
    # fresh empty list rather than a shared mutable default.
    peer_scope = Column(JSON, default=list)

    # Time window. Defaults are timezone-aware UTC to match timezone=True.
    started_at = Column(DateTime(timezone=True), nullable=False, default=_utc_now, index=True)
    ended_at = Column(DateTime(timezone=True), nullable=True)

    confidence = Column(Float, nullable=False, default=0.5)
    summary = Column(Text, nullable=False)

    # JSON payload; `dict` is a factory for the same reason as peer_scope.
    evidence = Column(JSON, default=dict)

    created_at = Column(DateTime(timezone=True), nullable=False, default=_utc_now, index=True)

    __table_args__ = (
        Index("idx_bgp_anomalies_source_created", "source", "created_at"),
        Index("idx_bgp_anomalies_type_status", "anomaly_type", "status"),
    )

    def to_dict(self) -> dict:
        """Return this row as a plain dictionary keyed by column name.

        ``peer_scope`` and ``evidence`` fall back to an empty list / dict when
        the stored value is falsy (e.g. ``None``). The three timestamp fields
        are passed through ``to_iso8601_utc``.

        NOTE(review): ``ended_at`` is nullable, so ``to_iso8601_utc`` is called
        with ``None`` for rows without an end time; presumably the helper
        handles that -- confirm against ``app.core.time``.
        """
        return {
            "id": self.id,
            "snapshot_id": self.snapshot_id,
            "task_id": self.task_id,
            "source": self.source,
            "anomaly_type": self.anomaly_type,
            "severity": self.severity,
            "status": self.status,
            "entity_key": self.entity_key,
            "prefix": self.prefix,
            "origin_asn": self.origin_asn,
            "new_origin_asn": self.new_origin_asn,
            "peer_scope": self.peer_scope or [],
            "started_at": to_iso8601_utc(self.started_at),
            "ended_at": to_iso8601_utc(self.ended_at),
            "confidence": self.confidence,
            "summary": self.summary,
            "evidence": self.evidence or {},
            "created_at": to_iso8601_utc(self.created_at),
        }