Translation Notice

This is an automatically translated version of that Article. Despite my best efforts, it might not be perfect.
Native speakers are welcome to open pull requests to correct anything that doesn't sound right.

Datenanalyse in verteilten Transaktionssystemen

Datenanalyse in verteilten Transaktionssystemen

Veröffentlicht am August 12, 2025 00:00
Datenanalyse Data Warehouse ETL Datenverarbeitung Transaktionssysteme

In diesem Artikel möchte ich das Problem der Datenanalyse in verteilten Transaktionssystemen angehen.
Wenn Sie nach Ideen für den Aufbau eines zentralen Data Warehouse suchen, das Ihnen ermöglicht, Daten aus dem gesamten System zu sammeln, unabhängig von seiner Fragmentierung und ohne in den Betriebskosten zu ertrinken, dann ist dieser Artikel für Sie.

Alles beginnt unschuldig

Die meisten Systeme, die wir täglich erstellen, speichern Daten in irgendeiner relationalen Datenbank. Eine sehr beliebte und gleichzeitig gute Wahl ist PostgreSQL, das in den letzten Jahren zu einem nahezu Standard in der Branche geworden ist.

Die Geschichte der meisten Projekte verläuft meist sehr ähnlich: Wir beginnen mit der Verifizierung der Idee, gewinnen unsere ersten Benutzer, das System beginnt Geld zu verdienen, das Business überlegt, wie man die Gewinne steigern kann, neue Funktionalitäten entstehen. Jede neue Funktionalität bedeutet einige neue Tabellen in der Datenbank.

Um die Entwicklung zu beschleunigen, verwenden wir ein ORM, generieren automatisch Migrationen, die das Datenbankschema erstellen und aktualisieren.

Anfangs läuft alles glatt, neue Funktionalitäten bringen die erwarteten Gewinne, das Business beginnt zu skalieren. Wir stellen mehr Programmierer ein, um mehr Funktionalitäten parallel zu entwickeln.

Von Zeit zu Zeit meldet jemand, dass das System an manchen Stellen anfängt zu "hängen", schnelle Aufklärung, noch schnellere Diagnose, es fehlt ein Index in irgendeiner Tabelle.

In der ORM-Mapping-Konfiguration fügen wir einen Index für das Feld hinzu, nach dem das System sehr häufig Daten sucht. Problem gelöst.

Das wachsende Entwicklerteam legt großen Wert auf Qualität, vielleicht verwendet es sogar fortgeschrittene Softwareentwicklungstechniken wie Event Storming oder Domain-Driven Design.
CI/CD führt unzählige Tests aus und stellt sicher, dass Änderungen keine Regressionen einführen.

Die Idylle dauert an, das Team oder vielleicht mehrere Teams beginnen, neue Module zum System hinzuzufügen. Module, die angemessen isoliert sind, für spezifische Aufgaben verantwortlich, niemals ihre Grenzen überschreiten und nicht in die Kompetenzen anderer Module eingreifen.

Für die Kommunikation werden natürlich Warteschlangen verwendet, wir implementieren das Outbox/Inbox Pattern

Um die entsprechende Isolation zu gewährleisten, stellen wir Regeln auf, die besagen, dass jedes Modul nur Zugriff auf die Tabellen in der Datenbank hat, die zu ihm gehören. Um Daten aus einem anderen Modul zu erhalten, muss man sich an dieses Modul wenden, sei es über eine interne API oder auf andere Weise.

Gelegentlich kommt das Business zu uns mit der Frage: Könnt ihr uns schnell diesen Bericht erstellen? Natürlich, ein paar Zeilen SQL, vielleicht ein paar Dutzend und der Bericht ist fertig.

Das Business ist zufrieden, der Bericht als CSV geht zu Excel (dem beliebtesten BI-Tool), das Business zieht Schlüsse, plant neue Funktionalitäten und Änderungen.

Business Intelligence

Zeit vergeht, neue Tabellen wachsen wie Pilze nach dem Regen

In diesem Zustand können wir sehr lange verharren, sogar mehrere gute Jahre.

In der Zwischenzeit wird jemand irgendwo sicherlich auf die Idee kommen, dem System die Möglichkeit zur Berichtserstellung hinzuzufügen. Es ist nur eine Frage der Zeit.

Berichte über den Systemzustand sind für das Business eines der wichtigsten Werkzeuge, die Einblick in Verhaltensweisen, Präferenzen oder Trends der Benutzer geben. Sie ermöglichen es nicht nur zu verstehen, was passiert, sondern auch angemessen zu planen, was erst passieren soll.

