
Analiza danych w rozproszonych systemach transakcyjnych
W tym wpisie postaram się poruszyć problem analizy danych w rozproszonych systemach transakcyjnych.
Jeżeli szukasz pomysłów na zbudowanie centralnego magazynu danych, który pozwoli zebrać dane z całego systemu,
niezależnie od jego fragmentacji a przy okazji nie utonąć w kosztach operacyjnych, to ten wpis jest dla Ciebie.
Wszystko zaczyna się niewinnie
Większość systemów, które na co dzień tworzymy, przechowuje dane w jakiejś relacyjnej bazie danych. Bardzo popularnym, a przy okazji dobrym wyborem jest PostgreSQL, który w ostatnich latach stał się niemalże standardem w branży.
Historia większości projektów wygląda przeważnie bardzo podobnie: zaczynamy od weryfikacji pomysłu, zdobywamy pierwszych użytkowników, system zaczyna zarabiać, biznes kombinuje jak zwiększyć zyski, powstają nowe funkcjonalności. Każda nowa funkcjonalność to kilka nowych tabelek w bazie danych.
W celu przyśpieszenia developmentu korzystamy z ORM-a, automatycznie generujemy migracje, które tworzą i aktualizują schemat bazy danych.
Początkowo wszystko idzie gładko, nowe funkcjonalności przynoszą spodziewane zyski, biznes zaczyna się skalować. Zatrudniamy więcej programistów, aby tworzyć więcej funkcjonalności równolegle.
Od czasu do czasu ktoś zgłasza, że system w niektórych miejscach zaczyna "zamulać", szybki rekonesans, jeszcze szybsza diagnoza, brakuje indeksu w jakiejś tabelce.
W konfiguracji mapowań ORM-a dokładamy indeks na pole, po którym system bardzo często wyszukuje dane. Problem rozwiązany.
Powiększający się zespół programistów przykłada dużą wagę do jakości, być może nawet posługuje się
zaawansowanymi technikami wytwarzania oprogramowania, jak Event Storming czy Domain-Driven Design.
CI/CD wykonuje niezliczone ilości testów, upewniając się, że zmiany nie wprowadzają regresji.
Idylla trwa, zespół lub być może wiele zespołów zaczyna dokładać nowe moduły do systemu. Moduły odpowiednio odizolowane, odpowiedzialne za konkretne zadania, nigdy nie przekraczające swoich granic i nie wchodzące w kompetencje innych modułów.
Do komunikacji wykorzystywane są oczywiście kolejki, implementujemy Outbox/Inbox Pattern
Aby zapewnić odpowiednią izolację, ustalamy reguły mówiące o tym, że każdy moduł ma dostęp jedynie do tych tabelek w bazie danych, które do niego należą. W celu uzyskania danych z innego modułu należy udać się do tego modułu, czy to za pośrednictwem jakiegoś wewnętrznego API, czy w jakikolwiek inny sposób.
Co jakiś czas biznes przychodzi do nas z pytaniem czy możecie na szybko wygenerować dla nas ten raport?. Oczywiście, kilka linijek w SQL'u, być może kilkadziesiąt i raport gotowy.
Biznes zadowolony, raport w postaci CSV leci do Excela (najbardziej popularnego narzędzia BI), biznes wyciąga wnioski, planuje nowe funkcjonalności i zmiany.

