Database Processor
Le DatabaseProcessor
est un module de service central qui encapsule toutes les interactions avec la base de données pour le pipeline de traitement. Il ne constitue pas une étape du pipeline lui-même, mais fournit une API de haut niveau aux autres processeurs pour lire et écrire des données de manière structurée.
Fonctionnalités
Gestion de la Base de Données : Utilise SQLAlchemy pour gérer la connexion, les sessions et le schéma de la base de données (création des tables lieux, executions, resultats_extraction).
Initialisation d’Exécution : La méthode
setup_execution
est une fonction clé qui prépare une nouvelle exécution en mettant à jour la liste des lieux, en créant un enregistrement d’exécution, et en identifiant les tâches incomplètes des exécutions précédentes à reprendre.Fournisseur de Données : Offre des méthodes spécifiques comme
get_pending_urls
etget_pending_llm
que les processeurs du pipeline utilisent pour récupérer leur file de travail.Persistance des Résultats : Propose des méthodes dédiées (ex:
update_url_result
,update_llm_result
) pour que chaque processeur puisse sauvegarder les résultats de son traitement à l’étape correspondante.Gestion des Erreurs : Centralise l’enregistrement des erreurs du pipeline dans la base de données, en les ajoutant à une chaîne d’erreurs traçable pour chaque lieu.
Usage
Une instance de DatabaseProcessor
est créée au début du pipeline et est ensuite passée en argument à chaque processeur qui a besoin d’interagir avec la base de données.
Modules
- class src.smart_watch.processing.database_processor.DatabaseProcessor(config: ConfigManager, logger: SmartWatchLogger)[source]
Bases :
object
Crée la base de données et les tables nécessaires.
- __init__(config: ConfigManager, logger: SmartWatchLogger)[source]
Initialise le processeur de base de données.
- create_database() DatabaseManager [source]
Crée les tables de la base de données si elles n’existent pas et retourne le manager.
- execute_query(query: str, params: dict | None = None) Sequence[Row[Any]] [source]
Exécute une requête SQL sur la base de données et retourne les résultats.
- setup_execution(df_csv) int [source]
Configure et initialise une nouvelle exécution à partir d’un DataFrame issu du fichier CSV pris sur l’URL indiqué dans CSV_URL_HORAIRES du fichier .env.
Etapes : 1. Met à jour les lieux dans la base de données à partir des données du DataFrame CSV. 2. Met à jour les horaires des lieux en se basant sur les références GL. 3. Crée une nouvelle entrée d’exécution dans la base de données. 4. Configure les résultats d’extraction associés à cette exécution en utilisant les données du DataFrame CSV.
- Paramètres:
df_csv (pd.DataFrame) – DataFrame contenant les données extraites du CSV.
- Renvoie:
Identifiant unique de l’exécution créée.
- Type renvoyé:
- _update_lieux_batch(session, df_csv)[source]
Met à jour en batch les enregistrements de lieux dans la base de données à partir du df issu de CSV_URL_HORAIRES.
Pour chaque ligne du DataFrame, extrait les informations du lieu (identifiant, nom, type_lieu, url, horaires_data_gl) et prépare les données pour une insertion ou mise à jour efficace dans la table “Lieux”.
Utilise une opération “merge” (insert avec gestion des conflits) pour insérer les nouveaux lieux ou mettre à jour ceux existants selon l’identifiant.
Log le nombre d’enregistrements mis à jour.
- Paramètres:
session (Session) – Session SQLAlchemy active pour effectuer les opérations sur la base de données.
df_csv (DataFrame) – DataFrame contenant les données des lieux à insérer ou mettre à jour.
- _update_horaires_lieux_depuis_gl(session)[source]
Met à jour les horaires des lieux à partir des fichiers de référence CSV issus de data.grandlyon.com : CSV_URL_PISCINES, CSV_URL_MAIRIES, CSV_URL_MEDIATHEQUES.
- Pour chaque fichier de référence :
Charge les données CSV en utilisant CSVToPolars.
Parcourt chaque ligne pour trouver le lieu correspondant dans la base de données via son identifiant.
- Si le lieu existe et que des horaires sont présents :
Met à jour le champ “horaires_data_gl” du lieu.
Si le convertisseur OSM est disponible, tente de convertir les horaires au format JSON enrichi et met à jour le champ “horaires_data_gl_json”.
Enregistre les modifications dans la base de données.
Journalise le nombre de lieux mis à jour ou les erreurs rencontrées.
- Paramètres:
session (Session) – Session SQLAlchemy utilisée pour accéder et modifier les objets de la base de données.
- Exceptions:
Journalise les erreurs lors du chargement des fichiers CSV ou de la conversion des horaires.
Si le convertisseur OSM n’est pas disponible, journalise une erreur et ignore la conversion JSON.
- _create_execution(session) int [source]
Crée une nouvelle exécution et retourne son ID.
- Paramètres:
session (Session) – Session SQLAlchemy active.
- Renvoie:
Identifiant unique de l’exécution créée.
- Type renvoyé:
- _setup_resultats_extraction(session, df_csv, execution_id)[source]
Prépare et gère les enregistrements de résultats d’extraction pour une nouvelle exécution.
Cette méthode effectue les opérations suivantes : - Vérifie la présence d’enregistrements incomplets (statut URL, extraction LLM, conversion OSM) issus d’exécutions précédentes,
et les assigne à l’exécution courante pour reprise.
Classe les enregistrements incomplets selon le type de données manquantes (URL, LLM, OSM) et journalise le nombre d’actions à effectuer pour chaque catégorie.
Crée de nouveaux enregistrements de résultats d’extraction pour les lieux présents dans le fichier CSV_URL_HORAIRES, mais absents de la base de données, en initialisant les champs nécessaires.
- Paramètres:
session (Session) – Session SQLAlchemy active pour les opérations sur la base de données.
df_csv (DataFrame) – Fichier CSV des lieux à traiter.
execution_id (int) – Identifiant unique de l’exécution en cours.
- get_pending_urls(execution_id) Sequence[Row[Tuple[ResultatsExtraction, Lieux]]] [source]
Récupère les URLs en attente de traitement.
- get_pending_llm(execution_id) Sequence[Row[Tuple[ResultatsExtraction, Lieux]]] [source]
Récupère les enregistrements en attente d’extraction LLM.
- get_results_with_raw_markdown(execution_id: int) Sequence[ResultatsExtraction] [source]
Récupère les résultats avec markdown brut à nettoyer.
- get_results_with_cleaned_markdown(execution_id: int) Sequence[ResultatsExtraction] [source]
Récupère les résultats avec markdown nettoyé à filtrer.
- get_results_with_schedules() Sequence[ResultatsExtraction] [source]
Récupère les résultats avec horaires extraits pour comparaison.
- get_pending_comparisons() Sequence[Row[Tuple[ResultatsExtraction, Lieux]]] [source]
Récupère les enregistrements en attente de comparaison.
- update_comparison_result(resultat_id: int, comparison_data: dict)[source]
Met à jour le résultat d’une comparaison.
- update_url_result(resultat_id: int, result_data: dict)[source]
Met à jour le résultat d’une extraction URL.
- update_cleaned_markdown(resultat_id: int, cleaned_markdown: str)[source]
Met à jour le markdown nettoyé.
- update_filtered_markdown(resultat_id: int, filtered_markdown: str, co2_emissions: float = 0.0)[source]
Met à jour le markdown filtré et ajoute les émissions CO2.
- update_llm_result(resultat_id: int, llm_data: dict)[source]
Met à jour le résultat d’une extraction LLM.
- update_execution_emissions(execution_id: int, total_emissions: float)[source]
Met à jour les émissions CO2 totales d’une exécution en les ajoutant au total existant.
- update_execution_embeddings(execution_id: int, embeddings_emissions: float)[source]
Met à jour les émissions CO2 des embeddings d’une exécution.