161 lines
5.4 KiB
Python
161 lines
5.4 KiB
Python
"""Task Scheduler for running collection jobs."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Any, Dict
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
from sqlalchemy import select
|
|
|
|
from app.db.session import async_session_factory
|
|
from app.models.datasource import DataSource
|
|
from app.services.collectors.registry import collector_registry
|
|
|
|
# Module-level logger for scheduler lifecycle and collector-run events.
logger = logging.getLogger(__name__)

# Single shared scheduler instance for the process; jobs are keyed by
# DataSource.source so one datasource maps to at most one job.
scheduler = AsyncIOScheduler()
|
|
|
|
|
|
async def _update_next_run_at(datasource: DataSource, session) -> None:
    """Refresh ``datasource.next_run_at`` from the scheduler and persist it.

    If no job is registered for this datasource (disabled or removed),
    ``next_run_at`` is cleared to ``None``.
    """
    job = scheduler.get_job(datasource.source)
    if job:
        datasource.next_run_at = job.next_run_time
    else:
        datasource.next_run_at = None
    await session.commit()
|
|
|
|
|
|
async def _apply_datasource_schedule(datasource: DataSource, session) -> None:
    """Create, update, or remove the scheduler job for one datasource.

    Any existing job for the source is dropped first; an active datasource
    then gets a fresh interval job, while an inactive one stays unscheduled.
    Finally ``next_run_at`` is refreshed and committed.
    """
    collector = collector_registry.get(datasource.source)
    if not collector:
        logger.warning("Collector not found for datasource %s", datasource.source)
        return

    # Mirror the datasource's enabled flag into the collector registry.
    collector_registry.set_active(datasource.source, datasource.is_active)

    # Always clear any previous job so a frequency change takes effect.
    if scheduler.get_job(datasource.source):
        scheduler.remove_job(datasource.source)

    if not datasource.is_active:
        logger.info("Collector disabled: %s", datasource.source)
    else:
        # Clamp to a minimum of one minute to avoid a zero/negative interval.
        trigger = IntervalTrigger(minutes=max(1, datasource.frequency_minutes))
        scheduler.add_job(
            run_collector_task,
            trigger=trigger,
            id=datasource.source,
            name=datasource.name,
            replace_existing=True,
            kwargs={"collector_name": datasource.source},
        )
        logger.info(
            "Scheduled collector: %s (every %sm)",
            datasource.source,
            datasource.frequency_minutes,
        )

    await _update_next_run_at(datasource, session)
|
|
|
|
|
|
async def run_collector_task(collector_name: str):
    """Run a single collector task.

    Looks up the collector and its DataSource row, skips silently-disabled
    sources, runs the collector, and records run status/timestamps back to
    the datasource row. Exceptions from the collector are logged, never
    re-raised, so a failing collector cannot kill the scheduler.
    """
    collector = collector_registry.get(collector_name)
    if not collector:
        logger.error("Collector not found: %s", collector_name)
        return

    async with async_session_factory() as db:
        result = await db.execute(select(DataSource).where(DataSource.source == collector_name))
        datasource = result.scalar_one_or_none()
        if not datasource:
            logger.error("Datasource not found for collector: %s", collector_name)
            return

        # Re-check the flag at run time: the job may fire once more after a
        # datasource is disabled but before the scheduler is re-synced.
        if not datasource.is_active:
            logger.info("Skipping disabled collector: %s", collector_name)
            return

        try:
            # NOTE(review): hands the datasource id to the collector via a
            # private attribute — implicit contract with the collector base
            # class; confirm collectors actually read _datasource_id.
            collector._datasource_id = datasource.id
            logger.info("Running collector: %s (datasource_id=%s)", collector_name, datasource.id)
            task_result = await collector.run(db)
            datasource.last_run_at = datetime.utcnow()
            # assumes collector.run returns a dict with a "status" key —
            # TODO confirm against the collector base class.
            datasource.last_status = task_result.get("status")
            # Also commits the last_run_at / last_status changes above.
            await _update_next_run_at(datasource, db)
            logger.info("Collector %s completed: %s", collector_name, task_result)
        except Exception as exc:
            # Best-effort failure bookkeeping; commit directly since
            # _update_next_run_at is skipped on this path.
            datasource.last_run_at = datetime.utcnow()
            datasource.last_status = "failed"
            await db.commit()
            logger.exception("Collector %s failed: %s", collector_name, exc)
|
|
|
|
|
|
def start_scheduler() -> None:
    """Start the shared scheduler; a no-op if it is already running."""
    if scheduler.running:
        return
    scheduler.start()
    logger.info("Scheduler started")
|
|
|
|
|
|
def stop_scheduler() -> None:
    """Shut the scheduler down without waiting; a no-op if not running."""
    if not scheduler.running:
        return
    scheduler.shutdown(wait=False)
    logger.info("Scheduler stopped")
|
|
|
|
|
|
async def sync_scheduler_with_datasources() -> None:
    """Synchronize scheduler jobs with datasource table."""
    async with async_session_factory() as db:
        rows = await db.execute(select(DataSource).order_by(DataSource.id))
        datasources = rows.scalars().all()

        # Drop jobs whose datasource row no longer exists.
        known_sources = {ds.source for ds in datasources}
        stale_ids = [job.id for job in scheduler.get_jobs() if job.id not in known_sources]
        for job_id in stale_ids:
            scheduler.remove_job(job_id)

        # (Re)apply scheduling for every configured datasource.
        for ds in datasources:
            await _apply_datasource_schedule(ds, db)
|
|
|
|
|
|
async def sync_datasource_job(datasource_id: int) -> bool:
    """Re-apply scheduling for one datasource after a settings change.

    Returns False when no datasource with that id exists, True otherwise.
    """
    async with async_session_factory() as db:
        datasource = await db.get(DataSource, datasource_id)
        if datasource is None:
            return False
        await _apply_datasource_schedule(datasource, db)
        return True
|
|
|
|
|
|
def get_scheduler_jobs() -> list[Dict[str, Any]]:
    """Describe every scheduled job as a JSON-friendly dict."""
    return [
        {
            "id": job.id,
            "name": job.name,
            "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
            "trigger": str(job.trigger),
        }
        for job in scheduler.get_jobs()
    ]
|
|
|
|
|
|
# Strong references to in-flight fire-and-forget tasks. The event loop keeps
# only weak references to tasks, so without this an unreferenced collector
# task may be garbage-collected before it finishes (see asyncio docs).
_background_tasks: set[asyncio.Task] = set()


def run_collector_now(collector_name: str) -> bool:
    """Run a collector immediately (not scheduled).

    Must be called from within a running event loop. Returns True when the
    collector task was created, False when the collector is unknown or the
    task could not be scheduled.
    """
    collector = collector_registry.get(collector_name)
    if not collector:
        logger.error("Collector not found: %s", collector_name)
        return False

    try:
        task = asyncio.create_task(run_collector_task(collector_name))
        # Retain the task until completion, then drop the reference.
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        logger.info("Triggered collector: %s", collector_name)
        return True
    except Exception as exc:
        # Broad catch is deliberate: triggering is best-effort and must not
        # propagate (e.g. RuntimeError when no event loop is running).
        logger.error("Failed to trigger collector %s: %s", collector_name, exc)
        return False
|