Files
2026-03-05 11:46:58 +08:00

44 lines
1.2 KiB
Python

"""Collector registry for managing all data collectors"""
from typing import Dict, Optional
from app.services.collectors.base import BaseCollector
class CollectorRegistry:
    """Class-level catalogue of data collectors and their active flags.

    All state is kept in class attributes and every operation is a
    classmethod, so the data is shared by the class and all of its
    instances.
    """

    # Registered collectors, keyed by each collector's ``name`` attribute.
    _collectors: Dict[str, BaseCollector] = {}
    # Names currently flagged as active.
    _active_collectors: set = set()

    @classmethod
    def register(cls, collector: BaseCollector):
        """Store ``collector`` under its name and flag that name as active.

        A collector previously stored under the same name is replaced.
        """
        key = collector.name
        cls._collectors[key] = collector
        cls._active_collectors.add(key)

    @classmethod
    def get(cls, name: str) -> Optional[BaseCollector]:
        """Return the collector stored under ``name``, or ``None`` if absent."""
        registered = cls._collectors
        return registered[name] if name in registered else None

    @classmethod
    def all(cls) -> Dict[str, BaseCollector]:
        """Return a shallow copy of the name -> collector mapping.

        Mutating the returned dict does not affect the registry.
        """
        return dict(cls._collectors)

    @classmethod
    def is_active(cls, name: str) -> bool:
        """Return ``True`` when ``name`` is in the set of active names."""
        active_names = cls._active_collectors
        return name in active_names

    @classmethod
    def set_active(cls, name: str, active: bool = True):
        """Add ``name`` to, or remove it from, the set of active names.

        The name is not checked against the registered collectors, and
        removing a name that is not present is a no-op.
        """
        active_names = cls._active_collectors
        if not active:
            active_names.discard(name)
            return
        active_names.add(name)
# Shared module-level instance. All registry state lives on the class and is
# reached through classmethods, so this instance and CollectorRegistry itself
# operate on the same data.
collector_registry = CollectorRegistry()