"""BGP anomaly model for derived routing intelligence.""" from datetime import datetime from sqlalchemy import Column, DateTime, Float, ForeignKey, Index, Integer, JSON, String, Text from app.core.time import to_iso8601_utc from app.db.session import Base class BGPAnomaly(Base): __tablename__ = "bgp_anomalies" id = Column(Integer, primary_key=True, index=True) snapshot_id = Column(Integer, ForeignKey("data_snapshots.id"), nullable=True, index=True) task_id = Column(Integer, ForeignKey("collection_tasks.id"), nullable=True, index=True) source = Column(String(100), nullable=False, index=True) anomaly_type = Column(String(50), nullable=False, index=True) severity = Column(String(20), nullable=False, index=True) status = Column(String(20), nullable=False, default="active", index=True) entity_key = Column(String(255), nullable=False, index=True) prefix = Column(String(64), nullable=True, index=True) origin_asn = Column(Integer, nullable=True, index=True) new_origin_asn = Column(Integer, nullable=True, index=True) peer_scope = Column(JSON, default=list) started_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow, index=True) ended_at = Column(DateTime(timezone=True), nullable=True) confidence = Column(Float, nullable=False, default=0.5) summary = Column(Text, nullable=False) evidence = Column(JSON, default=dict) created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow, index=True) __table_args__ = ( Index("idx_bgp_anomalies_source_created", "source", "created_at"), Index("idx_bgp_anomalies_type_status", "anomaly_type", "status"), ) def to_dict(self) -> dict: return { "id": self.id, "snapshot_id": self.snapshot_id, "task_id": self.task_id, "source": self.source, "anomaly_type": self.anomaly_type, "severity": self.severity, "status": self.status, "entity_key": self.entity_key, "prefix": self.prefix, "origin_asn": self.origin_asn, "new_origin_asn": self.new_origin_asn, "peer_scope": self.peer_scope or [], "started_at": to_iso8601_utc(self.started_at), "ended_at": to_iso8601_utc(self.ended_at), "confidence": self.confidence, "summary": self.summary, "evidence": self.evidence or {}, "created_at": to_iso8601_utc(self.created_at), }