Czas płynie, nowe tabelki wyrastają jak grzyby po deszczu
W takim stanie rzeczy możemy trwać bardzo długo, nawet kilka dobrych lat.
W międzyczasie ktoś gdzieś na pewno wpadnie na pomysł, żeby dodać do systemu możliwość generowania raportów. To tylko i wyłącznie kwestia czasu.
Raporty o stanie systemu to dla biznesu jedno z bardziej kluczowych narzędzi dających wgląd w zachowania, preferencje czy trendy użytkowników. Pozwalają nie tylko zrozumieć, co się dzieje, ale też odpowiednio zaplanować to, co się ma dopiero wydarzyć.
Im lepsze i bardziej szczegółowe raporty, tym lepsze decyzje można na ich podstawie podejmować. Dobre decyzje biznesowe przekładają się na większe zyski, większe zyski przekładają się na większy budżet. Większy budżet przekłada się na lepsze narzędzia, większe zespoły, lepsze wynagrodzenia czy premie.
W interesie każdego programisty powinno być więc dostarczanie biznesowi możliwie jak najlepszych i jak najbardziej precyzyjnych danych, w końcu lepsze wyniki przekładają się bezpośrednio na lepsze zyski.
Pierwsze objawy
System działa, przynosi zyski. Składa się z około 5, może nawet 10 modułów, każdy moduł składa się z 20-50 tabelek w bazie danych. Każdy moduł dostarcza swoje własne raporty.
- Sprzedaż
- Marketing
- Logistyka
- Stany Magazynowe
- Użytkownicy
Każdy moduł udostępnia tylko część danych, cząstkę większego obrazu, żaden jednak nie daje podglądu na całość.
Zespoły wprawdzie zaimplementowały klucze referencyjne do danych pochodzących z innych modułów, udało się nawet w interfejsie użytkownika stworzyć jedno miejsce z którego można wygenerować raporty.
To jednak dalej za mało...
Bardzo szybko okazuje się, że raporty generowane w różnych modułach, napisane przez różnych programistów, być może nawet w różnych technologiach mają różne formaty danych, różne standardy nazewnictwa.
Inaczej interpretowane są zakresy dat, jeden moduł uwzględnia daty końca i początku, drugi je wyklucza, a jeszcze inny robi przedział prawostronnie otwarty w celu ułatwienia paginacji, bo akurat mają też API i to API wykorzystuje ten sam kawałek kodu.
Ponieważ każdy moduł jest niezależny, posiada swoje granice, swóją nomenklaturę, w pewnym momencie orientujemy się, że to, co w jednym module nazywamy w jakiś sposób, inny moduł eksponuje pod zupełnie inną nazwą. Ponieważ w kontekście tego modułu ma to sens.
Po czasie pewnie też się zorientujemy, że każdy zespół inaczej zdefiniował sobie politykę retencji i przechowywania danych. Pomimo posiadania w kluczowym module danych z ostatnich 5 lat, nie możemy nic z nimi zrobić, bo moduły, które dostarczały dane potrzebne do wzbogacenia podstawowego raportu, posiadają dane jedynie z ostatnich 2 lat.
Nie są to jednak problemy, których odrobina magii w Excelu nie byłaby w stanie rozwiązać (być może poza brakami w danych). Tym kolumnom zmienimy nazwy, te usuniemy, dodamy szybkie filtrowanko i wystarczy.
Stworzymy sobie jeden wielki plik, w którym będziemy mieli jeden arkusz o nazwie "Dashboard", a wszystkie inne będą tylko do odczytu, będą zasilały dashboard.
Być może to podejście będzie nawet chwilę działać. Być może nawet dłużej niż chwilę, ale nie miejmy złudzeń. To wszystko w końcu padnie, i to zgodnie z prawami Murphiego padnie w najgorszym możliwym momencie.
Co złego jest w Excelu?
Nic! Excel to rewelacyjne narzędzie. Problem nie leży w Excelu, ale w jego wykorzystaniu.
Ta cała magia polegająca na oczyszczeniu i przygotowaniu danych nie powinna mieć miejsca w Excelu, nie na większą skalę. Jeżeli mówimy o jednorazowym szybkim raporcie, nie ma problemu. Robimy co musimy, klepiemy formułki, analizujemy dane i zapominamy.
Jeżeli jednak ma to być część naszej codziennej rutyny, jeżeli cyklicznie musimy przechodzić przez ten sam proces, podążając za nieustannymi zmianami i ewolucją systemu, prędzej czy później okaże się, że te arkusze są nieaktualne.
Kolumny przestały istnieć lub zmieniły nazwy, powstały nowe kolumny, format danych uległ zmianie albo co gorsza, jeden z zespołów opiekujący się jednym z modułów usunął jakieś dane bez świadomości, że były one wykorzystywane przez jakiegoś użytkownika biznesowego gdzieś w jednym z jego raportów, które otwiera raz na kwartał.
Na dłuższą metę, bardziej złożone arkusze kalkulacyjne, które czerpią dane z automatycznie generowanych przez system raportów, które są następnie sklejane na podstawie niejawnych reguł, są nie do utrzymania.
To może podepniemy jakieś narzędzie BI?
Pomyślało wielu programistów, którzy wielokrotnie zetknęli się z problemem generowania raportów.
Weźmy na przykład taki Metabase. Darmowe narzędzie, które możemy postawić w kilka minut za pomocą Dockera.
Dać mu dostęp do naszej bazy i kilku lub wszystkich tabelek, i za pomocą bardzo przyjaznego interfejsu użytkownika biznes będzie mógł w bardzo łatwy i przyjemny sposób wygenerować najbardziej skomplikowane raporty.
Raporty, które będą mogły zawierać dane z wielu modułów jednocześnie!
Możemy nawet zatrudnić analityka danych z podstawami SQL'a, który wszystko to, czego nie będzie się dało wyklikać, osiągnie za pomocą odpowiednio przygotowanego zapytania.
Tylko, że to nie rozwiązuje problemu
Jedynie odsuwa go w czasie.
Jeżeli przyjrzymy się dokładnie, co się zmieniło, to zmieniła się tylko jedna rzecz. Narzędzie... Przenieśliśmy problem oczyszczania i łączenia danych z Excela do Metabase.
Excel wprawdzie wrócił do swojej pierwotnej roli, możemy teraz raporty pobrane z Metabase wrzucić do Excela.
Jednak nasza niejawna logika łączenia/oczyszczania danych przeniosła się z arkusza kalkulacyjnego do zapytań SQL.
Poza tym, wszystkie problemy zostały takie same:
- niespójność danych
- niespójność nazewnictwa
- brak jednolitej polityki wstecznej kompatybilności
- brak jednolitej polityki retencji danych
To może ustanowimy procesy i reguły?
Większość z powyższych problemów da się rozwiązać implementując odpowiednie procesy oraz reguły.
Możemy ustalić standardy nazewnictwa mówiące, że każda tabelka w bazie musi zawierać w nazwie prefix modułu, a kolumny nazywane są małymi literami i odseparowane podkreśleniami.
Możemy ustalić, że każdy moduł przechowuje dane z ostatnich 5 lat (hot storage), wszystko co starsze jest archiwizowane. (cold storage)
Możemy ustalić, że zakresy dat zawsze traktowane są jako przedziały prawostronnie otwarte.
Możemy ustalić, że nie usuwamy żadnych kolumn z bazy danych, albo że przed usunięciem czegokolwiek najpierw wchodzimy w okres przejściowy, podczas którego pokazujemy każdemu użytkownikowi systemu, które kolumny zmienią się i w jaki sposób.
Nawet jeżeli przyjmiemy na potrzeby dyskusji, że uda się te procesy globalnie zaimplementować pomiędzy kilkoma zespołami, oraz że te zespoły będą ich bezwzględnie i bardzo dokładnie przestrzegać, to nie wystarczy...
Skalowanie bazy danych nie jest tanie
Szczególnie jeżeli opieramy się o rozwiązania chmurowe.
Wyobraźmy sobie sytuację, w której w szczytowych godzinach pracy systemu (kiedy użytkownicy generują najwięcej transakcji) analityk biznesowy, który pracuje według własnego planu musi wygenerować raport w oparciu o taki typowy, SQLowy kilkutysięcznik?
Analityk odpala zapytanie, baza danych zaczyna mielić. Zapytanie trwa, 5, 10, 15 minut. Baza danych zaczyna się pocić.
Użytkownicy bombardują system nowymi zamówieniami (lub jakimikolwiek innymi operacjami, które generują sporo zapisów) w czasie kiedy analityk czeka na wyniki.
W tym samym momencie ktoś z biznesu potrzebuje na szybko sprawdzić kilka raportów, każdy z nich zawiera "całkowitą liczbę wierszy w tabelce". Takich osób jest kilka.
Wszystkie te operacje nakładają się na siebie, nasza już i tak bardzo obciążona baza danych nie wyrabia.
Niektóre transakcje użytkowników nie dochodzą do skutku.
System ledwo dyszy. Czas oczekiwania na najbardziej podstawowe operacje mierzony jest w sekundach.
A teraz wisienka na torcie, kiedy te wszystkie dantejskie sceny mają miejsce, kiedy Pager Duty jest rozgrzany do czerwoności od wszelkiego typu i rodzaju incydentów, kiedy zespoły w panice próbują przywrócić system do życia, devopsi kombinują jak na szybko przeskalować bazę danych...