Je besser und detaillierter die Berichte, desto bessere Entscheidungen können auf ihrer Grundlage getroffen werden. Gute Geschäftsentscheidungen führen zu höheren Gewinnen, höhere Gewinne führen zu einem größeren Budget. Ein größeres Budget führt zu besseren Tools, größeren Teams, besseren Gehältern oder Boni.

Im Interesse jedes Programmierers sollte es daher sein, dem Business möglichst gute und präzise Daten zu liefern, schließlich führen bessere Ergebnisse direkt zu besseren Gewinnen.

Erste Symptome

Das System funktioniert, bringt Gewinne. Es besteht aus etwa 5, vielleicht sogar 10 Modulen, jedes Modul besteht aus 20-50 Tabellen in der Datenbank. Jedes Modul liefert seine eigenen Berichte.

Jedes Modul stellt nur einen Teil der Daten zur Verfügung, einen Bruchteil des größeren Bildes, keines gibt jedoch einen Überblick über das Ganze.

Die Teams haben zwar Referenzschlüssel zu Daten aus anderen Modulen implementiert, es gelang sogar, in der Benutzeroberfläche einen Ort zu schaffen, von dem aus Berichte generiert werden können.

Das ist jedoch immer noch zu wenig...

Sehr schnell stellt sich heraus, dass Berichte, die in verschiedenen Modulen generiert werden, von verschiedenen Programmierern geschrieben, vielleicht sogar in verschiedenen Technologien, unterschiedliche Datenformate und verschiedene Namensstandards haben.

Datumsbereiche werden unterschiedlich interpretiert, ein Modul berücksichtigt End- und Anfangsdaten, ein anderes schließt sie aus, und ein anderer macht ein rechtsseitig offenes Intervall zur Erleichterung der Paginierung, weil sie zufällig auch eine API haben und diese API denselben Code-Teil verwendet.

Da jedes Modul unabhängig ist, seine eigenen Grenzen und seine eigene Nomenklatur hat, stellen wir irgendwann fest, dass das, was wir in einem Modul irgendwie nennen, ein anderes Modul unter einem völlig anderen Namen exponiert. Weil es im Kontext dieses Moduls Sinn macht.

Mit der Zeit werden wir wahrscheinlich auch feststellen, dass jedes Team unterschiedlich seine Retention- und Datenspeicherrichtlinien definiert hat. Trotz des Besitzes von Daten der letzten 5 Jahre im Schlüsselmodul können wir nichts mit ihnen anfangen, weil Module, die Daten zur Anreicherung des Grundberichts lieferten, nur Daten der letzten 2 Jahre besitzen.

Dies sind jedoch keine Probleme, die ein wenig Excel-Magie nicht lösen könnte (vielleicht abgesehen von fehlenden Daten). Wir ändern die Namen dieser Spalten, entfernen jene, fügen schnelle Filter hinzu und das reicht.

Wir erstellen eine große Datei, in der wir ein Arbeitsblatt namens "Dashboard" haben, und alle anderen sind nur lesbar und versorgen das Dashboard.

Vielleicht funktioniert dieser Ansatz sogar eine Weile. Vielleicht sogar länger als eine Weile, aber machen wir uns keine Illusionen. Das alles wird schließlich zusammenbrechen, und das gemäß Murphys Gesetz im schlimmstmöglichen Moment.

Was ist schlecht an Excel?

Nichts! Excel ist ein fantastisches Tool. Das Problem liegt nicht in Excel, sondern in seiner Verwendung.

Diese ganze Magie der Datenreinigung und -aufbereitung sollte nicht in Excel stattfinden, nicht in größerem Maßstab. Wenn wir über einen einmaligen schnellen Bericht sprechen, ist das kein Problem. Wir tun, was wir müssen, klimpern Formeln, analysieren Daten und vergessen.

Wenn dies jedoch Teil unserer täglichen Routine werden soll, wenn wir zyklisch durch denselben Prozess gehen müssen, folgend den ständigen Änderungen und der Evolution des Systems, wird sich früher oder später herausstellen, dass diese Arbeitsblätter veraltet sind.

Spalten existieren nicht mehr oder haben ihre Namen geändert, neue Spalten sind entstanden, das Datenformat hat sich geändert oder schlimmer noch, eines der Teams, das für eines der Module verantwortlich ist, hat einige Daten ohne das Bewusstsein gelöscht, dass sie von einem Geschäftsbenutzer irgendwo in einem seiner Berichte verwendet wurden, den er einmal im Quartal öffnet.

