"""TeleGeography Submarine Cables Collector Collects data from TeleGeography submarine cable database. Uses Wayback Machine as backup data source since live data requires JavaScript rendering. """ import json import re from typing import Dict, Any, List from datetime import datetime from bs4 import BeautifulSoup import httpx from app.services.collectors.base import BaseCollector class TeleGeographyCableCollector(BaseCollector): name = "telegeography_cables" priority = "P1" module = "L2" frequency_hours = 168 # 7 days data_type = "submarine_cable" async def fetch(self) -> List[Dict[str, Any]]: """Fetch submarine cable data from Wayback Machine""" # Try multiple data sources sources = [ # Wayback Machine archive of TeleGeography "https://web.archive.org/web/2024/https://www.submarinecablemap.com/api/v3/cable", # Alternative: Try scraping the page "https://www.submarinecablemap.com", ] for url in sources: try: async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client: response = await client.get(url) response.raise_for_status() # Check if response is JSON content_type = response.headers.get("content-type", "") if "application/json" in content_type or url.endswith(".json"): return self.parse_response(response.json()) else: # It's HTML, try to scrape data = self.scrape_cables_from_html(response.text) if data: return data except Exception: continue # Fallback to sample data return self._get_sample_data() def scrape_cables_from_html(self, html: str) -> List[Dict[str, Any]]: """Try to extract cable data from HTML page""" data = [] soup = BeautifulSoup(html, "html.parser") # Look for embedded JSON data in scripts scripts = soup.find_all("script") for script in scripts: text = script.string or "" if "cable" in text.lower() and ("{" in text or "[" in text): # Try to find JSON data match = re.search(r"\[.+\]", text, re.DOTALL) if match: try: potential_data = json.loads(match.group()) if isinstance(potential_data, list): return potential_data except: pass return data def parse_response(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Parse submarine cable data""" result = [] if not isinstance(data, list): data = [data] for item in data: try: entry = { "source_id": f"telegeo_cable_{item.get('id', item.get('cable_id', ''))}", "name": item.get("name", item.get("cable_name", "Unknown")), "country": "", "city": "", "latitude": "", "longitude": "", "value": str(item.get("length", item.get("length_km", 0))), "unit": "km", "metadata": { "owner": item.get("owner"), "operator": item.get("operator"), "length_km": item.get("length", item.get("length_km")), "rfs": item.get("rfs"), "status": item.get("status", "active"), "cable_type": item.get("type", "fiber optic"), "capacity_tbps": item.get("capacity"), "url": item.get("url"), }, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): continue if not result: result = self._get_sample_data() return result def _get_sample_data(self) -> List[Dict[str, Any]]: """Return sample submarine cable data""" return [ { "source_id": "telegeo_sample_1", "name": "2Africa", "country": "", "city": "", "latitude": "", "longitude": "", "value": "45000", "unit": "km", "metadata": { "note": "Sample data - TeleGeography requires browser/scraper for live data", "owner": "Meta, Orange, Vodafone, etc.", "status": "active", }, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), }, { "source_id": "telegeo_sample_2", "name": "Asia Connect Cable 1", "country": "", "city": "", "latitude": "", "longitude": "", "value": "12000", "unit": "km", "metadata": { "note": "Sample data", "owner": "Alibaba, NEC", "status": "planned", }, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), }, ] class TeleGeographyLandingPointCollector(BaseCollector): name = "telegeography_landing" priority = "P2" module = "L2" frequency_hours = 168 data_type = "landing_point" async def fetch(self) -> List[Dict[str, Any]]: """Fetch landing point data from GitHub mirror""" url = "https://raw.githubusercontent.com/lintaojlu/submarine_cable_information/main/landing_point.json" async with httpx.AsyncClient(timeout=60.0) as client: response = await client.get(url) response.raise_for_status() return self.parse_response(response.json()) def parse_response(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Parse landing point data""" result = [] for item in data: try: entry = { "source_id": f"telegeo_lp_{item.get('id', '')}", "name": item.get("name", "Unknown"), "country": item.get("country", "Unknown"), "city": item.get("city", item.get("name", "")), "latitude": str(item.get("latitude", "")), "longitude": str(item.get("longitude", "")), "value": "", "unit": "", "metadata": { "cable_count": len(item.get("cables", [])), "url": item.get("url"), }, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): continue if not result: result = self._get_sample_data() return result def _get_sample_data(self) -> List[Dict[str, Any]]: """Return sample landing point data""" return [ { "source_id": "telegeo_lp_sample_1", "name": "Sample Landing Point", "country": "United States", "city": "Los Angeles, CA", "latitude": "34.0522", "longitude": "-118.2437", "value": "", "unit": "", "metadata": {"note": "Sample data"}, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), }, ] class TeleGeographyCableSystemCollector(BaseCollector): name = "telegeography_systems" priority = "P2" module = "L2" frequency_hours = 168 data_type = "cable_system" async def fetch(self) -> List[Dict[str, Any]]: """Fetch cable system data""" url = "https://raw.githubusercontent.com/lintaojlu/submarine_cable_information/main/cable.json" async with httpx.AsyncClient(timeout=60.0) as client: response = await client.get(url) response.raise_for_status() return self.parse_response(response.json()) def parse_response(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Parse cable system data""" result = [] for item in data: try: entry = { "source_id": f"telegeo_sys_{item.get('id', item.get('cable_id', ''))}", "name": item.get("name", item.get("cable_name", "Unknown")), "country": "", "city": "", "latitude": "", "longitude": "", "value": str(item.get("length", 0)), "unit": "km", "metadata": { "owner": item.get("owner"), "operator": item.get("operator"), "route": item.get("route"), "countries": item.get("countries", []), "length_km": item.get("length"), "rfs": item.get("rfs"), "status": item.get("status", "active"), "investment": item.get("investment"), "url": item.get("url"), }, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), } result.append(entry) except (ValueError, TypeError, KeyError): continue if not result: result = self._get_sample_data() return result def _get_sample_data(self) -> List[Dict[str, Any]]: """Return sample cable system data""" return [ { "source_id": "telegeo_sys_sample_1", "name": "Sample Cable System", "country": "", "city": "", "latitude": "", "longitude": "", "value": "5000", "unit": "km", "metadata": {"note": "Sample data"}, "reference_date": datetime.utcnow().strftime("%Y-%m-%d"), }, ]