Files
rayd1o c82e1d5a04 fix: 修复3D地球坐标映射多个严重bug
## Bug修复详情

### 1. 致命错误:球面距离计算 (calculateDistance)
- 问题:使用勾股定理计算经纬度距离,在球体表面完全错误
- 修复:改用Haversine公式计算球面大圆距离
- 影响:赤道1度=111km,极地1度=19km,原计算误差巨大

### 2. 经度范围规范化 (vector3ToLatLon)
- 问题:Math.atan2返回[-180°,180°],转换后可能超出标准范围
- 修复:添加while循环规范化到[-180, 180]区间
- 影响:避免本初子午线附近返回360°的异常值

### 3. 屏幕坐标转换支持非全屏 (screenToEarthCoords)
- 问题:假设Canvas永远全屏,非全屏时点击偏移严重
- 修复:新增domElement参数,使用getBoundingClientRect()计算相对坐标
- 影响:嵌入式3D地球组件也能精准拾取

### 4. 地球旋转时经纬度映射错误
- 问题:Raycaster返回世界坐标,未考虑地球自转
- 修复:使用earth.worldToLocal()转换到本地坐标空间
- 影响:地球旋转时经纬度显示正确跟随

## 新增功能

- CelesTrak卫星数据采集器
- Space-Track卫星数据采集器
- 卫星可视化模块(500颗,实时SGP4轨道计算)
- 海底光缆悬停显示info-card
- 统一info-card组件
- 工具栏按钮(Stellarium风格)
- 缩放控制(百分比显示)
- Docker volume映射(代码热更新)

## 文件变更

- utils.js: 坐标转换核心逻辑修复
- satellites.js: 新增卫星可视化
- cables.js: 悬停交互支持
- main.js: 悬停/锁定逻辑
- controls.js: 工具栏UI
- info-card.js: 统一卡片组件
- docker-compose.yml: volume映射
- restart.sh: 简化重启脚本
2026-03-17 04:10:24 +08:00

211 lines
6.8 KiB
Python

"""Base collector class for all data sources"""
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from datetime import datetime
import httpx
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
class BaseCollector(ABC):
"""Abstract base class for data collectors"""
name: str = "base_collector"
priority: str = "P1"
module: str = "L1"
frequency_hours: int = 4
data_type: str = "generic"
def __init__(self):
self._current_task = None
self._db_session = None
self._datasource_id = 1
self._resolved_url: Optional[str] = None
async def resolve_url(self, db: AsyncSession) -> None:
from app.core.data_sources import get_data_sources_config
config = get_data_sources_config()
self._resolved_url = await config.get_url(self.name, db)
def update_progress(self, records_processed: int):
"""Update task progress - call this during data processing"""
if self._current_task and self._db_session and self._current_task.total_records > 0:
self._current_task.records_processed = records_processed
self._current_task.progress = (
records_processed / self._current_task.total_records
) * 100
@abstractmethod
async def fetch(self) -> List[Dict[str, Any]]:
"""Fetch raw data from source"""
pass
def transform(self, raw_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Transform raw data to internal format (default: pass through)"""
return raw_data
async def run(self, db: AsyncSession) -> Dict[str, Any]:
"""Full pipeline: fetch -> transform -> save"""
from app.services.collectors.registry import collector_registry
from app.models.task import CollectionTask
from app.models.collected_data import CollectedData
start_time = datetime.utcnow()
datasource_id = getattr(self, "_datasource_id", 1)
if not collector_registry.is_active(self.name):
return {"status": "skipped", "reason": "Collector is disabled"}
task = CollectionTask(
datasource_id=datasource_id,
status="running",
started_at=start_time,
)
db.add(task)
await db.commit()
task_id = task.id
self._current_task = task
self._db_session = db
await self.resolve_url(db)
try:
raw_data = await self.fetch()
task.total_records = len(raw_data)
await db.commit()
data = self.transform(raw_data)
records_count = await self._save_data(db, data)
task.status = "success"
task.records_processed = records_count
task.progress = 100.0
task.completed_at = datetime.utcnow()
await db.commit()
return {
"status": "success",
"task_id": task_id,
"records_processed": records_count,
"execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(),
}
except Exception as e:
task.status = "failed"
task.error_message = str(e)
task.completed_at = datetime.utcnow()
await db.commit()
return {
"status": "failed",
"task_id": task_id,
"error": str(e),
"execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(),
}
async def _save_data(self, db: AsyncSession, data: List[Dict[str, Any]]) -> int:
"""Save transformed data to database"""
from app.models.collected_data import CollectedData
if not data:
return 0
collected_at = datetime.utcnow()
records_added = 0
for i, item in enumerate(data):
print(
f"DEBUG: Saving item {i}: name={item.get('name')}, metadata={item.get('metadata', 'NOT FOUND')}"
)
record = CollectedData(
source=self.name,
source_id=item.get("source_id") or item.get("id"),
data_type=self.data_type,
name=item.get("name"),
title=item.get("title"),
description=item.get("description"),
country=item.get("country"),
city=item.get("city"),
latitude=str(item.get("latitude", ""))
if item.get("latitude") is not None
else None,
longitude=str(item.get("longitude", ""))
if item.get("longitude") is not None
else None,
value=item.get("value"),
unit=item.get("unit"),
extra_data=item.get("metadata", {}),
collected_at=collected_at,
reference_date=datetime.fromisoformat(
item.get("reference_date").replace("Z", "+00:00")
)
if item.get("reference_date")
else None,
is_valid=1,
)
db.add(record)
records_added += 1
if i % 100 == 0:
self.update_progress(i + 1)
await db.commit()
await db.commit()
self.update_progress(len(data))
return records_added
async def save(self, db: AsyncSession, data: List[Dict[str, Any]]) -> int:
"""Save data to database (legacy method, use _save_data instead)"""
return await self._save_data(db, data)
class HTTPCollector(BaseCollector):
"""Base class for HTTP API collectors"""
base_url: str = ""
headers: Dict[str, str] = {}
async def fetch(self) -> List[Dict[str, Any]]:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.get(self.base_url, headers=self.headers)
response.raise_for_status()
return self.parse_response(response.json())
@abstractmethod
def parse_response(self, response: Dict[str, Any]) -> List[Dict[str, Any]]:
pass
class IntervalCollector(BaseCollector):
"""Base class for collectors that run on intervals"""
async def run(self, db: AsyncSession) -> Dict[str, Any]:
return await super().run(db)
async def log_task(
db: AsyncSession,
datasource_id: int,
status: str,
records_processed: int = 0,
error_message: Optional[str] = None,
):
"""Log collection task to database"""
from app.models.task import CollectionTask
task = CollectionTask(
datasource_id=datasource_id,
status=status,
records_processed=records_processed,
error_message=error_message,
started_at=datetime.utcnow(),
completed_at=datetime.utcnow(),
)
db.add(task)
await db.commit()