Translation Notice

This is an automatically translated version of that Article. Despite my best efforts, it might not be perfect.
Native speakers are welcome to open pull requests to correct anything that doesn't sound right.

Analyse de données dans les systèmes transactionnels distribués

Analyse de données dans les systèmes transactionnels distribués

Date de publication August 12, 2025 00:00
analyse de données entrepôt de données ETL traitement de données systèmes transactionnels

Dans cet article, je vais aborder le problème de l'analyse de données dans les systèmes transactionnels distribués.
Si vous cherchez des idées pour construire un entrepôt de données central qui permettra de collecter les données de l'ensemble du système, indépendamment de sa fragmentation, tout en évitant de vous noyer dans les coûts opérationnels, alors cet article est fait pour vous.

Tout commence innocemment

La plupart des systèmes que nous créons au quotidien stockent des données dans une base de données relationnelle. Un choix très populaire et d'ailleurs excellent est PostgreSQL, qui est devenu ces dernières années presque un standard dans l'industrie.

L'histoire de la plupart des projets se ressemble généralement : on commence par valider une idée, on gagne les premiers utilisateurs, le système commence à générer des revenus, le business cherche à augmenter les profits, de nouvelles fonctionnalités voient le jour. Chaque nouvelle fonctionnalité, c'est quelques nouvelles tables dans la base de données.

Pour accélérer le développement, nous utilisons un ORM, générons automatiquement des migrations qui créent et mettent à jour le schéma de la base de données.

Au début, tout se passe bien, les nouvelles fonctionnalités apportent les profits attendus, le business commence à se développer. On embauche plus de développeurs pour créer plus de fonctionnalités en parallèle.

De temps en temps, quelqu'un signale que le système commence à "ramer" à certains endroits, reconnaissance rapide, diagnostic encore plus rapide, il manque un index dans une table.

Dans la configuration des mappings ORM, on ajoute un index sur le champ sur lequel le système recherche très souvent des données. Problème résolu.

L'équipe grandissante de développeurs accorde une grande importance à la qualité, peut-être même utilise-t-elle des techniques avancées de développement logiciel, comme l'Event Storming ou le Domain-Driven Design.
Le CI/CD exécute d'innombrables tests, s'assurant que les changements n'introduisent pas de régressions.

L'idylle dure, l'équipe ou peut-être plusieurs équipes commencent à ajouter de nouveaux modules au système. Modules correctement isolés, responsables de tâches spécifiques, ne dépassant jamais leurs limites et n'empiétant pas sur les compétences d'autres modules.

Pour la communication, on utilise évidemment des files d'attente, on implémente le Pattern Outbox/Inbox

Pour assurer une isolation appropriée, nous établissons des règles stipulant que chaque module n'a accès qu'aux tables de la base de données qui lui appartiennent. Pour obtenir des données d'un autre module, il faut s'adresser à ce module, que ce soit via une API interne ou de toute autre manière.

De temps en temps, le business vient nous voir avec la question pouvez-vous nous générer rapidement ce rapport ?. Bien sûr, quelques lignes en SQL, peut-être quelques dizaines et le rapport est prêt.

