Code source de src.smart_watch.processing.setup_processor

from src.smart_watch.core.ErrorHandler import (
    ErrorCategory,
    ErrorSeverity,
    handle_errors,
)
from src.smart_watch.utils.CSVToPolars import CSVToPolars


[docs] class SetupProcessor: """Gestionnaire de l'initialisation (chargement CSV et DB)."""
[docs] def __init__(self, config, logger): """ Initialise le processeur avec la configuration et le logger spécifiés. Args: config (Config): Objet de configuration contenant les paramètres nécessaires, y compris la base de données. logger (Logger): Instance du logger pour la journalisation des événements. """ self.config = config self.logger = logger
[docs] @handle_errors( category=ErrorCategory.CONFIGURATION, severity=ErrorSeverity.HIGH, user_message="Erreur lors de la configuration du pipeline", reraise=True, ) def setup_execution(self, db_manager): """ Initialise la pipeline de configuration en chargeant le CSV depuis la configuration (variable CSV_URL_HORAIRES) et en initialisant la base de données avec les données chargées. Étapes : - Charge le fichier CSV : CSV_URL_HORAIRES. - Vérifie le succès du chargement du CSV. - Initialise la base de données avec le DataFrame obtenu. Raises: ValueError: Si le chargement du CSV échoue. """ self.logger.section("CONFIGURATION PIPELINE") csv_loader = CSVToPolars( source=self.config.database.csv_url, separator="auto", has_header=True, ) df_csv = csv_loader.load_csv() if isinstance(df_csv, str): raise ValueError(f"Erreur chargement CSV: {df_csv}") # Initialise la base avec DatabaseManager et le DataFrame chargé return db_manager.setup_execution(df_csv)