Auf lange Sicht sind komplexere Tabellenkalkulationen, die Daten aus automatisch vom System generierten Berichten beziehen, die dann basierend auf impliziten Regeln zusammengefügt werden, nicht wartbar.

Sollen wir vielleicht ein BI-Tool anschließen?

Dachten viele Programmierer, die mehrfach mit dem Problem der Berichtserstellung konfrontiert waren.

Nehmen wir zum Beispiel Metabase. Ein kostenloses Tool, das wir in wenigen Minuten mit Docker aufsetzen können.

Geben Sie ihm Zugriff auf unsere Datenbank und einige oder alle Tabellen, und über eine sehr benutzerfreundliche Oberfläche kann das Business sehr einfach und angenehm die kompliziertesten Berichte generieren.

Berichte, die Daten aus mehreren Modulen gleichzeitig enthalten können!

Wir können sogar einen Datenanalysten mit SQL-Grundkenntnissen einstellen, der alles das, was sich nicht anklicken lässt, mit entsprechend vorbereiteten Abfragen erreicht.

Nur, das löst das Problem nicht

Es verschiebt es nur in der Zeit.

Wenn wir genau hinschauen, was sich geändert hat, dann hat sich nur eine Sache geändert. Das Tool... Wir haben das Problem der Datenreinigung und -verknüpfung von Excel zu Metabase verschoben.

Excel ist zwar zu seiner ursprünglichen Rolle zurückgekehrt, wir können jetzt Berichte, die aus Metabase heruntergeladen wurden, in Excel werfen.

Jedoch ist unsere implizite Logik zur Datenverknüpfung/-reinigung von der Tabellenkalkulation zu SQL-Abfragen gewandert.

Darüber hinaus sind alle Probleme dieselben geblieben:

Sollen wir vielleicht Prozesse und Regeln einführen?

Die meisten der oben genannten Probleme lassen sich durch die Implementierung entsprechender Prozesse und Regeln lösen.

Wir können Namensstandards festlegen, die besagen, dass jede Tabelle in der Datenbank ein Modulpräfix im Namen enthalten muss, und Spalten in Kleinbuchstaben und durch Unterstriche getrennt benannt werden.

Wir können festlegen, dass jedes Modul Daten der letzten 5 Jahre speichert (Hot Storage), alles Ältere wird archiviert. (Cold Storage)

Wir können festlegen, dass Datumsbereiche immer als rechtsseitig offene Intervalle behandelt werden.

Wir können festlegen, dass wir keine Spalten aus der Datenbank entfernen, oder dass wir vor dem Entfernen von etwas zuerst in eine Übergangszeit eintreten, während der wir jedem Systembenutzer zeigen, welche Spalten sich ändern werden und wie.

Selbst wenn wir für die Diskussion annehmen, dass es gelingt, diese Prozesse global zwischen mehreren Teams zu implementieren und dass diese Teams sie bedingungslos und sehr genau befolgen werden, ist das nicht genug...

Datenbankskalierung ist nicht billig

Besonders wenn wir uns auf Cloud-Lösungen stützen.

Stellen wir uns eine Situation vor, in der während der Spitzenarbeitszeiten des Systems (wenn Benutzer die meisten Transaktionen generieren) ein Geschäftsanalyst, der nach seinem eigenen Plan arbeitet, einen Bericht basierend auf einem typischen SQL- Mehrkilozeiler generieren muss?

Der Analyst startet die Abfrage, die Datenbank beginnt zu mahlen. Die Abfrage dauert 5, 10, 15 Minuten. Die Datenbank beginnt zu schwitzen.

Benutzer bombardieren das System mit neuen Bestellungen (oder anderen Operationen, die viele Schreibvorgänge generieren), während der Analyst auf Ergebnisse wartet.

Gleichzeitig muss jemand aus dem Business schnell ein paar Berichte überprüfen, jeder enthält "die Gesamtzahl der Zeilen in der Tabelle". Es gibt mehrere solcher Personen.

All diese Operationen überlagern sich, unsere bereits sehr belastete Datenbank schafft es nicht.

Einige Benutzertransaktionen kommen nicht durch.
Das System atmet kaum. Die Wartezeit für grundlegendste Operationen wird in Sekunden gemessen.

Und jetzt das Sahnehäubchen: Wenn all diese Danteske Szenen stattfinden, wenn Pager Duty glühend heiß ist von allen Arten von Vorfällen, wenn Teams in Panik versuchen, das System wieder zum Leben zu erwecken, DevOps-Leute überlegen, wie man die Datenbank schnell skaliert...

Renovierungsarbeiten