Le business est satisfait, le rapport au format CSV part vers Excel (l'outil BI le plus populaire), le business tire des conclusions, planifie de nouvelles fonctionnalités et changements.

Business Intelligence

Le temps passe, les nouvelles tables poussent comme des champignons après la pluie

Dans cet état, nous pouvons continuer très longtemps, même plusieurs bonnes années.

Entre-temps, quelqu'un quelque part aura certainement l'idée d'ajouter au système la possibilité de générer des rapports. Ce n'est qu'une question de temps.

Les rapports sur l'état du système sont pour le business l'un des outils les plus importants donnant un aperçu des comportements, préférences ou tendances des utilisateurs. Ils permettent non seulement de comprendre ce qui se passe, mais aussi de planifier correctement ce qui doit encore arriver.

Plus les rapports sont bons et détaillés, meilleures sont les décisions qui peuvent être prises sur leur base. De bonnes décisions commerciales se traduisent par des profits plus importants, des profits plus importants se traduisent par un budget plus conséquent. Un budget plus important se traduit par de meilleurs outils, des équipes plus grandes, de meilleurs salaires ou primes.

Il devrait donc être dans l'intérêt de chaque développeur de fournir au business les meilleures données possibles et les plus précises, après tout, de meilleurs résultats se traduisent directement par de meilleurs profits.

Premiers symptômes

Le système fonctionne, génère des profits. Il se compose d'environ 5, peut-être même 10 modules, chaque module comprend 20-50 tables dans la base de données. Chaque module fournit ses propres rapports.

Chaque module ne fournit qu'une partie des données, une fraction d'une image plus grande, mais aucun ne donne une vue d'ensemble.

Les équipes ont certes implémenté des clés référentielles vers des données provenant d'autres modules, elles ont même réussi dans l'interface utilisateur à créer un endroit unique d'où l'on peut générer des rapports.

Mais c'est encore insuffisant...

Très vite, il s'avère que les rapports générés dans différents modules, écrits par différents développeurs, peut-être même dans différentes technologies ont des formats de données différents, des standards de nommage différents.

Les plages de dates sont interprétées différemment, un module inclut les dates de fin et de début, un autre les exclut, et encore un autre fait un intervalle ouvert à droite pour faciliter la pagination, parce qu'ils ont justement aussi une API et cette API utilise le même morceau de code.

Comme chaque module est indépendant, possède ses frontières, sa nomenclature, à un moment donné nous réalisons que ce que nous appelons d'une certaine manière dans un module, un autre module l'expose sous un nom complètement différent. Parce que dans le contexte de ce module, cela a du sens.

Avec le temps, nous réaliserons probablement aussi que chaque équipe a défini différemment sa politique de rétention et de stockage des données. Malgré la possession de données des 5 dernières années dans le module clé, nous ne pouvons rien en faire, car les modules qui fournissaient les données nécessaires pour enrichir le rapport de base ne possèdent des données que des 2 dernières années.

Ce ne sont cependant pas des problèmes qu'un peu de magie dans Excel ne pourrait résoudre (peut-être à part les lacunes dans les données). Nous changerons les noms de ces colonnes, supprimerons celles-ci, ajouterons un filtre rapide et c'est suffisant.

Nous créerons un grand fichier dans lequel nous aurons une feuille nommée "Dashboard", et toutes les autres seront en lecture seule, alimentant le dashboard.

Peut-être que cette approche fonctionnera même un moment. Peut-être même plus qu'un moment, mais ne nous faisons pas d'illusions. Tout cela finira par s'effondrer, et conformément aux lois de Murphy cela s'effondrera au pire moment possible.

Qu'est-ce qui ne va pas avec Excel ?

Rien ! Excel est un outil formidable. Le problème ne réside pas dans Excel, mais dans son utilisation.

Toute cette magie consistant à nettoyer et préparer les données ne devrait pas avoir lieu dans Excel, pas à grande échelle. Si nous parlons d'un rapport rapide ponctuel, pas de problème. Nous faisons ce que nous devons, tapons les formules, analysons les données et oublions.

Cependant, si cela doit faire partie de notre routine quotidienne, si nous devons cycliquement passer par le même processus, en suivant les changements constants et l'évolution du système, tôt ou tard il s'avérera que ces feuilles de calcul sont obsolètes.

Les colonnes ont cessé d'exister ou ont changé de nom, de nouvelles colonnes sont apparues, le format des données a changé ou pire encore, l'une des équipes s'occupant d'un des modules a supprimé certaines données sans savoir qu'elles étaient utilisées par un utilisateur métier quelque part dans l'un de ses rapports qu'il ouvre une fois par trimestre.

Sur le long terme, les feuilles de calcul plus complexes, qui puisent des données dans des rapports générés automatiquement par le système, qui sont ensuite assemblées sur la base de règles non explicites, sont impossibles à maintenir.

Et si on connectait un outil BI ?

Ont pensé de nombreux développeurs qui ont rencontré à plusieurs reprises le problème de génération de rapports.

Prenons par exemple Metabase. Un outil gratuit que nous pouvons mettre en place en quelques minutes avec Docker.

Lui donner accès à notre base de données et à quelques ou toutes les tables, et grâce à une interface utilisateur très conviviale, le business pourra générer très facilement et agréablement les rapports les plus complexes.

Des rapports qui pourront contenir des données de plusieurs modules simultanément !

Nous pouvons même embaucher un analyste de données avec des bases en SQL, qui tout ce qui ne pourra pas être cliqué, l'obtiendra grâce à une requête bien préparée.

Sauf que cela ne résout pas le problème

Cela ne fait que le repousser dans le temps.

Si nous regardons attentivement ce qui a changé, une seule chose a changé. L'outil... Nous avons transféré le problème de nettoyage et de liaison des données d'Excel à Metabase.

Excel est certes revenu à son rôle initial, nous pouvons maintenant importer les rapports téléchargés depuis Metabase dans Excel.

Cependant, notre logique implicite de liaison/nettoyage des données s'est déplacée de la feuille de calcul vers les requêtes SQL.

De plus, tous les problèmes sont restés les mêmes :

Et si on établissait des processus et des règles ?

La plupart des problèmes ci-dessus peuvent être résolus en implémentant des processus et des règles appropriés.

Nous pouvons établir des standards de nommage stipulant que chaque table dans la base doit contenir dans son nom le préfixe du module, et les colonnes sont nommées en minuscules et séparées par des underscores.

Nous pouvons établir que chaque module stocke les données des 5 dernières années (hot storage), tout ce qui est plus ancien est archivé. (cold storage)

Nous pouvons établir que les plages de dates sont toujours traitées comme des intervalles ouverts à droite.

Nous pouvons établir que nous ne supprimons aucune colonne de la base de données, ou qu'avant de supprimer quoi que ce soit, nous entrons d'abord dans une période de transition, pendant laquelle nous montrons à chaque utilisateur du système quelles colonnes vont changer et comment.

Même si nous supposons pour les besoins de la discussion qu'il sera possible d'implémenter ces processus globalement entre plusieurs équipes, et que ces équipes les respecteront strictement et très précisément, ce ne sera pas suffisant...

La mise à l'échelle de la base de données n'est pas bon marché

Surtout si nous nous appuyons sur des solutions cloud.

Imaginons une situation où, aux heures de pointe du système (quand les utilisateurs génèrent le plus de transactions), un analyste métier, qui travaille selon son propre planning, doit générer un rapport basé sur un SQL typique de plusieurs milliers de lignes ?

L'analyste lance la requête, la base de données commence à mouliner. La requête dure 5, 10, 15 minutes. La base de données commence à transpirer.

Les utilisateurs bombardent le système de nouvelles commandes (ou toute autre opération qui génère beaucoup d'écritures) pendant que l'analyste attend les résultats.

Au même moment, quelqu'un du business a besoin de vérifier rapidement quelques rapports, chacun d'eux contient le "nombre total de lignes dans la table". Il y a plusieurs personnes comme ça.

Toutes ces opérations se superposent, notre base de données déjà très chargée ne suit plus.

Certaines transactions utilisateur n'aboutissent pas.
Le système respire à peine. Le temps d'attente pour les opérations les plus basiques se mesure en secondes.

Et maintenant la cerise sur le gâteau, quand toutes ces scènes dantesques ont lieu, quand Pager Duty est chauffé au rouge par toutes sortes d'incidents, quand les équipes paniquent en essayant de ramener le système à la vie, les devops cherchent comment mettre rapidement à l'échelle la base de données...

Travaux de rénovation

Le PDG commence une présentation pour un partenaire commercial potentiel, avec qui la collaboration doit s'avérer cruciale dans la stratégie de développement de l'entreprise...

Et si on mettait simplement en place une réplique ?

Après tout, les rapports ne surchargeront pas notre base de données transactionnelle.

Nous doublerons certes les coûts de maintenance de la base de données, mais nous réduirons le risque de surcharge du système et nous pourrons connecter notre outil de business intelligence préféré directement à la réplique, ce qui nous donnera des données en temps réel.

Ça sonne génial, mais en pratique ce n'est pas si simple.

En mettant de côté les problèmes potentiels découlant de la nature même de la réplication, le problème principal et fondamental que je rencontre le plus souvent, c'est la perception.

Un développeur qui a généré ces tables via des mappings ORM regardera les tables de la base de données très différemment d'un analyste de données.

Le développeur saura quelles tables doivent être jointes ensemble pour obtenir une image complète. Il comprendra les limitations et conditions cachées quelque part dans le code de l'application. Surtout, le développeur connaît ou devrait au moins avoir une idée du cycle de vie du système (de ses données).

Toute cette connaissance n'est généralement pas disponible pour les analystes.

C'est comme dire à quelqu'un de regarder quelque chose par le trou de la serrure. On peut certainement voir quelque chose. On peut tirer certaines conclusions, mais il sera très difficile de reconstruire l'ensemble.

Il suffit que nous ayons dans la base de données une colonne de type JSONB dans laquelle nous stockons certaines structures de données. Supposons que le système autorise 3 combinaisons correctes de la même structure, mais l'une est super rare, si rare qu'elle n'est pas encore apparue dans le système. En regardant les données, même dans leur ensemble, l'analyste ne peut simplement pas savoir qu'il existe 3 combinaisons d'une structure. Lors de la normalisation, il prendra en compte 2 cas, tandis que le troisième deviendra une bombe à retardement qui explosera comme toujours au moment le moins attendu.

En d'autres termes, si nous avons dans le système plusieurs modules indépendants. Chacun avec sa base de données, ou au moins ses tables dans la base. Ce qui nous donne au total 200-300 tables, s'attendre à ce que l'analyste gère cela sans problème, ne commette pas d'erreurs et que les rapports ne s'écartent pas des attentes, est pour le moins naïf.

Malgré tout, exposer une copie/réplique de la base de données pour les analystes et lui donner un nom à 4 lettres dérivé du mot "analytics" est encore largement pratiqué.

Les outils BI rivalisent pour créer la meilleure interface utilisateur, grâce à laquelle les rapports peuvent être cliqués. Ils promettent que nous pourrons analyser les données sans SQL.

Oui, cela peut fonctionner, dans de nombreux endroits c'est exactement comme ça que ça fonctionne. Mais ce dont nous ne parlons pas ouvertement, c'est :

Ce qui se répercute sur la qualité des données et l'efficacité de la prise de décisions commerciales.

Que nous reste-t-il ?

Peut-être commençons par établir quels problèmes nous voulons résoudre en priorité :

Stratégie et analyse
  1. L'analyse des données / génération de rapports ne doit avoir aucun impact sur le fonctionnement du système.
  2. Les données dans les rapports doivent toujours être fraîches (un délai dans les données est acceptable, défini individuellement)
  3. Les rapports doivent refléter l'état réel, non déformé du système
  4. La structure des données doit être résistante à la régression
  5. Politique cohérente de rétention et d'archivage des données

1) Séparation des ressources

Ce n'est rien de révolutionnaire, si nous ne voulons pas que notre système soit exposé aux surcharges résultant de l'abus de la base de données par la génération de rapports, nous devons mettre en place une base de données séparée.

Quelle base choisir pour l'analytique ?

C'est en fait un sujet pour un article séparé ou même une série d'articles. Il y a beaucoup de solutions, certaines meilleures, d'autres moins bonnes. Il n'existe pas de solution magique unique pour tous les problèmes.

Mon conseil, surtout pour les petites équipes, inexpérimentées dans la gestion des données, est de ne pas se jeter sur des technologies avec lesquelles nous n'avons pas d'expérience.

Le format approprié des données est crucial. Après avoir transformé de nombreuses tables étroites en une seule large, il s'avérera probablement que générer le même rapport sans utiliser 20x JOIN ne prend plus 10 minutes mais moins d'une demi-seconde.

Et si le problème ce sont les agrégations, et non les jointures ?

Alors, au lieu d'agréger à la volée, il vaut mieux préparer une table contenant ces données sous forme agrégée, et non brute.

2) Données fraîches

