Stabilize Earth module and fix satellite TLE handling

This commit is contained in:
linkong
2026-03-26 10:29:50 +08:00
parent 3fd6cbb6f7
commit ce5feba3b9
14 changed files with 2132 additions and 1069 deletions

View File

@@ -11,6 +11,7 @@ from sqlalchemy import select, func
from typing import List, Dict, Any, Optional
from app.core.collected_data_fields import get_record_field
from app.core.satellite_tle import build_tle_lines_from_elements
from app.db.session import get_db
from app.models.collected_data import CollectedData
from app.services.cable_graph import build_graph_from_data, CableGraph
@@ -155,6 +156,20 @@ def convert_satellite_to_geojson(records: List[CollectedData]) -> Dict[str, Any]
if not norad_id:
continue
tle_line1 = metadata.get("tle_line1")
tle_line2 = metadata.get("tle_line2")
if not tle_line1 or not tle_line2:
tle_line1, tle_line2 = build_tle_lines_from_elements(
norad_cat_id=norad_id,
epoch=metadata.get("epoch"),
inclination=metadata.get("inclination"),
raan=metadata.get("raan"),
eccentricity=metadata.get("eccentricity"),
arg_of_perigee=metadata.get("arg_of_perigee"),
mean_anomaly=metadata.get("mean_anomaly"),
mean_motion=metadata.get("mean_motion"),
)
features.append(
{
"type": "Feature",
@@ -174,6 +189,8 @@ def convert_satellite_to_geojson(records: List[CollectedData]) -> Dict[str, Any]
"mean_motion": metadata.get("mean_motion"),
"bstar": metadata.get("bstar"),
"classification_type": metadata.get("classification_type"),
"tle_line1": tle_line1,
"tle_line2": tle_line2,
"data_type": "satellite_tle",
},
}

View File

@@ -0,0 +1,116 @@
"""Helpers for building stable TLE lines from orbital elements."""
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
def compute_tle_checksum(line: str) -> str:
"""Compute the standard modulo-10 checksum for a TLE line."""
total = 0
for char in line[:68]:
if char.isdigit():
total += int(char)
elif char == "-":
total += 1
return str(total % 10)
def _parse_epoch(value: Any) -> Optional[datetime]:
if not value:
return None
if isinstance(value, datetime):
return value
if isinstance(value, str):
return datetime.fromisoformat(value.replace("Z", "+00:00"))
return None
def build_tle_line1(norad_cat_id: Any, epoch: Any) -> Optional[str]:
"""Build a valid TLE line 1 from the NORAD id and epoch."""
epoch_date = _parse_epoch(epoch)
if not norad_cat_id or epoch_date is None:
return None
epoch_year = epoch_date.year % 100
start_of_year = datetime(epoch_date.year, 1, 1, tzinfo=epoch_date.tzinfo)
day_of_year = (epoch_date - start_of_year).days + 1
ms_of_day = (
epoch_date.hour * 3600000
+ epoch_date.minute * 60000
+ epoch_date.second * 1000
+ int(epoch_date.microsecond / 1000)
)
day_fraction = ms_of_day / 86400000
decimal_fraction = f"{day_fraction:.8f}"[1:]
epoch_str = f"{epoch_year:02d}{day_of_year:03d}{decimal_fraction}"
core = (
f"1 {int(norad_cat_id):05d}U 00001A {epoch_str}"
" .00000000 00000-0 00000-0 0 999"
)
return core + compute_tle_checksum(core)
def build_tle_line2(
norad_cat_id: Any,
inclination: Any,
raan: Any,
eccentricity: Any,
arg_of_perigee: Any,
mean_anomaly: Any,
mean_motion: Any,
) -> Optional[str]:
"""Build a valid TLE line 2 from the standard orbital elements."""
required = [
norad_cat_id,
inclination,
raan,
eccentricity,
arg_of_perigee,
mean_anomaly,
mean_motion,
]
if any(value is None for value in required):
return None
eccentricity_digits = str(round(float(eccentricity) * 10_000_000)).zfill(7)
core = (
f"2 {int(norad_cat_id):05d}"
f" {float(inclination):8.4f}"
f" {float(raan):8.4f}"
f" {eccentricity_digits}"
f" {float(arg_of_perigee):8.4f}"
f" {float(mean_anomaly):8.4f}"
f" {float(mean_motion):11.8f}"
"00000"
)
return core + compute_tle_checksum(core)
def build_tle_lines_from_elements(
*,
norad_cat_id: Any,
epoch: Any,
inclination: Any,
raan: Any,
eccentricity: Any,
arg_of_perigee: Any,
mean_anomaly: Any,
mean_motion: Any,
) -> tuple[Optional[str], Optional[str]]:
"""Build both TLE lines from a metadata payload."""
line1 = build_tle_line1(norad_cat_id, epoch)
line2 = build_tle_line2(
norad_cat_id,
inclination,
raan,
eccentricity,
arg_of_perigee,
mean_anomaly,
mean_motion,
)
return line1, line2