Der CEO beginnt eine Präsentation für einen potenziellen Geschäftspartner, mit dem die Zusammenarbeit sich als entscheidend für die Unternehmensstrategie erweisen soll...

Sollen wir einfach ein Replikat aufsetzen?

Schließlich werden Berichte unsere Transaktionsdatenbank nicht überlasten.

Wir verdoppeln zwar die Datenbankunterhaltungskosten, reduzieren aber das Systemüberlastungsrisiko und können unser liebstes Business Intelligence Tool direkt an das Replikat anschließen, was uns Echtzeitdaten gibt.

Klingt fantastisch, aber in der Praxis ist es nicht so einfach.

Abgesehen von potenziellen Problemen, die sich aus der Natur der Replikation selbst ergeben, ist das Haupt- und Grundproblem, auf das ich am häufigsten stoße, die Wahrnehmung.

Völlig anders werden ein Programmierer, der diese Tabellen mit ORM-Mappings generiert hat, und ein Datenanalyst auf Tabellen in der Datenbank blicken.

Der Programmierer wird wissen, welche Tabellen miteinander verbunden werden müssen, um ein Gesamtbild zu erhalten. Er wird die Einschränkungen und Bedingungen verstehen, die irgendwo im Anwendungscode vergraben sind. Vor allem kennt der Programmierer den Lebenszyklus des Systems (seiner Daten) oder sollte sich zumindest orientieren.

All dieses Wissen ist für Analysten meist nicht verfügbar.

Es ist, als würde man jemandem sagen, er solle durch ein Schlüsselloch schauen. Etwas kann man sicherlich sehen. Einige Schlüsse lassen sich ziehen, aber es wird sehr schwer sein, das Ganze zu rekonstruieren.

Es reicht, dass wir eine JSONB-Spalte in der Datenbank haben, in der wir einige Datenstrukturen speichern. Nehmen wir an, das System erlaubt 3 gültige Kombinationen derselben Struktur, aber eine ist superselten, so selten, dass sie im System noch nicht aufgetreten ist. Beim Betrachten der Daten, auch ganzheitlich, kann der Analyst einfach nicht wissen, dass 3 Kombinationen einer Struktur existieren. Bei der Normalisierung wird er 2 Fälle berücksichtigen, während der dritte zu einer tickenden Zeitbombe wird, die wie immer im unerwartesten Moment explodiert.

Anders gesagt, wenn wir mehrere unabhängige Module im System haben. Jedes mit seiner eigenen Datenbank oder zumindest seinen eigenen Tabellen in der Datenbank. Was zusammen 200-300 Tabellen ergibt, ist die Erwartung, dass der Analyst das ohne Probleme bewältigt, keine Fehler macht und Berichte nicht von den Erwartungen abweichen, gelinde gesagt naiv.

Trotz allem ist das Aufstellen einer Kopie/Replikat-Datenbank für Analysten und die Vergabe eines 4-buchstabigen Namens, der vom Wort "Analytics" stammt, immer noch weit verbreitet.

BI-Tools überbieten sich darin, wer die bessere Benutzeroberfläche erstellt, mit der sich Berichte anklicken lassen. Sie versprechen, dass wir Daten ohne SQL analysieren können.

Ja, das kann funktionieren, an vielen Stellen funktioniert es genau so. Worüber wir jedoch nicht laut sprechen:

Was sich letztendlich auf die Datenqualität und die Effektivität der Geschäftsentscheidungen auswirkt.

Was bleibt uns übrig?

Vielleicht sollten wir zuerst festlegen, welche Probleme wir in erster Linie lösen wollen:

Strategie und Analyse
  1. Datenanalyse/Berichtserstellung darf keinen Einfluss auf die Systemfunktion haben.
  2. Daten in Berichten müssen immer aktuell sein (Datenverzögerung ist akzeptabel, individuell festgelegt)
  3. Berichte müssen den realen, unverzerrten Systemzustand widerspiegeln
  4. Die Datenstruktur muss regressionsresistent sein
  5. Einheitliche Datenaufbewahrungs- und Archivierungsrichtlinie

1) Ressourcentrennung

Das ist nichts Revolutionäres, wenn wir nicht wollen, dass unser System durch Überlastung der Datenbank durch Berichtserstellung gefährdet wird, müssen wir eine separate Datenbank aufsetzen.

Welche Datenbank für Analytics wählen?

Das ist im Grunde ein Thema für einen separaten Artikel oder sogar eine Artikelserie. Es gibt sehr viele Lösungen, einige bessere, andere schlechtere. Es gibt keine einzige magische Lösung für alle Probleme.