Bon d'accord, mais si nous créons une nouvelle base de données indépendante, comment nous assurerons-nous que les données dans cette base sont fraîches et à jour ?

Ici, beaucoup dépend du délai acceptable dans la synchronisation des données. Le plus souvent, il suffit que la base analytique soit environ 24 heures derrière la base transactionnelle. C'est-à-dire qu'elle contient des données jusqu'à "hier", en incluant tout "hier".

Pourquoi ? Parce que peu de décisions commerciales sont prises dans l'instant. Si certaines décisions doivent être prises dans un délai aussi court, alors on construit les automatisations appropriées.

Si un délai de 24 heures est acceptable (parfois ce n'est pas le cas et il y a aussi des solutions pour cela), il suffit que nous effectuions la synchronisation plusieurs fois par jour. Bien sûr, ici non plus il n'y a pas de règle d'or. De même qu'il n'y a pas de règle disant quelle taille de plage synchroniser à la fois.

Il y a cependant une bonne pratique qui facilite la synchronisation. Elle consiste à s'assurer que les tables principales dans le système transactionnel contiennent la date de création/modification de l'enregistrement.

En possédant ces deux informations, nous sommes en mesure de réduire la fenêtre de synchronisation à une période de temps spécifique.

À quoi cela ressemble-t-il en pratique ? Nous pouvons par exemple lancer le processus de synchronisation toutes les 6 heures, en collectant uniquement les enregistrements modifiés au cours des dernières 24 heures.
Bien sûr, ce sont des chiffres d'exemple, ces valeurs doivent être établies sur la base de la taille et du comportement des données.

Pourquoi 24 heures ? C'est une sécurité supplémentaire. Nous pourrions récupérer les données seulement des 7 dernières heures, mais si pour une raison quelconque la synchronisation ne s'exécute pas et que nous ne le détectons pas, nous pouvons perdre des données.

3) Reflet de l'état du système

