Files
planet/backend/app/services/scheduler.py
rayd1o aaae6a53c3 feat(backend): Add cable graph service and data collectors
## Changelog

### New Features

#### Cable Graph Service
- Add cable_graph.py for finding shortest path between landing points
- Implement haversine distance calculation for great circle distances
- Support for dateline crossing (longitude normalization)
- NetworkX-based graph for optimal path finding

#### Data Collectors
- Add ArcGISCableCollector for fetching submarine cable data from ArcGIS GeoJSON API
- Add FAOLandingPointCollector for fetching landing point data from FAO CSV API

### Backend Changes

#### API Updates
- auth.py: Update authentication logic
- datasources.py: Add datasource endpoints and management
- visualization.py: Add visualization API endpoints
- config.py: Update configuration settings
- security.py: Improve security settings

#### Models & Schemas
- task.py: Update task model with new fields
- token.py: Update token schema

#### Services
- collectors/base.py: Improve base collector with better error handling
- collectors/__init__.py: Register new collectors
- scheduler.py: Update scheduler logic
- tasks/scheduler.py: Add task scheduling

### Frontend Changes
- AppLayout.tsx: Improve layout component
- index.css: Add global styles
- DataSources.tsx: Enhance data sources management page
- vite.config.ts: Add Vite configuration for earth module
2026-03-11 16:38:49 +08:00

149 lines
4.4 KiB
Python

"""Task Scheduler for running collection jobs"""
import asyncio
import logging
from datetime import datetime
from typing import Dict, Any
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import async_session_factory
from app.services.collectors.registry import collector_registry
# Module-level logger and the single shared AsyncIOScheduler instance that
# every helper in this module registers, inspects, and controls jobs on.
logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler()

# Registry name of each collector -> numeric datasource id. run_collector_task
# copies this id onto the collector instance before running it.
# NOTE(review): ids 12-14 do not appear here — presumably reserved or used by
# datasources without a scheduled collector; confirm against the datasource table.
COLLECTOR_TO_ID: Dict[str, int] = {
    "top500": 1,
    "epoch_ai_gpu": 2,
    "huggingface_models": 3,
    "huggingface_datasets": 4,
    "huggingface_spaces": 5,
    "peeringdb_ixp": 6,
    "peeringdb_network": 7,
    "peeringdb_facility": 8,
    "telegeography_cables": 9,
    "telegeography_landing": 10,
    "telegeography_systems": 11,
    "arcgis_cables": 15,
    "fao_landing_points": 16,
}
async def run_collector_task(collector_name: str):
    """Run one registered collector inside a fresh database session.

    Looks the collector up in ``collector_registry``, tags it with the
    datasource id taken from ``COLLECTOR_TO_ID`` and awaits
    ``collector.run(db)``. Exceptions raised while running the collector are
    logged with their traceback and are not re-raised.

    Args:
        collector_name: Registry name of the collector to run.
    """
    collector = collector_registry.get(collector_name)
    if not collector:
        logger.error(f"Collector not found: {collector_name}")
        return

    datasource_id = COLLECTOR_TO_ID.get(collector_name)
    if datasource_id is None:
        # Keep the historical fallback of 1, but make it visible: id 1 is the
        # "top500" datasource, so an unmapped collector's results would
        # otherwise be silently attributed to it.
        logger.warning(
            f"No datasource_id mapped for collector {collector_name}; "
            "falling back to datasource_id=1"
        )
        datasource_id = 1

    async with async_session_factory() as db:
        try:
            # The collector object comes from the shared registry, so the id
            # is (re)assigned immediately before every run.
            collector._datasource_id = datasource_id
            logger.info(f"Running collector: {collector_name} (datasource_id={datasource_id})")
            result = await collector.run(db)
            logger.info(f"Collector {collector_name} completed: {result}")
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception(f"Collector {collector_name} failed: {e}")
def start_scheduler():
    """Register an interval job per active collector, then start the scheduler.

    Each job's id and name are the collector's registry name, it fires every
    ``frequency_hours`` hours as declared by the collector, and an existing
    job with the same id is replaced.
    """
    for job_name, registered in collector_registry.all().items():
        if not collector_registry.is_active(job_name):
            continue

        interval = registered.frequency_hours
        scheduler.add_job(
            run_collector_task,
            trigger=IntervalTrigger(hours=interval),
            id=job_name,
            name=job_name,
            replace_existing=True,
            kwargs={"collector_name": job_name},
        )
        logger.info(f"Scheduled collector: {job_name} (every {interval}h)")

    scheduler.start()
    logger.info("Scheduler started")
