"""Task Scheduler for running collection jobs""" import asyncio import logging from datetime import datetime from typing import Dict, Any from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from sqlalchemy.ext.asyncio import AsyncSession from app.db.session import async_session_factory from app.services.collectors.registry import collector_registry logger = logging.getLogger(__name__) scheduler = AsyncIOScheduler() COLLECTOR_TO_ID = { "top500": 1, "epoch_ai_gpu": 2, "huggingface_models": 3, "huggingface_datasets": 4, "huggingface_spaces": 5, "peeringdb_ixp": 6, "peeringdb_network": 7, "peeringdb_facility": 8, "telegeography_cables": 9, "telegeography_landing": 10, "telegeography_systems": 11, } async def run_collector_task(collector_name: str): """Run a single collector task""" collector = collector_registry.get(collector_name) if not collector: logger.error(f"Collector not found: {collector_name}") return # Get the correct datasource_id datasource_id = COLLECTOR_TO_ID.get(collector_name, 1) async with async_session_factory() as db: try: # Set the datasource_id on the collector instance collector._datasource_id = datasource_id logger.info(f"Running collector: {collector_name} (datasource_id={datasource_id})") result = await collector.run(db) logger.info(f"Collector {collector_name} completed: {result}") except Exception as e: logger.error(f"Collector {collector_name} failed: {e}") def start_scheduler(): """Start the scheduler with all registered collectors""" collectors = collector_registry.all() for name, collector in collectors.items(): if collector_registry.is_active(name): scheduler.add_job( run_collector_task, trigger=IntervalTrigger(hours=collector.frequency_hours), id=name, name=name, replace_existing=True, kwargs={"collector_name": name}, ) logger.info(f"Scheduled collector: {name} (every {collector.frequency_hours}h)") scheduler.start() logger.info("Scheduler started") def stop_scheduler(): """Stop the scheduler""" scheduler.shutdown() logger.info("Scheduler stopped") def get_scheduler_jobs() -> list[Dict[str, Any]]: """Get all scheduled jobs""" jobs = [] for job in scheduler.get_jobs(): jobs.append( { "id": job.id, "name": job.name, "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None, "trigger": str(job.trigger), } ) return jobs def add_job(collector_name: str, hours: int = 4): """Add a new scheduled job""" collector = collector_registry.get(collector_name) if not collector: raise ValueError(f"Collector not found: {collector_name}") scheduler.add_job( run_collector_task, trigger=IntervalTrigger(hours=hours), id=collector_name, name=collector_name, replace_existing=True, kwargs={"collector_name": collector_name}, ) logger.info(f"Added scheduled job: {collector_name} (every {hours}h)") def remove_job(collector_name: str): """Remove a scheduled job""" scheduler.remove_job(collector_name) logger.info(f"Removed scheduled job: {collector_name}") def pause_job(collector_name: str): """Pause a scheduled job""" scheduler.pause_job(collector_name) logger.info(f"Paused job: {collector_name}") def resume_job(collector_name: str): """Resume a scheduled job""" scheduler.resume_job(collector_name) logger.info(f"Resumed job: {collector_name}") def run_collector_now(collector_name: str) -> bool: """Run a collector immediately (not scheduled)""" collector = collector_registry.get(collector_name) if not collector: logger.error(f"Collector not found: {collector_name}") return False try: asyncio.create_task(run_collector_task(collector_name)) logger.info(f"Triggered collector: {collector_name}") return True except Exception as e: logger.error(f"Failed to trigger collector {collector_name}: {e}") return False