Mon opinion sur ce sujet peut sembler controversée, mais je pense que la meilleure connaissance des données et du comportement du système ou du module appartient à l'équipe qui construit ce système/module.

C'est précisément cette équipe qui devrait être responsable de s'assurer que les données générées par le système ou sa partie dont l'équipe donnée est responsable, arrivent dans le référentiel central de données.

En d'autres termes, c'est l'équipe implémentant une fonctionnalité donnée qui devrait, sur la base des exigences collectées précédemment, transformer ces données au format approprié et les pousser plus loin.

C'est probablement le moyen le plus simple de s'assurer que les données sont complètes et que les développeurs de l'équipe donnée sont conscients que ces données sont utilisées quelque part. Le format des données analytiques devient pour eux une sorte de contrat – un contrat qu'ils doivent respecter.

Ce n'est pas très différent d'un contrat sur le schéma API.

4) Résistance à la régression

Ce point est probablement le plus compliqué. L'implémentation correcte de l'évolution du schéma de données est souvent non pas tant difficile que fastidieuse.

En résumé, les règles ressemblent à ceci :

Alors ne pouvons-nous rien supprimer ?

Si, mais pas n'importe comment. En général, comment et à quelle fréquence nous briserons la rétrocompatibilité ne dépend que de nous.

Si nous utilisons notre source de données analytiques uniquement en interne et, disons, l'analyste s'occupant de la construction des rapports est au courant des changements dans le système, avec une coordination appropriée, nous pourrions ajouter de nouvelles tables, puis supprimer les anciennes, en lui donnant un moment pour mettre à jour les rapports.