CEO zaczyna prezentację dla potencjalnego partnera biznesowego, z którym współpraca ma okazać się kluczowa w strategii rozwoju firmy...
To może po prostu postawmy replikę?
W końcu raporty nie będą przeciążać naszej bazy transakcyjnej.
Podwoimy wprawdzie koszty utrzymania bazy danych, ale zredukujemy ryzyko przeciążenia systemu i będziemy mogli podpiąć ulubione narzędzie business intelligence wprost na replikę, co da nam dane real time.
Brzmi rewelacyjnie, ale w praktyce nie jest to tak proste.
Pomijając już nawet potencjalne problemy wynikające z samej natury replikacji, głównym i podstawowym problemem na który najczęściej trafiam, jest percepcja.
Zupełnie inaczej na tabele w bazie danych będzie patrzył programista, który te tabele wygenerował za pomocą mapowań ORM-a, niż analityk danych.
Programista będzie wiedział, które tabele należy połączyć razem, aby otrzymać obraz całości. Będzie rozumiał ograniczenia i warunki zaszyte gdzieś w kodzie aplikacji. Przede wszystkim programista zna lub chociaż powinien się orientować jak wygląda cykl życia systemu (jego danych).
Ta cała wiedza najczęściej nie jest dostępna dla analityków.
To tak jakby powiedzieć komuś, żeby spojrzał na coś przez dziurkę od klucza. Coś na pewno da się zobaczyć. Jakieś wnioski da się wyciągnąć, ale bardzo ciężko będzie odbudować całość.
Wystarczy, że mamy w bazie danych kolumnę typu JSONB w której przechowujemy jakieś struktury danych. Załóżmy, że system dopuszcza 3 poprawne kombinacje tej samej struktury, ale jedna jest super rzadka, na tyle rzadka, że jeszcze w systemie nie wystąpiła. Patrząc na dane, nawet całościowo, analityk po prostu nie może wiedzieć, że istnieją 3 kombinacje jednej struktury. Podczas normalizacji uwzględni 2 przypadki, podczas gdy trzeci stanie się tykającą bombą zegarową, która wybuchnie jak zawsze w najmniej oczekiwanym momencie.
Inaczej mówiąc, jeżeli mamy w systemie kilka niezależnych modułów. Każdy z swoją bazą danych, albo przynajmniej swoimi tabelami w bazie. Co sumarycznie daje nam 200-300 tabelek, oczekiwanie, że analityk to bez problemu ogarnie, nie popełni błędów i raporty nie będą odbiegały od oczekiwań, jest delikatnie mówiąc naiwne.
Mimo wszystko wystawienie kopii/repliki bazy danych dla analityków i nadanie jej 4-literowej nazwy pochodzącej od słowa "analytics" dalej jest powszechnie stosowane.
Narzędzia BI prześcigają się w tym, kto stworzy lepszy interfejs użytkownika, dzięki któremu raporty da się wyklikać. Obiecują, że będziemy mogli analizować dane bez SQL.
Tak, to może działać, w wielu miejscach właśnie tak to działa. O czym jednak głośno nie mówimy to:
- Problemy z wsteczną kompatybilnością oraz zmianami struktury danych
- Problemy z odpowiednim utrzymaniem / wersjonowaniem / testowaniem gigantycznych zapytań SQL/skryptów normalizujących dane w locie
- Repliki/Kopie generują dodatkowe koszty
- Redukcja zasobów replik jest albo niemożliwa, albo uniemożliwia generowanie raportów w akceptowalnych czasach
Co w rezultacie odbija się na jakości danych i skuteczności podejmowania decyzji biznesowych.
Co nam pozostaje?
Może najpierw ustalmy jakie problemy w pierwszej kolejności chcemy rozwiązać:

