Files
planet/backend/tests/test_bgp.py

75 lines
2.5 KiB
Python

"""Tests for BGP observability helpers."""
from app.models.bgp_anomaly import BGPAnomaly
from app.services.collectors.bgp_common import normalize_bgp_event
from app.services.collectors.bgpstream import BGPStreamBackfillCollector
def test_normalize_bgp_event_from_live_payload():
    """Normalize one live UPDATE announcement and check the derived fields.

    The payload carries ASNs as strings; the assertions expect the
    normalized metadata to expose them as integers, along with values
    derived from the AS path and the prefix.
    """
    live_payload = {
        "collector": "rrc00",
        "peer_asn": "3333",
        "peer_ip": "2001:db8::1",
        "type": "UPDATE",
        "event_type": "announcement",
        "prefix": "203.0.113.0/24",
        "path": ["3333", "64500", "64496"],
        "communities": ["3333:100"],
        "timestamp": "2026-03-26T08:00:00Z",
    }

    event = normalize_bgp_event(live_payload, project="ris-live")

    # The prefix is expected to double as the event name.
    assert event["name"] == "203.0.113.0/24"

    meta = event["metadata"]
    assert meta["collector"] == "rrc00"
    assert meta["peer_asn"] == 3333
    # The origin is expected to be the last hop of the three-element path.
    assert meta["origin_asn"] == 64496
    assert meta["as_path_length"] == 3
    assert meta["prefix_length"] == 24
    assert meta["is_more_specific"] is False
def test_bgpstream_transform_preserves_broker_record():
    """Transform a single broker record and check it is kept in the metadata.

    One input record must yield exactly one output record, named after the
    dump filename, tagged with the "bgpstream" project and carrying the
    original broker record under ``metadata["broker_record"]``.
    """
    broker_record = {
        "project": "routeviews",
        "collector": "route-views.sg",
        "filename": "rib.20260326.0800.gz",
        "startTime": "2026-03-26T08:00:00Z",
        "prefix": "198.51.100.0/24",
        "origin_asn": 64512,
    }
    backfill = BGPStreamBackfillCollector()

    results = backfill.transform([broker_record])

    assert len(results) == 1
    record = results[0]
    assert record["name"] == "rib.20260326.0800.gz"
    # The expected project tag is "bgpstream", not the input's "routeviews".
    assert record["metadata"]["project"] == "bgpstream"
    assert record["metadata"]["broker_record"]["filename"] == "rib.20260326.0800.gz"
def test_bgp_anomaly_to_dict():
    """Build an origin-change anomaly and check selected ``to_dict`` fields."""
    fields = {
        "source": "ris_live_bgp",
        "anomaly_type": "origin_change",
        "severity": "critical",
        "status": "active",
        "entity_key": "origin_change:203.0.113.0/24:64497",
        "prefix": "203.0.113.0/24",
        "origin_asn": 64496,
        "new_origin_asn": 64497,
        "summary": "Origin ASN changed",
        "confidence": 0.9,
        "evidence": {"previous_origins": [64496], "current_origins": [64497]},
    }

    serialized = BGPAnomaly(**fields).to_dict()

    assert serialized["source"] == "ris_live_bgp"
    assert serialized["anomaly_type"] == "origin_change"
    assert serialized["new_origin_asn"] == 64497
    # The nested evidence mapping is expected to survive serialization.
    assert serialized["evidence"]["previous_origins"] == [64496]