Cependant, si notre source de données analytiques est utilisée pour la Data Science, mais que nous travaillons dans un environnement multi-tenant et que les données analytiques/rapports sont mis à disposition des clients, alors nous devons aborder la question complètement différemment.

Politique de stockage et d'archivage des données

Comme je l'ai mentionné plus haut, il est très important que les données dans la base analytique, en particulier celles fournies par différents modules, soient soumises aux mêmes règles concernant le temps de stockage.

Si nous ne conservons les états de stock dans le système que de la dernière année, et les commandes des 5 dernières années, les analystes ne pourront pas construire un rapport qui contiendra des données de ces deux sources.

C'est plus un problème de nature formelle que technique. Il semblerait qu'il suffit de simplement se mettre d'accord, cependant en pratique ce n'est pas si simple.

Pour établir une politique commune de stockage et d'archivage des données, il faut prendre en compte non seulement les aspects techniques, mais aussi juridiques, commerciaux ou justement analytiques, ce qui peut nécessiter des compromis.

Exemples

Regardons maintenant un exemple simple de processus ETL, dont la tâche est de transférer des données de la base transactionnelle vers la base analytique.

Dans cet exemple, j'utiliserai Flow PHP, ce n'est cependant pas quelque chose de particulièrement unique à PHP. Dans n'importe quel langage, nous pouvons construire quelque chose de très similaire en utilisant n'importe quelle bibliothèque facilitant la création d'applications CLI et un outil de traitement de données.