Mein Rat, besonders für kleinere Teams ohne Erfahrung im Datenmanagement, ist, sich nicht auf Technologien zu stürzen, mit denen wir keine Erfahrung haben.

Entscheidend ist das richtige Datenformat. Nach der Umwandlung vieler schmaler Tabellen in eine breite wird sich wahrscheinlich herausstellen, dass die Generierung desselben Berichts ohne 20x JOIN nicht mehr 10 Minuten sondern weniger als eine halbe Sekunde dauert.

Und wenn das Problem Aggregationen sind, nicht Verknüpfungen?

Dann ist es besser, anstatt im laufenden Betrieb zu aggregieren, eine Tabelle vorzubereiten, die diese Daten in aggregierter und nicht in roher Form enthält.

2) Aktuelle Daten

Nun gut, aber wenn wir eine neue, unabhängige Datenbank erstellen, wie sorgen wir dafür, dass die Daten in dieser Datenbank aktuell und frisch sind?

Hier hängt sehr viel von der akzeptablen Verzögerung bei der Datensynchronisation ab. Meist reicht es, wenn die Analysedatenbank etwa 24 Stunden hinter der Transaktionsdatenbank liegt. Das heißt, sie enthält Daten bis "gestern", einschließlich des ganzen "gestrigen Tages".

Warum? Weil nur wenige Geschäftsentscheidungen sofort getroffen werden. Wenn Entscheidungen in so kurzer Zeit getroffen werden müssen, dann baut man entsprechende Automatisierungen.

Wenn eine 24-stündige Verzögerung akzeptabel ist (manchmal ist sie es nicht und dafür gibt es auch Lösungen), reicht es, wenn wir die Synchronisation mehrmals täglich durchführen. Natürlich gibt es auch hier keine goldene Regel. Genauso wie es keine Regel gibt, die besagt, welchen Bereich man auf einmal synchronisieren soll.

Es gibt jedoch eine gute Praxis, die die Synchronisation erleichtert. Sie besteht darin, sicherzustellen, dass die Haupttabellen im Transaktionssystem das Datum der Erstellung/Änderung des Datensatzes enthalten.

Mit diesen beiden Informationen können wir das Synchronisationsfenster auf einen bestimmten Zeitraum eingrenzen.

Wie sieht das in der Praxis aus? Wir können z.B. alle 6 Stunden einen Synchronisationsprozess starten und nur Datensätze sammeln, die in den letzten 24 Stunden geändert wurden.
Das sind natürlich Beispielzahlen, diese Werte müssen basierend auf der Größe und dem Verhalten der Daten festgelegt werden.

Warum aus 24 Stunden? Als zusätzliche Sicherheit. Wir könnten Daten nur aus 7 Stunden holen, aber wenn aus irgendeinem Grund die Synchronisation nicht ausgeführt wird und wir das nicht bemerken, könnten wir Daten verlieren.

3) Systemzustand widerspiegeln

Meine Meinung zu diesem Thema mag kontrovers erscheinen, aber ich glaube, dass das beste Wissen über Daten und Systemverhalten das Team hat, das dieses System/Modul baut.

Genau dieses Team sollte dafür verantwortlich sein, dass Daten, die vom System oder seinem Teil generiert werden, für den das gegebene Team verantwortlich ist, in das zentrale Datenrepository gelangen.

Mit anderen Worten, genau das Team, das eine bestimmte Funktionalität implementiert, sollte basierend auf zuvor gesammelten Anforderungen diese Daten in das entsprechende Format umwandeln und weiterleiten.

Dies ist wahrscheinlich der einfachste Weg sicherzustellen, dass die Daten vollständig sind und Programmierer aus dem jeweiligen Team sich bewusst sind, dass diese Daten irgendwo verwendet werden. Das analytische Datenformat wird für sie zu einer Art Vertrag – einem Vertrag, den sie einhalten müssen.

Das unterscheidet sich nicht sehr vom API-Schema-Vertrag.

4) Regressionsresistenz

Dieser Punkt ist wahrscheinlich der komplizierteste. Die korrekte Implementierung der Datenschema-Evolution ist oft nicht so sehr schwierig als vielmehr mühsam.

In Kurzform sehen die Regeln so aus:

Können wir also nichts löschen?

Können wir, aber nicht einfach so. Generell hängt es nur von uns ab, wie und wie oft wir die Rückwärtskompatibilität brechen.

