"""Base collector class for all data sources"""

import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from datetime import datetime

import httpx
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings

logger = logging.getLogger(__name__)

# _save_data updates task progress and commits after the first item and then
# after every item whose index is a multiple of this value.
_PROGRESS_COMMIT_INTERVAL = 100


def _to_str_or_none(value: Any) -> Optional[str]:
    """Return ``str(value)``, or ``None`` when *value* is ``None``."""
    return None if value is None else str(value)


def _parse_reference_date(value: Any) -> Optional[datetime]:
    """Coerce an item's ``reference_date`` field into a ``datetime``.

    Falsy values give ``None``; ``datetime`` instances are returned unchanged;
    anything else is treated as an ISO-8601 string in which ``"Z"`` is
    rewritten to ``"+00:00"`` before parsing (older ``fromisoformat``
    versions reject the ``Z`` suffix).
    """
    if not value:
        return None
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


class BaseCollector(ABC):
    """Abstract base class for data collectors.

    Subclasses implement :meth:`fetch` (and optionally override
    :meth:`transform`). :meth:`run` drives fetch -> transform -> save and
    records the outcome on a ``CollectionTask`` row.
    """

    name: str = "base_collector"
    priority: str = "P1"
    module: str = "L1"
    frequency_hours: int = 4
    data_type: str = "generic"

    def __init__(self):
        # Task row and session of the run in progress; set by run() and
        # cleared again when it finishes.
        self._current_task = None
        self._db_session = None
        self._datasource_id = 1
        self._resolved_url: Optional[str] = None

    async def resolve_url(self, db: AsyncSession) -> None:
        """Look up this collector's URL in the data-sources config and cache it."""
        from app.core.data_sources import get_data_sources_config

        config = get_data_sources_config()
        self._resolved_url = await config.get_url(self.name, db)

    def update_progress(self, records_processed: int):
        """Update task progress - call this during data processing.

        Sets ``records_processed`` and ``progress`` (percent of
        ``total_records``) on the running task. Does nothing outside
        :meth:`run` or while ``total_records`` is unset/zero. The values are
        persisted by the caller's next commit, not here.
        """
        task = self._current_task
        # `or 0` guards against total_records still being None (e.g. when a
        # subclass reports progress from inside fetch()).
        if task and self._db_session and (task.total_records or 0) > 0:
            task.records_processed = records_processed
            task.progress = (records_processed / task.total_records) * 100

    @abstractmethod
    async def fetch(self) -> List[Dict[str, Any]]:
        """Fetch raw data from source"""
        pass

    def transform(self, raw_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Transform raw data to internal format (default: pass through)"""
        return raw_data

    async def run(self, db: AsyncSession) -> Dict[str, Any]:
        """Full pipeline: fetch -> transform -> save.

        Returns a summary dict whose ``status`` is ``"skipped"`` (collector
        disabled in the registry), ``"success"`` or ``"failed"``. Exceptions
        raised inside the pipeline are logged, recorded on the CollectionTask
        row, and reported in the returned dict instead of propagating.
        """
        from app.services.collectors.registry import collector_registry
        from app.models.task import CollectionTask
        # NOTE(review): unused here; kept in case model/mapper registration
        # relies on this import happening before CollectionTask is used.
        from app.models.collected_data import CollectedData  # noqa: F401

        start_time = datetime.utcnow()
        datasource_id = getattr(self, "_datasource_id", 1)

        if not collector_registry.is_active(self.name):
            return {"status": "skipped", "reason": "Collector is disabled"}

        task = CollectionTask(
            datasource_id=datasource_id,
            status="running",
            started_at=start_time,
        )
        db.add(task)
        await db.commit()
        task_id = task.id

        self._current_task = task
        self._db_session = db

        try:
            # Inside the try so a URL-resolution failure marks the task as
            # failed instead of leaving it stuck in "running".
            await self.resolve_url(db)

            raw_data = await self.fetch()
            task.total_records = len(raw_data)
            await db.commit()

            data = self.transform(raw_data)
            records_count = await self._save_data(db, data)

            task.status = "success"
            task.records_processed = records_count
            task.progress = 100.0
            task.completed_at = datetime.utcnow()
            await db.commit()

            return {
                "status": "success",
                "task_id": task_id,
                "records_processed": records_count,
                "execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(),
            }
        except Exception as e:
            logger.exception("Collector %s failed (task %s)", self.name, task_id)
            # A failed flush/commit leaves the session unusable until it is
            # rolled back; without this, the commit below would raise as well
            # and the task would never be marked as failed.
            await db.rollback()
            task.status = "failed"
            task.error_message = str(e)
            task.completed_at = datetime.utcnow()
            await db.commit()

            return {
                "status": "failed",
                "task_id": task_id,
                "error": str(e),
                "execution_time_seconds": (datetime.utcnow() - start_time).total_seconds(),
            }
        finally:
            # Don't keep the task/session referenced once the run is over.
            self._current_task = None
            self._db_session = None

    async def _save_data(self, db: AsyncSession, data: List[Dict[str, Any]]) -> int:
        """Save transformed items as CollectedData rows; return how many were added.

        Commits after the first item and then every
        ``_PROGRESS_COMMIT_INTERVAL`` items (updating task progress each time),
        plus one final commit, so earlier batches stay saved if a later one
        fails.
        """
        from app.models.collected_data import CollectedData

        if not data:
            return 0

        collected_at = datetime.utcnow()

        for i, item in enumerate(data):
            logger.debug(
                "Saving item %d: name=%s, metadata=%s",
                i,
                item.get("name"),
                item.get("metadata", "NOT FOUND"),
            )
            record = CollectedData(
                source=self.name,
                source_id=item.get("source_id") or item.get("id"),
                data_type=self.data_type,
                name=item.get("name"),
                title=item.get("title"),
                description=item.get("description"),
                country=item.get("country"),
                city=item.get("city"),
                latitude=_to_str_or_none(item.get("latitude")),
                longitude=_to_str_or_none(item.get("longitude")),
                value=item.get("value"),
                unit=item.get("unit"),
                extra_data=item.get("metadata", {}),
                collected_at=collected_at,
                reference_date=_parse_reference_date(item.get("reference_date")),
                is_valid=1,
            )
            db.add(record)

            if i % _PROGRESS_COMMIT_INTERVAL == 0:
                self.update_progress(i + 1)
                await db.commit()

        await db.commit()
        self.update_progress(len(data))
        return len(data)

    async def save(self, db: AsyncSession, data: List[Dict[str, Any]]) -> int:
        """Save data to database (legacy method, use _save_data instead)"""
        return await self._save_data(db, data)


class HTTPCollector(BaseCollector):
    """Base class for HTTP API collectors.

    ``fetch`` GETs ``base_url`` with ``headers`` and hands the decoded JSON
    body to :meth:`parse_response`.
    """

    base_url: str = ""
    # Class-level default shared by all subclasses that don't override it;
    # override it in the subclass rather than mutating it in place.
    headers: Dict[str, str] = {}

    async def fetch(self) -> List[Dict[str, Any]]:
        """GET ``base_url`` (60 s timeout); raise on HTTP error status."""
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.get(self.base_url, headers=self.headers)
            response.raise_for_status()
            return self.parse_response(response.json())

    @abstractmethod
    def parse_response(self, response: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Convert the decoded JSON response into a list of item dicts."""
        pass


class IntervalCollector(BaseCollector):
    """Base class for collectors that run on intervals"""

    async def run(self, db: AsyncSession) -> Dict[str, Any]:
        """Run the standard BaseCollector pipeline unchanged."""
        return await super().run(db)


async def log_task(
    db: AsyncSession,
    datasource_id: int,
    status: str,
    records_processed: int = 0,
    error_message: Optional[str] = None,
):
    """Insert and commit a CollectionTask row describing an already-finished run.

    ``started_at`` and ``completed_at`` are both set to the current UTC time.
    """
    from app.models.task import CollectionTask

    now = datetime.utcnow()
    task = CollectionTask(
        datasource_id=datasource_id,
        status=status,
        records_processed=records_processed,
        error_message=error_message,
        started_at=now,
        completed_at=now,
    )
    db.add(task)
    await db.commit()