L'exemple ci-dessous (sous une forme légèrement modifiée) provient d'une session de live stream que j'ai eu le plaisir d'enregistrer avec Roland qui anime la chaîne Never Code Alone. Vous trouverez le matériel vidéo sur YouTube sous le terme "Flow PHP"

Supposons que le format des commandes ressemble à peu près à ceci :

schema
|-- order_id: ?uuid
|-- seller_id: uuid
|-- created_at: datetime
|-- updated_at: datetime
|-- cancelled_at: ?datetime
|-- discount: ?float
|-- email: string
|-- customer: string
|-- address: map<string, string>
|-- notes: list<string>
|-- items: list<structure{sku: string, quantity: integer, price: float}>

Notre objectif est de transférer ces commandes vers la base de données analytique, préparons donc le schéma des données d'entrée et de destination.

<?php

declare(strict_types=1);

namespace App\DataFrames;

use Flow\ETL\Adapter\Doctrine\DbalMetadata;
use function Flow\ETL\DSL\integer_schema;
use function Flow\ETL\DSL\uuid_schema;
use function Flow\ETL\DSL\datetime_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\string_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\Types\DSL\type_map;
use function Flow\Types\DSL\type_string;
use function Flow\ETL\DSL\list_schema;
use function Flow\Types\DSL\type_list;
use function Flow\Types\DSL\type_structure;
use function Flow\Types\DSL\type_integer;
use function Flow\Types\DSL\type_float;
use function \Flow\ETL\DSL\schema;
use Flow\ETL\Schema;

final class Orders
{
    public static function sourceSchema() : Schema
    {
        return schema(
            uuid_schema("order_id"),
            uuid_schema("seller_id"),
            datetime_schema("created_at"),
            datetime_schema("updated_at", nullable: true),
            datetime_schema("cancelled_at", nullable: true),
            float_schema("discount", nullable: true),
            string_schema("email"),
            string_schema("customer"),
            map_schema("address", type: type_map(key_type: type_string(), value_type: type_string())),
            list_schema("notes", type: type_list(element: type_string())),
            list_schema("items", type: type_list(element: type_structure(elements: ["item_id" => type_string(), "sku" => type_string(), "quantity" => type_integer(), "price" => type_float()]))),
        );
    }

    public static function destinationSchema() : Schema
    {
        return self::sourceSchema()
            ->replace('updated_at', datetime_schema("updated_at"))
            ->remove('address')
            ->add(
                string_schema('street', metadata: DbalMetadata::length(2048)),
                string_schema('city', metadata: DbalMetadata::length(512)),
                string_schema('zip', metadata: DbalMetadata::length(32)),
                string_schema('country', metadata: DbalMetadata::length(128)),
            )
            ->remove('items')
            ->add(
                uuid_schema('item_id', metadata: DbalMetadata::primaryKey()),
                string_schema('sku', metadata: DbalMetadata::length(64)),
                integer_schema('quantity'),
                integer_schema('price'),
                string_schema('currency', metadata: DbalMetadata::length(3)),
            )
            ;
    }
}