Wenn wir unsere analytische Datenquelle nur intern nutzen und, sagen wir, der Analyst, der Berichte erstellt, auf dem Laufenden mit Systemänderungen ist, könnten wir bei entsprechender Koordination neue Tabellen hinzufügen und dann alte entfernen, ihm etwas Zeit geben, die Berichte zu aktualisieren.

Wenn jedoch unsere analytische Datenquelle für Data Science verwendet wird, aber wir in einer Multi-Tenancy-Umgebung arbeiten und analytische Daten/Berichte Kunden zur Verfügung gestellt werden, dann müssen wir völlig anders an die Sache herangehen.

Datenaufbewahrungs- und Archivierungsrichtlinie

Wie ich oben erwähnt habe, ist es sehr wichtig, dass Daten in der Analysedatenbank, insbesondere die von verschiedenen Modulen gelieferten, denselben Regeln bezüglich der Aufbewahrungszeit unterliegen.

Wenn wir Lagerzustände im System nur aus dem letzten Jahr speichern, aber Bestellungen aus den letzten 5 Jahren, können Analysten keinen Bericht erstellen, der Daten aus beiden Quellen enthält.

Das ist eher ein formales als ein technisches Problem. Es scheint, dass es ausreichen würde, sich einfach zu einigen, aber in der Praxis ist es nicht so einfach.

Um eine gemeinsame Datenaufbewahrungs- und Archivierungsrichtlinie festzulegen, müssen nicht nur technische, sondern auch rechtliche, geschäftliche oder analytische Aspekte berücksichtigt werden, was Kompromisse erfordern kann.

Beispiele

Schauen wir uns nun ein einfaches Beispiel eines ETL-Prozesses an, dessen Aufgabe es ist, Daten von der Transaktionsdatenbank zur Analysedatenbank zu übertragen.

In diesem Beispiel verwende ich Flow PHP, das ist jedoch nichts speziell Einzigartiges für PHP. In jeder Sprache können wir etwas sehr Ähnliches mit jeder Bibliothek erstellen, die die Erstellung von CLI-Anwendungen und einem Tool zur Datenverarbeitung erleichtert.

Das folgende Beispiel (in etwas veränderter Form) stammt aus einer Live-Stream-Session, die ich das Vergnügen hatte, mit Roland aufzunehmen, der den Kanal Never Code Alone betreibt. Das Videomaterial finden Sie auf YouTube unter dem Begriff "Flow PHP"

Nehmen wir an, dass das Bestellformat etwa so aussieht:

schema
|-- order_id: ?uuid
|-- seller_id: uuid
|-- created_at: datetime
|-- updated_at: datetime
|-- cancelled_at: ?datetime
|-- discount: ?float
|-- email: string
|-- customer: string
|-- address: map<string, string>
|-- notes: list<string>
|-- items: list<structure{sku: string, quantity: integer, price: float}>

Unser Ziel ist es, diese Bestellungen in die Analysedatenbank zu übertragen, also bereiten wir das Schema der Eingabedaten sowie der Zieldaten vor.

<?php

declare(strict_types=1);

namespace App\DataFrames;

use Flow\ETL\Adapter\Doctrine\DbalMetadata;
use function Flow\ETL\DSL\integer_schema;
use function Flow\ETL\DSL\uuid_schema;
use function Flow\ETL\DSL\datetime_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\string_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\Types\DSL\type_map;
use function Flow\Types\DSL\type_string;
use function Flow\ETL\DSL\list_schema;
use function Flow\Types\DSL\type_list;
use function Flow\Types\DSL\type_structure;
use function Flow\Types\DSL\type_integer;
use function Flow\Types\DSL\type_float;
use function \Flow\ETL\DSL\schema;
use Flow\ETL\Schema;

final class Orders
{
    public static function sourceSchema() : Schema
    {
        return schema(
            uuid_schema("order_id"),
            uuid_schema("seller_id"),
            datetime_schema("created_at"),
            datetime_schema("updated_at", nullable: true),
            datetime_schema("cancelled_at", nullable: true),
            float_schema("discount", nullable: true),
            string_schema("email"),
            string_schema("customer"),
            map_schema("address", type: type_map(key_type: type_string(), value_type: type_string())),
            list_schema("notes", type: type_list(element: type_string())),
            list_schema("items", type: type_list(element: type_structure(elements: ["item_id" => type_string(), "sku" => type_string(), "quantity" => type_integer(), "price" => type_float()]))),
        );
    }