def stop_scheduler():
    """Shut down the module-level scheduler if it is currently running.

    APScheduler's ``shutdown()`` raises ``SchedulerNotRunningError`` when the
    scheduler was never started or is already stopped. Checking
    ``scheduler.running`` first lets this be called unconditionally, e.g.
    from an application shutdown hook.
    """
    if not scheduler.running:
        logger.info("Scheduler not running; nothing to stop")
        return
    scheduler.shutdown()
    logger.info("Scheduler stopped")
def get_scheduler_jobs() -> list[Dict[str, Any]]:
    """Return a plain-dict description of every job on the scheduler.

    Returns:
        One dict per job, in the order ``scheduler.get_jobs()`` yields them,
        with keys ``id``, ``name``, ``next_run_time`` (ISO-8601 string, or
        None when the job has no next run time) and ``trigger`` (the
        trigger's ``str()`` form).
    """
    def _describe(job) -> Dict[str, Any]:
        upcoming = job.next_run_time
        return {
            "id": job.id,
            "name": job.name,
            "next_run_time": None if upcoming is None else upcoming.isoformat(),
            "trigger": str(job.trigger),
        }

    return [_describe(job) for job in scheduler.get_jobs()]
def add_job(collector_name: str, hours: int = 4):
    """Schedule (or reschedule) a collector to run every ``hours`` hours.

    The job id is the collector name and ``replace_existing=True`` is passed,
    so calling this again for the same collector replaces its current job.

    Args:
        collector_name: Registry name of the collector to schedule.
        hours: Interval between runs, in hours; must be greater than zero.

    Raises:
        ValueError: If the collector is not registered, or ``hours`` is not
            positive.
    """
    collector = collector_registry.get(collector_name)
    if not collector:
        raise ValueError(f"Collector not found: {collector_name}")
    if hours <= 0:
        # A zero/negative interval is never meaningful here (APScheduler 3.x
        # silently coerces a zero interval to one second), so reject it.
        raise ValueError(f"Interval must be a positive number of hours, got {hours}")
    scheduler.add_job(
        run_collector_task,
        trigger=IntervalTrigger(hours=hours),
        id=collector_name,
        name=collector_name,
        replace_existing=True,
        kwargs={"collector_name": collector_name},
    )
    logger.info(f"Added scheduled job: {collector_name} (every {hours}h)")
def remove_job(collector_name: str):
    """Delete the scheduled job whose id equals ``collector_name``.

    Any error from ``scheduler.remove_job`` propagates to the caller.
    """
    job_id = collector_name
    scheduler.remove_job(job_id)
    logger.info(f"Removed scheduled job: {job_id}")
def pause_job(collector_name: str):
    """Pause the scheduled job whose id equals ``collector_name``.

    Any error from ``scheduler.pause_job`` propagates to the caller.
    """
    job_id = collector_name
    scheduler.pause_job(job_id)
    logger.info(f"Paused job: {job_id}")
def resume_job(collector_name: str):
    """Resume the previously paused job whose id equals ``collector_name``.

    Any error from ``scheduler.resume_job`` propagates to the caller.
    """
    job_id = collector_name
    scheduler.resume_job(job_id)
    logger.info(f"Resumed job: {job_id}")
# Strong references to the fire-and-forget tasks started by run_collector_now.
# The event loop only keeps weak references to tasks, so a task nobody holds
# can be garbage-collected before it finishes; each task removes itself from
# this set when done.
_background_tasks: set[asyncio.Task] = set()


def run_collector_now(collector_name: str) -> bool:
    """Start a one-off, unscheduled run of a collector in the background.

    The run is created with ``asyncio.create_task``, so this must be called
    while an event loop is running (e.g. from an async request handler). It
    returns immediately without waiting for the collector to finish.

    Args:
        collector_name: Registry name of the collector to run.

    Returns:
        True if the run was scheduled; False if the collector is unknown or
        the task could not be created (for example, no running event loop).
    """
    collector = collector_registry.get(collector_name)
    if not collector:
        logger.error(f"Collector not found: {collector_name}")
        return False

    coro = run_collector_task(collector_name)
    try:
        task = asyncio.create_task(coro)
    except Exception as e:
        # create_task failed before taking ownership of the coroutine; close
        # it so Python does not warn that it was never awaited.
        coro.close()
        logger.exception(f"Failed to trigger collector {collector_name}: {e}")
        return False

    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    logger.info(f"Triggered collector: {collector_name}")
    return True