Notez que la structure de la table de destination n'est plus orientée sur les commandes, mais sur les articles commandés. Notre objectif est de décompresser les articles des commandes de sorte que chacun soit une ligne séparée.

Grâce à cela, l'analyste qui devra générer un rapport n'aura plus besoin de bricoler et décompresser le json à la volée.

La colonne Adresse a également été divisée en plusieurs colonnes, grâce à quoi le rapport pourra être plus facilement filtré.

Une autre transformation importante est le changement de price de float à int en multipliant la valeur à virgule flottante par 100.

Le dernier changement sera l'ajout d'informations sur la devise dans laquelle les prix sont donnés. Mais d'où vient cette information ? C'est un détail très important résultant d'une implémentation pas très bonne. Dans ce cas particulier, toutes les commandes sont en dollars. Le système le sait, les développeurs le savent, mais une personne regardant les tables dans la base sans contexte peut ne pas avoir cette connaissance.

Notre structure de destination devrait ressembler à peu près à ceci :

schema
|-- order_id: uuid
|-- seller_id: uuid
|-- created_at: datetime
|-- updated_at: datetime
|-- cancelled_at: ?datetime
|-- discount: ?float
|-- email: string
|-- customer: string
|-- notes: list<string>
|-- street: string
|-- city: string
|-- zip: string
|-- country: string
|-- item_id: uuid
|-- sku: string
|-- quantity: integer
|-- price: integer
|-- currency: string

L'étape suivante sera de créer la table appropriée dans la base analytique. Nous pouvons y parvenir relativement facilement grâce à l'adaptateur pour Doctrine DBAL.

<?php

declare(strict_types=1);

namespace App\Dbal;

use App\DataFrames\Orders;
use App\DataFrames\OrdersCSV;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\Migrations\Provider\SchemaProvider as MigrationsSchemaProvider;
use function Flow\ETL\Adapter\Doctrine\to_dbal_schema_table;

final class SchemaProvider implements MigrationsSchemaProvider
{
    public const ANALYTICAL_ORDER_LINE_ITEMS = 'order_line_items';

    public function createSchema(): Schema
    {
        return new Schema(
            tables: [
                to_dbal_schema_table(Orders::destinationSchema(), self::ANALYTICAL_ORDER_LINE_ITEMS),
            ]
        );
    }
}

Dans la base analytique, nous stockerons donc une version "simplifiée" ou "normalisée" de la table des commandes. La normalisation consiste à décompresser les articles de la commande et à en faire des lignes séparées ainsi qu'à diviser la colonne "Adresse" en plusieurs colonnes.

Examinons donc la commande CLI qui sera responsable du transfert des données de la base transactionnelle vers la base analytique.

<?php

namespace App\Command;

use App\DataFrames\Orders;
use App\DataFrames\OrdersCSV;
use App\Dbal\SchemaProvider;
use Doctrine\DBAL\Connection;
use Flow\Doctrine\Bulk\Dialect\SqliteInsertOptions;
use Flow\ETL\Rows;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use function Flow\ETL\Adapter\Doctrine\from_dbal_key_set_qb;
use function Flow\ETL\Adapter\Doctrine\pagination_key_desc;
use function Flow\ETL\Adapter\Doctrine\pagination_key_set;
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\analyze;
use function Flow\ETL\DSL\constraint_unique;
use function Flow\ETL\DSL\data_frame;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\rename_replace;
use function Flow\ETL\DSL\schema_to_ascii;
use function Flow\Types\DSL\type_datetime;

#[AsCommand(
    name: 'app:orders:import',
    description: 'Import orders from the transactional database to the analytical database.',
)]
class OrdersImportCommand extends Command
{
    public function __construct(
        private readonly Connection $transactional,
        private readonly Connection $analytical,
    )
    {
        parent::__construct();
    }