View File

@@ -8,6 +8,7 @@ import json
from typing import Dict, Any, List
import httpx
from app.core.satellite_tle import build_tle_lines_from_elements
from app.services.collectors.base import BaseCollector
@@ -61,6 +62,17 @@ class CelesTrakTLECollector(BaseCollector):
def transform(self, raw_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
transformed = []
for item in raw_data:
tle_line1, tle_line2 = build_tle_lines_from_elements(
norad_cat_id=item.get("NORAD_CAT_ID"),
epoch=item.get("EPOCH"),
inclination=item.get("INCLINATION"),
raan=item.get("RA_OF_ASC_NODE"),
eccentricity=item.get("ECCENTRICITY"),
arg_of_perigee=item.get("ARG_OF_PERICENTER"),
mean_anomaly=item.get("MEAN_ANOMALY"),
mean_motion=item.get("MEAN_MOTION"),
)
transformed.append(
{
"name": item.get("OBJECT_NAME", "Unknown"),
@@ -80,6 +92,10 @@ class CelesTrakTLECollector(BaseCollector):
"mean_motion_dot": item.get("MEAN_MOTION_DOT"),
"mean_motion_ddot": item.get("MEAN_MOTION_DDOT"),
"ephemeris_type": item.get("EPHEMERIS_TYPE"),
# Prefer the original TLE lines when the source provides them.
# If they are missing, store a normalized TLE pair built once on the backend.
"tle_line1": item.get("TLE_LINE1") or tle_line1,
"tle_line2": item.get("TLE_LINE2") or tle_line2,
},
}
)

View File

@@ -10,6 +10,7 @@ import httpx
from app.services.collectors.base import BaseCollector
from app.core.data_sources import get_data_sources_config
from app.core.satellite_tle import build_tle_lines_from_elements
class SpaceTrackTLECollector(BaseCollector):
@@ -169,25 +170,41 @@ class SpaceTrackTLECollector(BaseCollector):
"""Transform TLE data to internal format"""
transformed = []
for item in raw_data:
tle_line1, tle_line2 = build_tle_lines_from_elements(
norad_cat_id=item.get("NORAD_CAT_ID"),
epoch=item.get("EPOCH"),
inclination=item.get("INCLINATION"),
raan=item.get("RAAN"),
eccentricity=item.get("ECCENTRICITY"),
arg_of_perigee=item.get("ARG_OF_PERIGEE"),
mean_anomaly=item.get("MEAN_ANOMALY"),
mean_motion=item.get("MEAN_MOTION"),
)
transformed.append(
{
"name": item.get("OBJECT_NAME", "Unknown"),
"norad_cat_id": item.get("NORAD_CAT_ID"),
"international_designator": item.get("INTL_DESIGNATOR"),
"epoch": item.get("EPOCH"),
"mean_motion": item.get("MEAN_MOTION"),
"eccentricity": item.get("ECCENTRICITY"),
"inclination": item.get("INCLINATION"),
"raan": item.get("RAAN"),
"arg_of_perigee": item.get("ARG_OF_PERIGEE"),
"mean_anomaly": item.get("MEAN_ANOMALY"),
"ephemeris_type": item.get("EPHEMERIS_TYPE"),
"classification_type": item.get("CLASSIFICATION_TYPE"),
"element_set_no": item.get("ELEMENT_SET_NO"),
"rev_at_epoch": item.get("REV_AT_EPOCH"),
"bstar": item.get("BSTAR"),
"mean_motion_dot": item.get("MEAN_MOTION_DOT"),
"mean_motion_ddot": item.get("MEAN_MOTION_DDOT"),
"reference_date": item.get("EPOCH", ""),
"metadata": {
"norad_cat_id": item.get("NORAD_CAT_ID"),
"international_designator": item.get("INTL_DESIGNATOR"),
"epoch": item.get("EPOCH"),
"mean_motion": item.get("MEAN_MOTION"),
"eccentricity": item.get("ECCENTRICITY"),
"inclination": item.get("INCLINATION"),
"raan": item.get("RAAN"),
"arg_of_perigee": item.get("ARG_OF_PERIGEE"),
"mean_anomaly": item.get("MEAN_ANOMALY"),
"ephemeris_type": item.get("EPHEMERIS_TYPE"),
"classification_type": item.get("CLASSIFICATION_TYPE"),
"element_set_no": item.get("ELEMENT_SET_NO"),
"rev_at_epoch": item.get("REV_AT_EPOCH"),
"bstar": item.get("BSTAR"),
"mean_motion_dot": item.get("MEAN_MOTION_DOT"),
"mean_motion_ddot": item.get("MEAN_MOTION_DDOT"),
# Prefer original lines from the source, but keep a backend-built pair as a stable fallback.
"tle_line1": item.get("TLE_LINE1") or item.get("TLE1") or tle_line1,
"tle_line2": item.get("TLE_LINE2") or item.get("TLE2") or tle_line2,
},
}
)
return transformed