"""Resolve collector endpoint URLs from a YAML file with optional DB overrides."""

import logging
import os
from functools import lru_cache
from typing import Optional

import yaml

logger = logging.getLogger(__name__)

# Maps a collector name to the dotted path ("section.key") under which its
# URL is looked up in the parsed YAML configuration.
COLLECTOR_URL_KEYS = {
    "arcgis_cables": "arcgis.cable_url",
    "arcgis_landing_points": "arcgis.landing_point_url",
    "arcgis_cable_landing_relation": "arcgis.cable_landing_relation_url",
    "fao_landing_points": "fao.landing_point_url",
    "telegeography_cables": "telegeography.cable_url",
    "telegeography_landing": "telegeography.landing_point_url",
    "huggingface_models": "huggingface.models_url",
    "huggingface_datasets": "huggingface.datasets_url",
    "huggingface_spaces": "huggingface.spaces_url",
    "cloudflare_radar_device": "cloudflare.radar_device_url",
    "cloudflare_radar_traffic": "cloudflare.radar_traffic_url",
    "cloudflare_radar_top_locations": "cloudflare.radar_top_locations_url",
    "peeringdb_ixp": "peeringdb.ixp_url",
    "peeringdb_network": "peeringdb.network_url",
    "peeringdb_facility": "peeringdb.facility_url",
    "top500": "top500.url",
    "epoch_ai_gpu": "epoch_ai.gpu_clusters_url",
    "spacetrack_tle": "spacetrack.tle_query_url",
}


class DataSourcesConfig:
    """Look up collector URLs from YAML, optionally overridden by a DB row."""

    def __init__(self, config_path: Optional[str] = None):
        """Load the YAML configuration.

        Args:
            config_path: Path to the YAML file. When ``None``, the file
                ``data_sources.yaml`` next to this module is used. A path
                that does not exist results in an empty configuration.
        """
        if config_path is None:
            config_path = os.path.join(
                os.path.dirname(__file__), "data_sources.yaml"
            )

        self._yaml_config = {}
        if os.path.exists(config_path):
            with open(config_path, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
            # An empty file parses to None and a malformed one may parse to
            # a list or scalar; only a mapping can be traversed by
            # get_yaml_url, so anything else is treated as "no config".
            if isinstance(loaded, dict):
                self._yaml_config = loaded

    def get_yaml_url(self, collector_name: str) -> str:
        """Return the URL configured in YAML for ``collector_name``.

        Returns:
            The configured string, or ``""`` when the collector is not
            listed in ``COLLECTOR_URL_KEYS``, the dotted path is missing
            from the YAML data, or the value found is not a string.
        """
        key = COLLECTOR_URL_KEYS.get(collector_name, "")
        if not key:
            return ""

        value = self._yaml_config
        for part in key.split("."):
            if not isinstance(value, dict):
                return ""
            value = value.get(part, "")
        return value if isinstance(value, str) else ""

    async def get_url(self, collector_name: str, db) -> str:
        """Return the URL for ``collector_name``, preferring a DB override.

        Args:
            collector_name: Collector identifier; matched against
                ``DataSourceConfig.name`` and ``COLLECTOR_URL_KEYS``.
            db: Object whose ``execute(query)`` is awaited (presumably an
                async SQLAlchemy session -- confirm against callers). A
                falsy value skips the database lookup entirely.

        Returns:
            The ``endpoint`` of the active ``DataSourceConfig`` row when one
            exists and is non-empty; otherwise the YAML URL (possibly
            ``""``). Any failure during the lookup is logged and the YAML
            URL is returned.
        """
        yaml_url = self.get_yaml_url(collector_name)
        if not db:
            return yaml_url

        try:
            # Imported lazily so that a missing or failing import degrades
            # to the YAML value like any other lookup failure.
            from sqlalchemy import select
            from app.models.datasource_config import DataSourceConfig

            # "== True" is intentional: SQLAlchemy overloads the operator to
            # build the SQL comparison.
            query = select(DataSourceConfig).where(
                DataSourceConfig.name == collector_name,
                DataSourceConfig.is_active == True,  # noqa: E712
            )
            result = await db.execute(query)
            db_config = result.scalar_one_or_none()
            if db_config and db_config.endpoint:
                return db_config.endpoint
        except Exception:
            # Best-effort override: never fail the caller, but leave a trace
            # instead of hiding the problem.
            logger.warning(
                "Could not load DB endpoint override for collector %r; "
                "falling back to YAML URL",
                collector_name,
                exc_info=True,
            )
        return yaml_url


@lru_cache()
def get_data_sources_config() -> DataSourcesConfig:
    """Return a cached ``DataSourcesConfig`` built from the default path."""
    return DataSourcesConfig()