    public static function destinationSchema() : Schema
    {
        return self::sourceSchema()
            ->replace('updated_at', datetime_schema("updated_at"))
            ->remove('address')
            ->add(
                string_schema('street', metadata: DbalMetadata::length(2048)),
                string_schema('city', metadata: DbalMetadata::length(512)),
                string_schema('zip', metadata: DbalMetadata::length(32)),
                string_schema('country', metadata: DbalMetadata::length(128)),
            )
            ->remove('items')
            ->add(
                uuid_schema('item_id', metadata: DbalMetadata::primaryKey()),
                string_schema('sku', metadata: DbalMetadata::length(64)),
                integer_schema('quantity'),
                integer_schema('price'),
                string_schema('currency', metadata: DbalMetadata::length(3)),
            )
            ;
    }
}

Beachten Sie, dass die Zielstruktur der Tabelle nicht mehr auf Bestellungen ausgerichtet ist, sondern auf bestellte Artikel. Unser Ziel ist es, die Bestellartikel so zu entpacken, dass jeder eine separate Zeile ist.

Dadurch muss der Analyst, der einen Bericht generieren muss, nicht mehr kombinieren und JSON im laufenden Betrieb entpacken.

Die Adressspalte wurde auch in mehrere Spalten aufgeteilt, wodurch der Bericht einfacher gefiltert werden kann.

Eine weitere wichtige Transformation ist die Umwandlung von price von float in int durch Multiplikation des Gleitkommawerts mit 100.

Die letzte Änderung wird das Hinzufügen von Informationen über die Währung der Preise sein. Aber woher kommt diese Information? Das ist ein sehr wichtiges Detail, das aus einer nicht sehr guten Implementierung resultiert. In diesem speziellen Fall sind alle Bestellungen in Dollar. Das System weiß das, die Programmierer wissen das, aber eine Person, die die Tabellen in der Datenbank ohne Kontext betrachtet, muss dieses Wissen nicht unbedingt haben.

Unsere Zielstruktur sollte etwa so aussehen:

schema
|-- order_id: uuid
|-- seller_id: uuid
|-- created_at: datetime
|-- updated_at: datetime
|-- cancelled_at: ?datetime
|-- discount: ?float
|-- email: string
|-- customer: string
|-- notes: list<string>
|-- street: string
|-- city: string
|-- zip: string
|-- country: string
|-- item_id: uuid
|-- sku: string
|-- quantity: integer
|-- price: integer
|-- currency: string

Der nächste Schritt ist die Erstellung der entsprechenden Tabelle in der Analysedatenbank. Das können wir relativ einfach mit dem Adapter für Doctrine DBAL erreichen.

<?php

declare(strict_types=1);

namespace App\Dbal;

use App\DataFrames\Orders;
use App\DataFrames\OrdersCSV;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\Migrations\Provider\SchemaProvider as MigrationsSchemaProvider;
use function Flow\ETL\Adapter\Doctrine\to_dbal_schema_table;

final class SchemaProvider implements MigrationsSchemaProvider
{
    public const ANALYTICAL_ORDER_LINE_ITEMS = 'order_line_items';

    public function createSchema(): Schema
    {
        return new Schema(
            tables: [
                to_dbal_schema_table(Orders::destinationSchema(), self::ANALYTICAL_ORDER_LINE_ITEMS),
            ]
        );
    }
}

In der Analysedatenbank werden wir also eine "vereinfachte" oder "normalisierte" Version der Bestelltabelle speichern. Die Normalisierung besteht im Entpacken der Bestellartikel und ihrer Umwandlung in separate Zeilen sowie der Aufteilung der "Adress"-Spalte in mehrere Spalten.

Schauen wir uns also den CLI-Befehl an, der für die Übertragung von Daten von der Transaktions- zur Analysedatenbank verantwortlich ist.

<?php

namespace App\Command;

use App\DataFrames\Orders;
use App\DataFrames\OrdersCSV;
use App\Dbal\SchemaProvider;
use Doctrine\DBAL\Connection;
use Flow\Doctrine\Bulk\Dialect\SqliteInsertOptions;
use Flow\ETL\Rows;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use function Flow\ETL\Adapter\Doctrine\from_dbal_key_set_qb;
use function Flow\ETL\Adapter\Doctrine\pagination_key_desc;
use function Flow\ETL\Adapter\Doctrine\pagination_key_set;
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\analyze;
use function Flow\ETL\DSL\constraint_unique;
use function Flow\ETL\DSL\data_frame;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\rename_replace;
use function Flow\ETL\DSL\schema_to_ascii;
use function Flow\Types\DSL\type_datetime;