- Analizowanie danych / generowanie raportów nie może mieć żadnego wpływu na pracę systemu.
- Dane w raportach muszą być zawsze świeże (dopuszczalny jest opóźnienie w danych, ustalane indywidualnie)
- Raporty muszą odzwierciedlać realny, niewypaczony stan systemu
- Struktura danych musi być odporna na regresję
- Spójna polityka retencji i archiwizowania danych
1) Separacja Zasobów
Nie jest to nic odkrywczego, jeżeli nie chcemy, żeby nasz system był narażony na przeciążenia wynikające z nadużywania bazy danych poprzez generowanie raportów, musimy postawić sobie osobną bazę danych.
Jaką bazę wybrać pod analitykę?
To jest w zasadzie temat na osobny artykuł albo nawet serię artykułów. Rozwiązań jest bardzo dużo, jedne lepsze, inne gorsze. Nie istnieje jedno magiczne rozwiązanie na wszystkie problemy.
Moja rada, szczególnie dla mniejszych zespołów, niedoświadczonych w zarządzaniu danymi jest taka, żeby nie rzucać się na technologie, z którą nie mamy doświadczenia.
Kluczowy jest odpowiedni format danych. Po zamianie wielu wąskich tabelek na jedną szeroką najprawdopodobniej
okaże się, że generowanie tego samego raportu tylko bez używania 20x JOIN
nie zajmuje już 10 minut
a mniej niż pół sekundy.
A co jeżeli problemem są agregacje, a nie łączenia?
Wtedy, zamiast agregować w locie, lepiej przygotować tabelę zawierającą te dane w formie zagregowanej, a nie surowej.
2) Świeże Dane
No dobra, ale skoro tworzymy nową, niezależną bazę danych, to w jaki sposób zadbamy o to aby dane w tej bazie były świeże i aktualne?
Tutaj bardzo dużo zależy od dopuszczalnego opóźnienia w synchronizacji danych. Najczęściej wystarczy jak baza analityczna jest około 24 godziny za bazą transakcyjną. Czyli zawiera dane do "wczoraj", uwzględniając całe "wczoraj".
Dlaczego? Bo mało które decyzje biznesowe podejmowane są w danej chwili. Jeżeli jakieś decyzje muszą być podejmowane w tak krótkim czasie, wtedy buduje się właściwe automatyzacje.
Jeżeli 24-godzinne opóźnienie jest akceptowalne (czasami nie jest i na to też są sposoby), wystarczy, że synchronizacje przeprowadzimy kilka razy dziennie. Oczywiście tu też nie ma złotej reguły. Tak samo jak nie ma reguły mówiącej jak duży zakres synchronizować na raz.
Jest za to jedna dobra praktyka, która ułatwia synchronizację. Polega ona na upewnieniu się, że główne tabele w systemie transakcyjnym zawierają datę utworzenia/modyfikacji rekordu.
Posiadając te dwie informacje jesteśmy w stanie zawęzić okno synchronizacji do jakiegoś określonego okresu czasu.
Jak to w praktyce wygląda? Możemy np. odpalać proces synchronizacji co 6 godzin, zbierając tylko rekordy zmienione w
ciągu ostatnich 24 godzin.
Oczywiście to są przykładowe liczby, te wartości trzeba ustalić na podstawie rozmiaru i zachowania danych.
Dlaczego z 24 godzin? Takie dodatkowe zabezpieczenie. Moglibyśmy pobierać dane tylko z 7 godzin, ale jeżeli z jakiegokolwiek powodu synchronizacja się nie wykona, a my tego nie wyłapiemy, możemy stracić dane.
3) Odzwierciedlenie Stanu Systemu
Moja opinia na ten temat może wydać się kontrowersyjna, ale uważam, że najlepszą wiedzę na temat danych i zachowania systemu czy modułu ma zespół, który ten system/moduł buduje.
To właśnie ten zespół powinien być odpowiedzialny za to, żeby dane, które są generowane przez system lub jego część za którą dany zespół odpowiada, trafiały do centralnego repozytorium danych.
Innymi słowy, to właśnie zespół implementujący daną funkcjonalność powinien na podstawie zebranych wcześniej wymagań przekształcić te dane do odpowiedniego formatu i wypchnąć dalej.
Jest to chyba najłatwiejszy sposób upewnienia się, że dane są kompletne a programiści z danego zespołu są świadomi, że te dane są gdzieś wykorzystywane. Format danych analitycznych staje się dla nich swego rodzaju kontraktem – kontraktem, którego muszą przestrzegać.
Nie różni się to bardzo od kontraktu na schemat API.
4) Odporność na regresję
Ten punkt jest chyba najbardziej skomplikowany. Poprawna implementacja ewolucji schematu danych jest często nie tyle trudna, co kłopotliwa.
W dużym skrócie zasady wyglądają tak:
- Nigdy nie usuwamy kolumn
- Wszystkie kolumny, które dodajemy muszą być
nullable
albo posiadać wartość domyślną - Typy kolumn możemy tylko rozszerzać przykładowo,
int
możemy zamienić nabigint
ale nie na odwrót - Nie zmieniamy nazw kolumn
Czy w takim razie nie możemy niczego usuwać?
Możemy, ale nie byle jak. Generalnie to jak i jak często będziemy łamać wstęczną kompatybilność zależy tylko od nas.
Jeżeli z naszego źródła danych analitycznych korzystamy tylko wewnętrznie i, powiedzmy, analityk zajmujący się budowaniem raportów jest na bieżąco z zmianami w systemie, przy odpowiedniej koordynacji moglibyśmy dodać nowe tabelki, a następnie usunąć stare, dając mu chwilę na zaktualizowanie raportów.
Jeżeli jednak nasze źródło danych analitycznych wykorzystywane jest do Data Science
, ale pracujemy w środowisku
multi-tenancy i dane analityczne/raporty udostępniane są klientom, wtedy do sprawy musimy podejść zupełnie inaczej.
Polityka przechowywania i archiwizowania danych
Tak jak wspominałem wyżej, bardzo istotne jest, aby dane w bazie analitycznej, szczególnie dostarczane przez różne moduły podlegały tym samym regułom odnośnie czasu przechowywania.
Jeżeli stany magazynowe w systemie trzymamy tylko z ostatniego roku, a zamówienia z ostatnich 5 lat, Analitycy nie będą w stanie zbudować raportu, który będzie zawierał dane z obu tych źródeł.
Jest to bardziej problem natury formalnej niż technicznej. Wydawało by się, że wystarczy się po prostu dogadać, jednak w praktyce nie jest to takie proste.
Aby ustalić wspólną politykę przechowywania i archiwizowania danych należy wziąć pod uwagę nie tylko aspekty techniczne, ale również prawne, biznesowe czy właśnie analityczne, co może wymagać kompromisów.
Przykłady
Popatrzmy teraz na prosty przykład procesu ETL, którego zadaniem jest przeniesienie danych z bazy transakcyjnej do bazy analitycznej.
W tym przykładzie wykorzystam Flow PHP, nie jest to jednak coś specjalnie unikalnego dla PHP. W każdym języku możemy zbudować coś bardzo podobnego za pomocą jakiejkolwiek biblioteki ułatwiającej tworzenie aplikacji CLI oraz jakiegoś narzędzia do przetwarzania danych.
Poniższy przykład (w nieco zmienionej postaci) pochodzi z sesji live stream, którą miałem przyjemność nagrywać z Rolandem prowadzącym kanał Never Code Alone. Materiał video znajdziesz na portalu YouTube pod frazą "Flow PHP"
Załóżmy, że tak mniej więcej wygląda format zamówień:
schema
|-- order_id: ?uuid
|-- seller_id: uuid
|-- created_at: datetime
|-- updated_at: datetime
|-- cancelled_at: ?datetime
|-- discount: ?float
|-- email: string
|-- customer: string
|-- address: map<string, string>
|-- notes: list<string>
|-- items: list<structure{sku: string, quantity: integer, price: float}>
Naszym celem jest przeniesienie tych zamówień do analitycznej bazy danych, przygotujmy więc schemat danych wejściowych jak i docelowych.
<?php
declare(strict_types=1);
namespace App\DataFrames;
use Flow\ETL\Adapter\Doctrine\DbalMetadata;
use function Flow\ETL\DSL\integer_schema;
use function Flow\ETL\DSL\uuid_schema;
use function Flow\ETL\DSL\datetime_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\string_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\Types\DSL\type_map;
use function Flow\Types\DSL\type_string;
use function Flow\ETL\DSL\list_schema;
use function Flow\Types\DSL\type_list;
use function Flow\Types\DSL\type_structure;
use function Flow\Types\DSL\type_integer;
use function Flow\Types\DSL\type_float;
use function \Flow\ETL\DSL\schema;
use Flow\ETL\Schema;
final class Orders
{
public static function sourceSchema() : Schema
{
return schema(
uuid_schema("order_id"),
uuid_schema("seller_id"),
datetime_schema("created_at"),
datetime_schema("updated_at", nullable: true),
datetime_schema("cancelled_at", nullable: true),
float_schema("discount", nullable: true),
string_schema("email"),
string_schema("customer"),
map_schema("address", type: type_map(key_type: type_string(), value_type: type_string())),
list_schema("notes", type: type_list(element: type_string())),
list_schema("items", type: type_list(element: type_structure(elements: ["item_id" => type_string(), "sku" => type_string(), "quantity" => type_integer(), "price" => type_float()]))),
);
}
public static function destinationSchema() : Schema
{
return self::sourceSchema()
->replace('updated_at', datetime_schema("updated_at"))
->remove('address')
->add(
string_schema('street', metadata: DbalMetadata::length(2048)),
string_schema('city', metadata: DbalMetadata::length(512)),
string_schema('zip', metadata: DbalMetadata::length(32)),
string_schema('country', metadata: DbalMetadata::length(128)),
)
->remove('items')
->add(
uuid_schema('item_id', metadata: DbalMetadata::primaryKey()),
string_schema('sku', metadata: DbalMetadata::length(64)),
integer_schema('quantity'),
integer_schema('price'),
string_schema('currency', metadata: DbalMetadata::length(3)),
)
;
}
}
Zwróćmy uwagę na to, że docelowa struktura tabeli nie jest już zorientowana na zamówienia, a na zamówione przedmioty. Naszym celem jest rozpakowanie przedmiotów zamówień tak, aby każdy był osobnym wierszem.
Dzięki temu analityk, który będzie musiał wygenerować raport, nie będzie już musiał kombinować i rozpakowywać jsona w locie.
Kolumna Adres też została rozbita na kilka kolumn, dzięki czemu raport będzie można łatwiej filtrować.
Kolejną istotną transformacją jest zamiana price
z float
na int
poprzez przemnożenie wartości zmiennoprzecinkowej przez 100.
Ostatnią już zmianą będzie dodanie informacji o tym, w jakiej walucie podawane są ceny. Tylko skąd ta informacja pochodzi? Jest to właśnie bardzo istotny szczegół wynikający z niezbyt dobrej implementacji. W tym konkretnym przypadku wszystkie zamówienia są w dolarach. System o tym wie, programiści o tym wiedzą, ale osoba patrząca na tabelki w bazie bez kontekstu nie musi wcale posiadać takiej wiedzy.
Nasza docelowa struktura powinna wyglądać mniej więcej w taki sposób:
schema
|-- order_id: uuid
|-- seller_id: uuid
|-- created_at: datetime
|-- updated_at: datetime
|-- cancelled_at: ?datetime
|-- discount: ?float
|-- email: string
|-- customer: string
|-- notes: list<string>
|-- street: string
|-- city: string
|-- zip: string
|-- country: string
|-- item_id: uuid
|-- sku: string
|-- quantity: integer
|-- price: integer
|-- currency: string
Następnym krokiem będzie utworzenie odpowiedniej tabelki w bazie analitycznej. Możemy to osiągnąć stosunkowo łatwo dzięki adapterowi dla Doctrine DBAL.
<?php
declare(strict_types=1);
namespace App\Dbal;
use App\DataFrames\Orders;
use App\DataFrames\OrdersCSV;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\Migrations\Provider\SchemaProvider as MigrationsSchemaProvider;
use function Flow\ETL\Adapter\Doctrine\to_dbal_schema_table;
final class SchemaProvider implements MigrationsSchemaProvider
{
public const ANALYTICAL_ORDER_LINE_ITEMS = 'order_line_items';
public function createSchema(): Schema
{
return new Schema(
tables: [
to_dbal_schema_table(Orders::destinationSchema(), self::ANALYTICAL_ORDER_LINE_ITEMS),
]
);
}
}
W bazie analitycznej przechowywać będziemy więc "uproszczoną" lub "znormalizowaną" wersję tabeli z zamówieniami. Normalizacja polega na rozpakowaniu przedmiotów zamówienia i uczynieniach ich osobnymi wierszami oraz rozbiciu kolumny "Adres" na kilka kolumn.
Przyjrzyjmy się więc komendzie CLI, która będzie odpowiedzialna za przeniesienie danych z bazy transakcyjnej do bazy analitycznej.
<?php
namespace App\Command;
use App\DataFrames\Orders;
use App\DataFrames\OrdersCSV;
use App\Dbal\SchemaProvider;
use Doctrine\DBAL\Connection;
use Flow\Doctrine\Bulk\Dialect\SqliteInsertOptions;
use Flow\ETL\Rows;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use function Flow\ETL\Adapter\Doctrine\from_dbal_key_set_qb;
use function Flow\ETL\Adapter\Doctrine\pagination_key_desc;
use function Flow\ETL\Adapter\Doctrine\pagination_key_set;
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\analyze;
use function Flow\ETL\DSL\constraint_unique;
use function Flow\ETL\DSL\data_frame;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\rename_replace;
use function Flow\ETL\DSL\schema_to_ascii;
use function Flow\Types\DSL\type_datetime;
#[AsCommand(
name: 'app:orders:import',
description: 'Import orders from the transactional database to the analytical database.',
)]
class OrdersImportCommand extends Command
{
public function __construct(
private readonly Connection $transactional,
private readonly Connection $analytical,
)
{
parent::__construct();
}
protected function configure()
{
$this->addOption('start-date', null, InputOption::VALUE_REQUIRED, 'Start date for the data pull.', '-24 hours')
->addOption('end-date', null, InputOption::VALUE_REQUIRED, 'End date for the data pull.', 'now')
;
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$io->title('Importing orders');
$startDate = type_datetime()->cast($input->getOption('start-date'));
$endDate = type_datetime()->cast($input->getOption('end-date'));
$io->progressStart();
$report = data_frame()
->read(
from_dbal_key_set_qb(
$this->transactional,
$this->transactional->createQueryBuilder()
->select('*')
->from(SchemaProvider::ORDERS)
->where('updated_at BETWEEN :start_date AND :end_date')
->setParameter('start_date', $startDate->format('Y-m-d H:i:s'))
->setParameter('end_date', $endDate->format('Y-m-d H:i:s')),
pagination_key_set(
pagination_key_desc('updated_at'),
pagination_key_desc('order_id')
)
)->withSchema(Orders::sourceSchema())
)
->withEntry('_address', ref('address')->unpack())
->renameEach(rename_replace('_address.', ''))
->withEntry('_item', ref('items')->expand())
->withEntry('_item', ref('_item')->unpack())
->renameEach(rename_replace('_item.', ''))
->drop('_item', 'items', 'address')
->withEntry('currency', lit('USD'))
->withEntry('price', ref('price')->multiply(100))
->constrain(constraint_unique('item_id'))
->match(Orders::destinationSchema())
->write(
to_dbal_table_insert(
$this->analytical,
SchemaProvider::ORDER_LINE_ITEMS,
SqliteInsertOptions::fromArray([
'conflict_columns' => ['item_id'],
])
)
)
->run(function (Rows $rows) use ($io) {
$io->progressAdvance($rows->count());
}, analyze: analyze())
;
$io->progressFinish();
$io->newLine();
$io->definitionList(
'Orders Import Summary',
new TableSeparator(),
['Execution time ' => \number_format($report->statistics()->executionTime->highResolutionTime->seconds) . ' seconds'],
['Memory usage ' => \number_format($report->statistics()->memory->max()->inMb()) . ' MB'],
['Rows inserted ' => \number_format($report->statistics()->totalRows())],
);
return Command::SUCCESS;
}
}
Oczywiście nie jest to najpiękniejsza ani nawet najbardziej poprawna forma. Normalnie komenda CLI nie zawierałaby
definicji pipeline'u ETL
, jednak na potrzeby przykładu jest to dobry start.
Dedykowany centralny magazyn danych, to niewątpliwie kusząca opcja, szczególnie w miejscach gdzie brak widoczności uniemożliwia sprawne podejmowanie decyzji.
Na szczęscie jest to ten rodzaj funkcjonalności, którą można dołozyć w zasadzie na każdym etapie życia projektu.
Może wymagać wprowadzenia dodatkowych procesów oraz pewnej dyscypliny od zespołów, jednak korzyści płynące z takiego rozwiązania są ogromne.
- Nie ma obawy o to, że analityka wpłynie na pracę systemu
- Mamy dostęp do wszystkich zakamarków naszego systemu, każdego mikroserwisu czy modułu
- Taka centralna baza danych to najlepszy prezent dla analityków
- Data Science nie polega już na przepalaniu czasu na oczyszczanie danych
- Możemy łatwo i bezpiecznie podpiąć w zasadzie dowolne narzędzia typu Business Intelligence
- Tworzymy kulturę pracy z danymi w ramach naszej organizacji
Oczywiście jak wszystko co nowe, takie zmiany mogą wyglądać na trudne do wprowadzenia. Brak doświadczenia w pracy z danymi przynajmniej na początkowych etapach sprawia, że zadanie to może wydawać się wręcz niewykonalne.
Z mojego doświadczenia wynika jednak, że najtrudniej jest zacząć, kiedy mamy już:
- Kilka pierwszych
Pipeline'ów
przetwarzających dane - Kilka lub kilkanaście schematów naszych danych
- Jakieś bardziej złożone transformacje
- Przygotowane testy
- Ustalone procesy i procedury
Praca idzie wręcz maszynowo.
Warto jednak pamiętać, że nie ma jednego uniwersalnego rozwiązania, które będzie pasować do każdego systemu. W każdym przypadku należy dostosować podejście do specyfiki danego systemu i organizacji.
Jak zacząć?
Jeśli potrzebujesz pomocy w zakresie budowy centralnego magazynu danych, chętnie Ci pomogę.
Skontaktuj się ze mną, a wspólnie stworzymy rozwiązanie, które będzie idealnie dopasowane do Twoich potrzeb.
Zachęcam również do odwiedzenia serwera Discord - Flow PHP, na którym możemy porozmawiać bezpośrednio.
