"""Cable graph service for finding shortest path between landing points""" import math from typing import List, Dict, Any, Optional, Tuple import networkx as nx def normalize_longitude(lon: float) -> float: """Normalize longitude to -180 to 180 range""" while lon > 180: lon -= 360 while lon < -180: lon += 360 return lon def haversine_distance(coord1: Tuple[float, float], coord2: Tuple[float, float]) -> float: """Calculate great circle distance between two points in km, handling dateline crossing""" lon1, lat1 = normalize_longitude(coord1[0]), coord1[1] lon2, lat2 = normalize_longitude(coord2[0]), coord2[1] R = 6371 lat1_rad = math.radians(lat1) lat2_rad = math.radians(lat2) delta_lat = math.radians(lat2 - lat1) delta_lon = math.radians(lon2 - lon1) a = ( math.sin(delta_lat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2 ) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return R * c class CableGraph: def __init__(self, cables: List[Dict], landing_points: List[Dict]): self.graph = nx.Graph() self.landing_points = {lp["id"]: lp for lp in landing_points} self.point_coords = {lp["id"]: (lp["lon"], lp["lat"]) for lp in landing_points} self._build_graph(cables) def _build_graph(self, cables: List[Dict]): """Build graph from cables and landing points""" for cable in cables: coords = cable.get("coordinates", []) if len(coords) < 2: continue # Find nearest landing points for start and end (search more points) start_point = self._find_nearest_landing_point_multi(coords[:3]) # First 3 points end_point = self._find_nearest_landing_point_multi(coords[-3:]) # Last 3 points if start_point and end_point and start_point != end_point: # Calculate distance via cable route distance = self._calculate_cable_distance(coords) # Add edge with cable info edge_data = { "distance": distance, "cable_name": cable.get("name", "Unknown"), "cable_id": cable.get("id"), "coordinates": coords, } # If edge exists, keep the shorter one if self.graph.has_edge(start_point, end_point): existing_dist = self.graph[start_point][end_point]["distance"] if distance < existing_dist: self.graph[start_point][end_point].update(edge_data) else: self.graph.add_edge(start_point, end_point, **edge_data) def _find_nearest_landing_point_multi(self, coords_subset: List[List[float]]) -> Optional[int]: """Find nearest landing point from multiple coordinates (e.g., first/last N points)""" best_point = None best_dist = float("inf") for coord in coords_subset: point = self._find_nearest_landing_point(coord) if point: dist = haversine_distance( (normalize_longitude(coord[0]), coord[1]), self.point_coords[point] ) if dist < best_dist: best_dist = dist best_point = point return best_point def _find_nearest_landing_point(self, coord: List[float]) -> Optional[int]: """Find nearest landing point to given coordinate""" if not self.point_coords: return None min_dist = float("inf") nearest_id = None target_lon = normalize_longitude(coord[0]) target_lat = coord[1] for lp_id, (lon, lat) in self.point_coords.items(): dist = haversine_distance((target_lon, target_lat), (lon, lat)) if dist < min_dist: min_dist = dist nearest_id = lp_id return nearest_id if min_dist < 500 else None def _find_nearest_connected_landing_point(self, coord: List[float]) -> Optional[int]: """Find nearest landing point that's connected to the graph, handling dateline""" if not self.point_coords or not self.graph.nodes(): return None connected_nodes = set(self.graph.nodes()) min_dist = float("inf") nearest_id = None target_lon, target_lat = normalize_longitude(coord[0]), coord[1] for lp_id in connected_nodes: lp_lon, lp_lat = self.point_coords[lp_id] # Try both normalized versions (for points near dateline) dist = haversine_distance((target_lon, target_lat), (lp_lon, lp_lat)) if dist < min_dist: min_dist = dist nearest_id = lp_id return nearest_id if min_dist < 500 else None def _calculate_cable_distance(self, coordinates: List[List[float]]) -> float: """Calculate total distance along cable route""" total = 0 for i in range(len(coordinates) - 1): total += haversine_distance( (coordinates[i][0], coordinates[i][1]), (coordinates[i + 1][0], coordinates[i + 1][1]), ) return total def find_shortest_path( self, start_coords: List[float], end_coords: List[float] ) -> Optional[Dict[str, Any]]: """Find shortest path between two coordinates""" start_point = self._find_nearest_connected_landing_point(start_coords) end_point = self._find_nearest_connected_landing_point(end_coords) if not start_point or not end_point: return None if not nx.has_path(self.graph, start_point, end_point): return None try: path = nx.shortest_path(self.graph, start_point, end_point, weight="distance") except nx.NetworkXNoPath: return None if not nx.has_path(self.graph, start_point, end_point): return None try: path = nx.shortest_path(self.graph, start_point, end_point, weight="distance") except nx.NetworkXNoPath: return None # Build result total_distance = 0 path_segments = [] for i in range(len(path) - 1): u, v = path[i], path[i + 1] edge_data = self.graph[u][v] total_distance += edge_data["distance"] path_segments.append( { "from": self.landing_points[u], "to": self.landing_points[v], "cable_name": edge_data["cable_name"], "cable_id": edge_data["cable_id"], "distance_km": round(edge_data["distance"], 2), "coordinates": edge_data["coordinates"], } ) return { "start": { "id": start_point, "name": self.landing_points[start_point].get("name", "Unknown"), "coords": list(self.point_coords[start_point]), }, "end": { "id": end_point, "name": self.landing_points[end_point].get("name", "Unknown"), "coords": list(self.point_coords[end_point]), }, "total_distance_km": round(total_distance, 2), "segments": path_segments, "segment_count": len(path_segments), } def build_graph_from_data(cables_data: Dict, points_data: Dict) -> CableGraph: """Build cable graph from GeoJSON data""" cables = [] for feature in cables_data.get("features", []): props = feature.get("properties", {}) coords = feature.get("geometry", {}).get("coordinates", []) if coords and isinstance(coords[0], list): coords = coords[0] # MultiLineString - take first line cables.append( { "id": props.get("id"), "name": props.get("name", props.get("Name", "Unknown")), "coordinates": coords, } ) points = [] for feature in points_data.get("features", []): geom = feature.get("geometry", {}) props = feature.get("properties", {}) coords = geom.get("coordinates", []) if coords and len(coords) >= 2: points.append( { "id": props.get("id"), "name": props.get("name", "Unknown"), "lon": coords[0], "lat": coords[1], } ) return CableGraph(cables, points)