#[AsCommand(
    name: 'app:orders:import',
    description: 'Import orders from the transactional database to the analytical database.',
)]
class OrdersImportCommand extends Command
{
    public function __construct(
        private readonly Connection $transactional,
        private readonly Connection $analytical,
    )
    {
        parent::__construct();
    }

    protected function configure()
    {
        $this->addOption('start-date', null, InputOption::VALUE_REQUIRED, 'Start date for the data pull.', '-24 hours')
            ->addOption('end-date', null, InputOption::VALUE_REQUIRED, 'End date for the data pull.', 'now')
        ;
    }


    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $io->title('Importing orders');

        $startDate = type_datetime()->cast($input->getOption('start-date'));
        $endDate = type_datetime()->cast($input->getOption('end-date'));

        $io->progressStart();

        $report = data_frame()
            ->read(
                from_dbal_key_set_qb(
                    $this->transactional,
                    $this->transactional->createQueryBuilder()
                        ->select('*')
                        ->from(SchemaProvider::ORDERS)
                        ->where('updated_at BETWEEN :start_date AND :end_date')
                        ->setParameter('start_date', $startDate->format('Y-m-d H:i:s'))
                        ->setParameter('end_date', $endDate->format('Y-m-d H:i:s')),
                    pagination_key_set(
                        pagination_key_desc('updated_at'),
                        pagination_key_desc('order_id')
                    )
                )->withSchema(Orders::sourceSchema())
            )
            ->withEntry('_address', ref('address')->unpack())
            ->renameEach(rename_replace('_address.', ''))
            ->withEntry('_item', ref('items')->expand())
            ->withEntry('_item', ref('_item')->unpack())
            ->renameEach(rename_replace('_item.', ''))
            ->drop('_item', 'items', 'address')
            ->withEntry('currency', lit('USD'))
            ->withEntry('price', ref('price')->multiply(100))
            ->constrain(constraint_unique('item_id'))
            ->match(Orders::destinationSchema())
            ->write(
                to_dbal_table_insert(
                    $this->analytical,
                    SchemaProvider::ORDER_LINE_ITEMS,
                    SqliteInsertOptions::fromArray([
                        'conflict_columns' => ['item_id'],
                    ])
                )
            )
            ->run(function (Rows $rows) use ($io) {
                $io->progressAdvance($rows->count());
            }, analyze: analyze())
        ;

        $io->progressFinish();

        $io->newLine();

        $io->definitionList(
            'Orders Import Summary',
            new TableSeparator(),
            ['Execution time ' => \number_format($report->statistics()->executionTime->highResolutionTime->seconds) . ' seconds'],
            ['Memory usage ' => \number_format($report->statistics()->memory->max()->inMb()) . ' MB'],
            ['Rows inserted ' => \number_format($report->statistics()->totalRows())],
        );

        return Command::SUCCESS;
    }
}
Natürlich ist das nicht die schönste oder auch nur korrekteste Form. Normalerweise würde ein CLI-Befehl nicht die Definition der ETL-Pipeline enthalten, aber für die Zwecke des Beispiels ist das ein guter Start.

Ein dediziertes zentrales Data Warehouse ist zweifellos eine verlockende Option, besonders an Orten, wo mangelnde Sichtbarkeit eine effiziente Entscheidungsfindung verhindert.

Glücklicherweise ist das die Art von Funktionalität, die im Grunde in jedem Stadium des Projektlebens hinzugefügt werden kann.

Es mag die Einführung zusätzlicher Prozesse und eine gewisse Disziplin von den Teams erfordern, aber die Vorteile einer solchen Lösung sind enorm.

Natürlich können solche Änderungen, wie alles Neue, schwer einzuführen erscheinen. Mangelnde Erfahrung in der Datenarbeit zumindest in den Anfangsphasen lässt diese Aufgabe geradezu unausführbar erscheinen.

Aus meiner Erfahrung geht jedoch hervor, dass es am schwierigsten ist anzufangen, wenn wir bereits haben:

Die Arbeit läuft praktisch maschinell.

Es ist jedoch wichtig zu bedenken, dass es keine universelle Lösung gibt, die für jedes System passt. In jedem Fall muss der Ansatz an die Spezifika des jeweiligen Systems und der Organisation angepasst werden.

Wie anfangen?

Wenn Sie Hilfe beim Aufbau eines zentralen Data Warehouse benötigen, helfe ich Ihnen gerne.
Kontaktieren Sie mich, und wir erstellen gemeinsam eine Lösung, die perfekt auf Ihre Bedürfnisse zugeschnitten ist.

Ich ermutige Sie auch, den Discord - Flow PHP Server zu besuchen, wo wir direkt sprechen können.

Beratung