    protected function configure()
    {
        $this->addOption('start-date', null, InputOption::VALUE_REQUIRED, 'Start date for the data pull.', '-24 hours')
            ->addOption('end-date', null, InputOption::VALUE_REQUIRED, 'End date for the data pull.', 'now')
        ;
    }


    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $io->title('Importing orders');

        $startDate = type_datetime()->cast($input->getOption('start-date'));
        $endDate = type_datetime()->cast($input->getOption('end-date'));

        $io->progressStart();

        $report = data_frame()
            ->read(
                from_dbal_key_set_qb(
                    $this->transactional,
                    $this->transactional->createQueryBuilder()
                        ->select('*')
                        ->from(SchemaProvider::ORDERS)
                        ->where('updated_at BETWEEN :start_date AND :end_date')
                        ->setParameter('start_date', $startDate->format('Y-m-d H:i:s'))
                        ->setParameter('end_date', $endDate->format('Y-m-d H:i:s')),
                    pagination_key_set(
                        pagination_key_desc('updated_at'),
                        pagination_key_desc('order_id')
                    )
                )->withSchema(Orders::sourceSchema())
            )
            ->withEntry('_address', ref('address')->unpack())
            ->renameEach(rename_replace('_address.', ''))
            ->withEntry('_item', ref('items')->expand())
            ->withEntry('_item', ref('_item')->unpack())
            ->renameEach(rename_replace('_item.', ''))
            ->drop('_item', 'items', 'address')
            ->withEntry('currency', lit('USD'))
            ->withEntry('price', ref('price')->multiply(100))
            ->constrain(constraint_unique('item_id'))
            ->match(Orders::destinationSchema())
            ->write(
                to_dbal_table_insert(
                    $this->analytical,
                    SchemaProvider::ORDER_LINE_ITEMS,
                    SqliteInsertOptions::fromArray([
                        'conflict_columns' => ['item_id'],
                    ])
                )
            )
            ->run(function (Rows $rows) use ($io) {
                $io->progressAdvance($rows->count());
            }, analyze: analyze())
        ;

        $io->progressFinish();

        $io->newLine();

        $io->definitionList(
            'Orders Import Summary',
            new TableSeparator(),
            ['Execution time ' => \number_format($report->statistics()->executionTime->highResolutionTime->seconds) . ' seconds'],
            ['Memory usage ' => \number_format($report->statistics()->memory->max()->inMb()) . ' MB'],
            ['Rows inserted ' => \number_format($report->statistics()->totalRows())],
        );

        return Command::SUCCESS;
    }
}
Bien sûr, ce n'est pas la forme la plus belle ni même la plus correcte. Normalement, la commande CLI ne contiendrait pas la définition du pipeline ETL, mais pour les besoins de l'exemple, c'est un bon début.

Un entrepôt de données central dédié est sans aucun doute une option tentante, surtout dans les endroits où le manque de visibilité empêche une prise de décision efficace.

Heureusement, c'est le type de fonctionnalité qui peut être ajouté à pratiquement n'importe quelle étape de la vie du projet.

Cela peut nécessiter l'introduction de processus supplémentaires et d'une certaine discipline de la part des équipes, mais les avantages d'une telle solution sont énormes.

Bien sûr, comme tout ce qui est nouveau, de tels changements peuvent sembler difficiles à introduire. Le manque d'expérience dans le travail avec les données, au moins dans les premières étapes, fait que cette tâche peut sembler tout simplement impossible.

Cependant, mon expérience montre que le plus difficile est de commencer, quand nous avons déjà :

Le travail avance comme une machine.

Il faut cependant se rappeler qu'il n'existe pas de solution universelle unique qui conviendra à chaque système. Dans chaque cas, il faut adapter l'approche à la spécificité du système et de l'organisation donnés.

Comment commencer ?

Si vous avez besoin d'aide pour construire un entrepôt de données central, je serai heureux de vous aider.
Contactez-moi, et ensemble nous créerons une solution parfaitement adaptée à vos besoins.

Je vous encourage également à visiter le serveur Discord - Flow PHP, où nous pouvons discuter directement.

Consultations