
Análisis de Datos en Sistemas Transaccionales Distribuidos
En este artículo abordaré el problema del análisis de datos en sistemas transaccionales distribuidos.
Si buscas ideas para construir un almacén de datos centralizado que te permita recopilar datos de todo el sistema,
independientemente de su fragmentación y sin ahogarte en costos operacionales, este artículo es para ti.
Todo comienza de forma inocente
La mayoría de los sistemas que creamos día a día almacenan datos en alguna base de datos relacional. PostgreSQL es una opción muy popular y, al mismo tiempo, una buena elección que en los últimos años se ha convertido casi en el estándar de la industria.
La historia de la mayoría de proyectos suele ser muy similar: comenzamos verificando la idea, conseguimos los primeros usuarios, el sistema empieza a generar ingresos, el negocio idea cómo aumentar las ganancias, surgen nuevas funcionalidades. Cada nueva funcionalidad significa algunas tablas nuevas en la base de datos.
Para acelerar el desarrollo utilizamos un ORM, generamos automáticamente migraciones que crean y actualizan el esquema de la base de datos.
Inicialmente todo va bien, las nuevas funcionalidades traen las ganancias esperadas, el negocio comienza a escalar. Contratamos más programadores para crear más funcionalidades en paralelo.
De vez en cuando alguien reporta que el sistema en algunos lugares comienza a "ralentizarse", reconocimiento rápido, diagnóstico aún más rápido, falta un índice en alguna tabla.
En la configuración de mapeos del ORM agregamos un índice en el campo por el cual el sistema busca datos muy frecuentemente. Problema resuelto.
El equipo de programadores en crecimiento da mucha importancia a la calidad, tal vez incluso utiliza
técnicas avanzadas de desarrollo de software, como Event Storming o Domain-Driven Design.
El CI/CD ejecuta incontables pruebas, asegurándose de que los cambios no introduzcan regresiones.
El idilio dura, el equipo o quizás varios equipos comienzan a agregar nuevos módulos al sistema. Módulos apropiadamente aislados, responsables de tareas específicas, nunca traspasando sus límites y no entrometiéndose en las competencias de otros módulos.
Para la comunicación se utilizan naturalmente colas, implementamos el Patrón Outbox/Inbox
Para asegurar el aislamiento apropiado, establecemos reglas que dicen que cada módulo tiene acceso únicamente a aquellas tablas en la base de datos que le pertenecen. Para obtener datos de otro módulo es necesario dirigirse a ese módulo, ya sea a través de alguna API interna o de cualquier otra manera.
De vez en cuando el negocio viene a nosotros con la pregunta ¿pueden generar rápidamente para nosotros este informe?. Por supuesto, unas pocas líneas de SQL, tal vez algunas decenas y el informe está listo.
El negocio satisfecho, el informe en formato CSV va a Excel (la herramienta de BI más popular), el negocio saca conclusiones, planifica nuevas funcionalidades y cambios.

El tiempo pasa, las nuevas tablas brotan como hongos después de la lluvia
En esta situación podemos permanecer mucho tiempo, incluso varios años buenos.
Mientras tanto, alguien en algún lugar seguramente tendrá la idea de agregar al sistema la posibilidad de generar informes. Es solo cuestión de tiempo.
Los informes sobre el estado del sistema son para el negocio una de las herramientas más cruciales que proporcionan visión de comportamientos, preferencias o tendencias de usuarios. Permiten no solo entender qué está pasando, sino también planificar apropiadamente lo que está por suceder.
Mientras mejores y más detallados sean los informes, mejores decisiones se pueden tomar basándose en ellos. Buenas decisiones empresariales se traducen en mayores ganancias, mayores ganancias se traducen en mayor presupuesto. Mayor presupuesto se traduce en mejores herramientas, equipos más grandes, mejores salarios o bonos.
En el interés de cada programador debería estar entonces proporcionar al negocio los datos más buenos y precisos posibles, después de todo mejores resultados se traducen directamente en mejores ganancias.
Primeros síntomas
El sistema funciona, genera ganancias. Consiste en alrededor de 5, tal vez incluso 10 módulos, cada módulo consiste en 20-50 tablas en la base de datos. Cada módulo proporciona sus propios informes.
- Ventas
- Marketing
- Logística
- Inventarios
- Usuarios
Cada módulo expone solo parte de los datos, una fracción de la imagen más grande, ninguno sin embargo da una vista de la totalidad.
Los equipos implementaron claves de referencia a datos provenientes de otros módulos, incluso lograron crear en la interfaz de usuario un lugar desde el cual se pueden generar informes.
Pero esto sigue siendo insuficiente...
Muy rápidamente resulta que los informes generados en diferentes módulos, escritos por diferentes programadores, quizás incluso en diferentes tecnologías tienen diferentes formatos de datos, diferentes estándares de nomenclatura.
Los rangos de fechas se interpretan de manera diferente, un módulo incluye las fechas de inicio y fin, otro las excluye, y otro más hace un intervalo abierto por la derecha para facilitar la paginación, porque también tienen API y ese API utiliza el mismo pedazo de código.
Dado que cada módulo es independiente, posee sus límites, su nomenclatura, en cierto momento nos orientamos que lo que en un módulo llamamos de cierta manera, otro módulo lo expone bajo un nombre completamente diferente. Porque en el contexto de ese módulo tiene sentido.
Con el tiempo probablemente también nos orientemos que cada equipo definió de manera diferente su política de retención y almacenamiento de datos. A pesar de tener en el módulo clave datos de los últimos 5 años, no podemos hacer nada con ellos, porque los módulos que proporcionaban datos necesarios para enriquecer el informe básico, poseen datos únicamente de los últimos 2 años.
Sin embargo, estos no son problemas que un poco de magia en Excel no pueda resolver (tal vez excepto las faltas en los datos). A estas columnas les cambiaremos los nombres, estas las eliminaremos, agregaremos un filtrado rápido y ya está.
Crearemos un gran archivo en el cual tendremos una hoja llamada "Dashboard", y todas las otras serán solo de lectura, alimentarán el dashboard.
Tal vez este enfoque incluso funcione por un tiempo. Tal vez incluso más que un tiempo, pero no nos hagamos ilusiones. Todo esto al final fallará, y según las leyes de Murphy fallará en el peor momento posible.
¿Qué hay de malo en Excel?
¡Nada! Excel es una herramienta fantástica. El problema no está en Excel, sino en su utilización.
Toda esa magia que consiste en limpiar y preparar datos no debería tener lugar en Excel, no a gran escala. Si hablamos de un informe rápido de una sola vez, no hay problema. Hacemos lo que debemos, creamos fórmulas, analizamos datos y lo olvidamos.
Sin embargo, si esto va a ser parte de nuestra rutina diaria, si cíclicamente debemos pasar por el mismo proceso, siguiendo los cambios constantes y la evolución del sistema, más temprano que tarde resultará que esas hojas están desactualizadas.
Las columnas dejaron de existir o cambiaron nombres, surgieron nuevas columnas, el formato de datos cambió o lo que es peor, uno de los equipos encargado de uno de los módulos eliminó algunos datos sin conciencia de que estaban siendo utilizados por algún usuario empresarial en algún lugar en uno de sus informes, que abre una vez por trimestre.
A largo plazo, las hojas de cálculo más complejas que extraen datos de informes generados automáticamente por el sistema, que luego se unen basándose en reglas implícitas, son imposibles de mantener.
¿Entonces conectamos alguna herramienta de BI?
Pensaron muchos programadores que se han enfrentado repetidamente al problema de generar informes.
Tomemos por ejemplo Metabase. Una herramienta gratuita que podemos configurar en minutos usando Docker.
Darle acceso a nuestra base y a algunas o todas las tablas, y a través de una interfaz de usuario muy amigable el negocio podrá generar de manera muy fácil y placentera los informes más complicados.
¡Informes que podrán contener datos de varios módulos al mismo tiempo!
Incluso podemos contratar un analista de datos con fundamentos de SQL, que todo lo que no se pueda hacer clic, lo logre a través de una consulta apropiadamente preparada.
Pero eso no resuelve el problema
Solo lo posterga en el tiempo.
Si miramos exactamente qué cambió, solo cambió una cosa. La herramienta... Trasladamos el problema de limpieza y unión de datos de Excel a Metabase.
Excel ciertamente volvió a su papel original, ahora podemos poner los informes descargados de Metabase en Excel.
Sin embargo, nuestra lógica implícita de unión/limpieza de datos se trasladó de la hoja de cálculo a las consultas SQL.
Además, todos los problemas siguieron siendo los mismos:
- inconsistencia de datos
- inconsistencia de nomenclatura
- falta de política unificada de compatibilidad hacia atrás
- falta de política unificada de retención de datos
¿Entonces establecemos procesos y reglas?
La mayoría de los problemas anteriores se pueden resolver implementando procesos y reglas apropiados.
Podemos establecer estándares de nomenclatura que digan que cada tabla en la base debe contener en el nombre el prefijo del módulo, y las columnas se nombran con letras minúsculas y separadas por guiones bajos.
Podemos establecer que cada módulo almacena datos de los últimos 5 años (hot storage), todo lo más antiguo se archiva. (cold storage)
Podemos establecer que los rangos de fechas siempre se tratan como intervalos abiertos por la derecha.
Podemos establecer que no eliminamos ninguna columna de la base de datos, o que antes de eliminar cualquier cosa primero entramos en un período de transición, durante el cual mostramos a cada usuario del sistema qué columnas cambiarán y de qué manera.
Incluso si asumimos para propósitos de discusión que logramos implementar estos procesos globalmente entre varios equipos, y que estos equipos los seguirán absoluta y muy precisamente, eso no será suficiente...
Escalar la base de datos no es barato
Especialmente si nos basamos en soluciones en la nube.
Imaginemos una situación en la cual en las horas pico de trabajo del sistema (cuando los usuarios generan más transacciones) un analista empresarial, que trabaja según su propio plan debe generar un informe basado en un típico SQL de miles de líneas.
El analista ejecuta la consulta, la base de datos comienza a trabajar duro. La consulta dura 5, 10, 15 minutos. La base de datos comienza a sudar.
Los usuarios bombardean el sistema con nuevos pedidos (o cualquier otra operación que genere muchas escrituras) mientras el analista espera los resultados.
En el mismo momento alguien del negocio necesita verificar rápidamente varios informes, cada uno contiene "el número total de filas en la tabla". Hay varias de estas personas.
Todas estas operaciones se superponen entre sí, nuestra ya muy cargada base de datos no puede manejar.
Algunas transacciones de usuarios no se completan.
El sistema apenas respira. El tiempo de espera para las operaciones más básicas se mide en segundos.
Y ahora la cereza del pastel, cuando todas estas escenas dantescas tienen lugar, cuando Pager Duty está al rojo vivo de todo tipo de incidentes, cuando los equipos en pánico tratan de revivir el sistema, los devops combinan cómo escalar rápidamente la base de datos...

El CEO comienza una presentación para un potencial socio empresarial, con quien la cooperación resulta ser clave en la estrategia de desarrollo de la empresa...
¿Entonces simplemente configuramos una réplica?
Después de todo, los informes no sobrecargarán nuestra base transaccional.
Duplicaremos los costos de mantenimiento de la base de datos, pero reduciremos el riesgo de sobrecargar el sistema y podremos conectar la herramienta de business intelligence favorita directamente a la réplica, lo que nos dará datos en tiempo real.
Suena fantástico, pero en la práctica no es tan simple.
Dejando de lado incluso los problemas potenciales resultantes de la naturaleza misma de la replicación, el problema principal y fundamental con el que me encuentro más frecuentemente es la percepción.
De manera completamente diferente mirará las tablas en la base de datos el programador que generó esas tablas usando mapeos de ORM, que el analista de datos.
El programador sabrá qué tablas conectar juntas para obtener la imagen completa. Entenderá las limitaciones y condiciones enterradas en algún lugar del código de la aplicación. Sobre todo, el programador conoce o al menos debería orientarse sobre cómo se ve el ciclo de vida del sistema (sus datos).
Todo este conocimiento frecuentemente no está disponible para los analistas.
Es como decirle a alguien que mire algo a través del ojo de la cerradura. Algo seguramente se puede ver. Algunas conclusiones se pueden extraer, pero será muy difícil reconstruir la totalidad.
Basta con que tengamos en la base de datos una columna de tipo JSONB en la cual almacenamos algunas estructuras de datos. Asumamos que el sistema permite 3 combinaciones correctas de la misma estructura, pero una es súper rara, tan rara que aún no ha ocurrido en el sistema. Mirando los datos, incluso de manera integral, el analista simplemente no puede saber que existen 3 combinaciones de una estructura. Durante la normalización considerará 2 casos, mientras que el tercero se convertirá en una bomba de tiempo que explotará como siempre en el momento menos esperado.
En otras palabras, si tenemos en el sistema varios módulos independientes. Cada uno con su base de datos, o al menos sus tablas en la base. Lo que sumado nos da 200-300 tablas, la expectativa de que el analista maneje esto sin problemas, no cometa errores y los informes no se desvíen de las expectativas, es delicadamente hablando ingenua.
A pesar de todo, exponer una copia/réplica de la base de datos para analistas y darle un nombre de 4 letras derivado de la palabra "analytics" sigue siendo ampliamente utilizado.
Las herramientas de BI compiten en quién creará una mejor interfaz de usuario, gracias a la cual los informes se puedan hacer con clics. Prometen que podremos analizar datos sin SQL.
Sí, esto puede funcionar, en muchos lugares así es como funciona. De lo que no hablamos en voz alta es:
- Problemas con compatibilidad hacia atrás y cambios en la estructura de datos
- Problemas con el mantenimiento apropiado / versionado / pruebas de consultas SQL gigantescas/scripts que normalizan datos sobre la marcha
- Las réplicas/Copias generan costos adicionales
- La reducción de recursos de réplicas es imposible o imposibilita generar informes en tiempos aceptables
Lo que resulta en impacto en la calidad de datos y efectividad en la toma de decisiones empresariales.
¿Qué nos queda?
Tal vez primero establezcamos qué problemas queremos resolver en primer lugar:

- Analizar datos / generar informes no puede tener ningún impacto en el trabajo del sistema.
- Los datos en los informes deben ser siempre frescos (el retraso en los datos es aceptable, establecido individualmente)
- Los informes deben reflejar el estado real, no distorsionado del sistema
- La estructura de datos debe ser resistente a regresiones
- Política consistente de retención y archivo de datos
1) Separación de Recursos
No es nada revolucionario, si no queremos que nuestro sistema esté expuesto a sobrecargas resultantes del abuso de la base de datos a través de la generación de informes, debemos configurar una base de datos separada.
¿Qué base elegir para analítica?
Este es básicamente tema para un artículo separado o incluso una serie de artículos. Hay muchas soluciones, unas mejores, otras peores. No existe una solución mágica para todos los problemas.
Mi consejo, especialmente para equipos más pequeños, sin experiencia en gestión de datos es no lanzarse a tecnologías con las que no tienen experiencia.
Lo clave es el formato apropiado de datos. Después de cambiar muchas tablas angostas por una ancha probablemente
resultará que generar el mismo informe solo sin usar 20x JOIN
ya no toma 10 minutos
sino menos de medio segundo.
¿Y si el problema son las agregaciones, no las uniones?
Entonces, en lugar de agregar sobre la marcha, es mejor preparar una tabla que contenga esos datos en forma agregada, no cruda.
2) Datos Frescos
Bueno, pero dado que creamos una nueva base de datos independiente, ¿de qué manera nos aseguraremos de que los datos en esta base sean frescos y actuales?
Aquí mucho depende del retraso aceptable en la sincronización de datos. Frecuentemente es suficiente que la base analítica esté aproximadamente 24 horas detrás de la base transaccional. Es decir, contenga datos hasta "ayer", incluyendo todo "ayer".
¿Por qué? Porque pocas decisiones empresariales se toman en el momento. Si algunas decisiones deben tomarse en tan poco tiempo, entonces se construyen las automatizaciones apropiadas.
Si el retraso de 24 horas es aceptable (a veces no lo es y para eso también hay formas), es suficiente que realicemos sincronizaciones varias veces al día. Por supuesto aquí tampoco hay regla de oro. Así como no hay regla que diga qué tan grande rango sincronizar de una vez.
Hay una buena práctica que facilita la sincronización. Consiste en asegurarse de que las tablas principales en el sistema transaccional contengan la fecha de creación/modificación del registro.
Teniendo estas dos informaciones podemos estrechar la ventana de sincronización a algún período de tiempo específico.
¿Cómo se ve esto en la práctica? Podemos por ejemplo ejecutar el proceso de sincronización cada 6 horas, recolectando solo registros cambiados en
las últimas 24 horas.
Por supuesto estos son números de ejemplo, estos valores deben establecerse basándose en el tamaño y comportamiento de los datos.
¿Por qué 24 horas? Tal protección adicional. Podríamos tomar datos solo de 7 horas, pero si por cualquier motivo la sincronización no se ejecuta, y no lo detectamos, podemos perder datos.
3) Reflejo del Estado del Sistema
Mi opinión sobre este tema puede parecer controvertida, pero creo que el mejor conocimiento sobre datos y comportamiento del sistema o módulo lo tiene el equipo que construye ese sistema/módulo.
Es precisamente este equipo el que debería ser responsable de que los datos generados por el sistema o su parte por la cual dado equipo es responsable, lleguen al repositorio central de datos.
En otras palabras, es precisamente el equipo que implementa dada funcionalidad quien debería basándose en los requisitos recopilados previamente transformar esos datos al formato apropiado y empujarlos hacia adelante.
Esta es probablemente la manera más fácil de asegurarse de que los datos sean completos y que los programadores del equipo dado estén conscientes de que esos datos se utilizan en algún lugar. El formato de datos analíticos se convierte para ellos en una especie de contrato - un contrato que deben respetar.
No es muy diferente del contrato en el esquema de API.
4) Resistencia a regresiones
Este punto es probablemente el más complicado. La implementación correcta de la evolución del esquema de datos es frecuentemente no tanto difícil, como problemática.
En gran resumen las reglas se ven así:
- Nunca eliminamos columnas
- Todas las columnas que agregamos deben ser
nullable
o tener un valor por defecto - Los tipos de columnas solo podemos expandirlos por ejemplo,
int
podemos cambiarlo abigint
pero no al revés - No cambiamos nombres de columnas
¿Entonces no podemos eliminar nada?
Podemos, pero no de cualquier manera. Generalmente cómo y con qué frecuencia romperemos la compatibilidad hacia atrás depende solo de nosotros.
Si de nuestra fuente de datos analíticos solo usamos internamente y, digamos, el analista que se ocupa de construir informes está al día con los cambios en el sistema, con la coordinación apropiada podríamos agregar nuevas tablas, y luego eliminar las viejas, dándole tiempo para actualizar los informes.
Sin embargo, si nuestra fuente de datos analíticos se utiliza para Data Science
, pero trabajamos en un entorno
multi-tenancy y los datos analíticos/informes se proporcionan a clientes, entonces debemos abordar el asunto de manera completamente diferente.
Política de almacenamiento y archivo de datos
Como mencioné anteriormente, es muy importante que los datos en la base analítica, especialmente los proporcionados por diferentes módulos estén sujetos a las mismas reglas respecto al tiempo de almacenamiento.
Si los inventarios en el sistema los mantenemos solo del último año, y los pedidos de los últimos 5 años, los analistas no podrán construir un informe que contenga datos de ambas fuentes.
Es más un problema de naturaleza formal que técnica. Parecería que es suficiente simplemente ponerse de acuerdo, sin embargo en la práctica no es tan simple.
Para establecer una política común de almacenamiento y archivo de datos es necesario tomar en cuenta no solo aspectos técnicos, sino también legales, empresariales o precisamente analíticos, lo que puede requerir compromisos.
Ejemplos
Veamos ahora un ejemplo simple de proceso ETL, cuya tarea es transferir datos de la base transaccional a la base analítica.
En este ejemplo utilizaré Flow PHP, sin embargo no es algo especialmente único para PHP. En cualquier lenguaje podemos construir algo muy similar usando cualquier biblioteca que facilite crear aplicaciones CLI y alguna herramienta para procesamiento de datos.
El siguiente ejemplo (en forma ligeramente modificada) proviene de una sesión de transmisión en vivo que tuve el placer de grabar con Roland, quien maneja el canal Never Code Alone. El material de video lo encontrarás en YouTube bajo la frase "Flow PHP"
Asumamos que así más o menos se ve el formato de pedidos:
schema
|-- order_id: ?uuid
|-- seller_id: uuid
|-- created_at: datetime
|-- updated_at: datetime
|-- cancelled_at: ?datetime
|-- discount: ?float
|-- email: string
|-- customer: string
|-- address: map<string, string>
|-- notes: list<string>
|-- items: list<structure{sku: string, quantity: integer, price: float}>
Nuestro objetivo es transferir estos pedidos a la base de datos analítica, preparemos entonces el esquema de datos de entrada y destino.
<?php
declare(strict_types=1);
namespace App\DataFrames;
use Flow\ETL\Adapter\Doctrine\DbalMetadata;
use function Flow\ETL\DSL\integer_schema;
use function Flow\ETL\DSL\uuid_schema;
use function Flow\ETL\DSL\datetime_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\string_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\Types\DSL\type_map;
use function Flow\Types\DSL\type_string;
use function Flow\ETL\DSL\list_schema;
use function Flow\Types\DSL\type_list;
use function Flow\Types\DSL\type_structure;
use function Flow\Types\DSL\type_integer;
use function Flow\Types\DSL\type_float;
use function \Flow\ETL\DSL\schema;
use Flow\ETL\Schema;
final class Orders
{
public static function sourceSchema() : Schema
{
return schema(
uuid_schema("order_id"),
uuid_schema("seller_id"),
datetime_schema("created_at"),
datetime_schema("updated_at", nullable: true),
datetime_schema("cancelled_at", nullable: true),
float_schema("discount", nullable: true),
string_schema("email"),
string_schema("customer"),
map_schema("address", type: type_map(key_type: type_string(), value_type: type_string())),
list_schema("notes", type: type_list(element: type_string())),
list_schema("items", type: type_list(element: type_structure(elements: ["item_id" => type_string(), "sku" => type_string(), "quantity" => type_integer(), "price" => type_float()]))),
);
}
public static function destinationSchema() : Schema
{
return self::sourceSchema()
->replace('updated_at', datetime_schema("updated_at"))
->remove('address')
->add(
string_schema('street', metadata: DbalMetadata::length(2048)),
string_schema('city', metadata: DbalMetadata::length(512)),
string_schema('zip', metadata: DbalMetadata::length(32)),
string_schema('country', metadata: DbalMetadata::length(128)),
)
->remove('items')
->add(
uuid_schema('item_id', metadata: DbalMetadata::primaryKey()),
string_schema('sku', metadata: DbalMetadata::length(64)),
integer_schema('quantity'),
integer_schema('price'),
string_schema('currency', metadata: DbalMetadata::length(3)),
)
;
}
}
Notemos que la estructura de destino de la tabla ya no está orientada a pedidos, sino a artículos pedidos. Nuestro objetivo es desempaquetar los artículos de pedidos para que cada uno sea una fila separada.
Gracias a esto el analista que deba generar un informe ya no tendrá que idear y desempaquetar el json sobre la marcha.
La columna Dirección también fue dividida en varias columnas, gracias a lo cual el informe se podrá filtrar más fácilmente.
Otra transformación importante es el cambio de price
de float
a int
mediante la multiplicación del valor de punto flotante por 100.
El último cambio será agregar información sobre en qué moneda se proporcionan los precios. ¿Pero de dónde viene esta información? Este es precisamente un detalle muy importante resultante de una implementación no muy buena. En este caso específico todos los pedidos están en dólares. El sistema lo sabe, los programadores lo saben, pero la persona que mira las tablas en la base sin contexto no necesariamente posee tal conocimiento.
Nuestra estructura de destino debería verse más o menos así:
schema
|-- order_id: uuid
|-- seller_id: uuid
|-- created_at: datetime
|-- updated_at: datetime
|-- cancelled_at: ?datetime
|-- discount: ?float
|-- email: string
|-- customer: string
|-- notes: list<string>
|-- street: string
|-- city: string
|-- zip: string
|-- country: string
|-- item_id: uuid
|-- sku: string
|-- quantity: integer
|-- price: integer
|-- currency: string
El siguiente paso será crear la tabla apropiada en la base analítica. Podemos lograr esto relativamente fácilmente gracias al adaptador para Doctrine DBAL.
<?php
declare(strict_types=1);
namespace App\Dbal;
use App\DataFrames\Orders;
use App\DataFrames\OrdersCSV;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\Migrations\Provider\SchemaProvider as MigrationsSchemaProvider;
use function Flow\ETL\Adapter\Doctrine\to_dbal_schema_table;
final class SchemaProvider implements MigrationsSchemaProvider
{
public const ANALYTICAL_ORDER_LINE_ITEMS = 'order_line_items';
public function createSchema(): Schema
{
return new Schema(
tables: [
to_dbal_schema_table(Orders::destinationSchema(), self::ANALYTICAL_ORDER_LINE_ITEMS),
]
);
}
}
En la base analítica almacenaremos entonces una versión "simplificada" o "normalizada" de la tabla de pedidos. La normalización consiste en desempaquetar los artículos del pedido y hacer de ellos filas separadas, así como dividir la columna "Dirección" en varias columnas.
Veamos entonces el comando CLI que será responsable de transferir datos de la base transaccional a la base analítica.
<?php
namespace App\Command;
use App\DataFrames\Orders;
use App\DataFrames\OrdersCSV;
use App\Dbal\SchemaProvider;
use Doctrine\DBAL\Connection;
use Flow\Doctrine\Bulk\Dialect\SqliteInsertOptions;
use Flow\ETL\Rows;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\TableSeparator;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use function Flow\ETL\Adapter\Doctrine\from_dbal_key_set_qb;
use function Flow\ETL\Adapter\Doctrine\pagination_key_desc;
use function Flow\ETL\Adapter\Doctrine\pagination_key_set;
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\analyze;
use function Flow\ETL\DSL\constraint_unique;
use function Flow\ETL\DSL\data_frame;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\rename_replace;
use function Flow\ETL\DSL\schema_to_ascii;
use function Flow\Types\DSL\type_datetime;
#[AsCommand(
name: 'app:orders:import',
description: 'Import orders from the transactional database to the analytical database.',
)]
class OrdersImportCommand extends Command
{
public function __construct(
private readonly Connection $transactional,
private readonly Connection $analytical,
)
{
parent::__construct();
}
protected function configure()
{
$this->addOption('start-date', null, InputOption::VALUE_REQUIRED, 'Start date for the data pull.', '-24 hours')
->addOption('end-date', null, InputOption::VALUE_REQUIRED, 'End date for the data pull.', 'now')
;
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$io->title('Importing orders');
$startDate = type_datetime()->cast($input->getOption('start-date'));
$endDate = type_datetime()->cast($input->getOption('end-date'));
$io->progressStart();
$report = data_frame()
->read(
from_dbal_key_set_qb(
$this->transactional,
$this->transactional->createQueryBuilder()
->select('*')
->from(SchemaProvider::ORDERS)
->where('updated_at BETWEEN :start_date AND :end_date')
->setParameter('start_date', $startDate->format('Y-m-d H:i:s'))
->setParameter('end_date', $endDate->format('Y-m-d H:i:s')),
pagination_key_set(
pagination_key_desc('updated_at'),
pagination_key_desc('order_id')
)
)->withSchema(Orders::sourceSchema())
)
->withEntry('_address', ref('address')->unpack())
->renameEach(rename_replace('_address.', ''))
->withEntry('_item', ref('items')->expand())
->withEntry('_item', ref('_item')->unpack())
->renameEach(rename_replace('_item.', ''))
->drop('_item', 'items', 'address')
->withEntry('currency', lit('USD'))
->withEntry('price', ref('price')->multiply(100))
->constrain(constraint_unique('item_id'))
->match(Orders::destinationSchema())
->write(
to_dbal_table_insert(
$this->analytical,
SchemaProvider::ORDER_LINE_ITEMS,
SqliteInsertOptions::fromArray([
'conflict_columns' => ['item_id'],
])
)
)
->run(function (Rows $rows) use ($io) {
$io->progressAdvance($rows->count());
}, analyze: analyze())
;
$io->progressFinish();
$io->newLine();
$io->definitionList(
'Orders Import Summary',
new TableSeparator(),
['Execution time ' => \number_format($report->statistics()->executionTime->highResolutionTime->seconds) . ' seconds'],
['Memory usage ' => \number_format($report->statistics()->memory->max()->inMb()) . ' MB'],
['Rows inserted ' => \number_format($report->statistics()->totalRows())],
);
return Command::SUCCESS;
}
}
Por supuesto esta no es la forma más hermosa ni siquiera la más correcta. Normalmente el comando CLI no contendría
la definición del pipeline ETL
, sin embargo para propósitos del ejemplo es un buen inicio.
Un almacén de datos centralizado dedicado es sin duda una opción tentadora, especialmente en lugares donde la falta de visibilidad imposibilita la toma eficiente de decisiones.
Afortunadamente es este tipo de funcionalidad que se puede agregar básicamente en cualquier etapa de la vida del proyecto.
Puede requerir la introducción de procesos adicionales y cierta disciplina de los equipos, sin embargo los beneficios que fluyen de tal solución son enormes.
- No hay temor de que la analítica impacte el trabajo del sistema
- Tenemos acceso a todos los rincones de nuestro sistema, cada microservicio o módulo
- Tal base de datos central es el mejor regalo para los analistas
- Data Science ya no consiste en quemar tiempo limpiando datos
- Podemos conectar fácil y seguramente básicamente cualquier herramienta de tipo Business Intelligence
- Creamos una cultura de trabajo con datos dentro de nuestra organización
Por supuesto, como todo lo nuevo, tales cambios pueden parecer difíciles de introducir. La falta de experiencia trabajando con datos al menos en las etapas iniciales hace que esta tarea pueda parecer incluso irrealizable.
De mi experiencia resulta sin embargo que lo más difícil es comenzar, cuando ya tenemos:
- Algunos primeros
Pipelines
procesando datos - Algunos o varios esquemas de nuestros datos
- Algunas transformaciones más complejas
- Pruebas preparadas
- Procesos y procedimientos establecidos
El trabajo va prácticamente como máquina.
Sin embargo vale la pena recordar que no hay una solución universal que se adapte a cada sistema. En cada caso es necesario adaptar el enfoque a la especificidad del sistema dado y la organización.
¿Cómo empezar?
Si necesitas ayuda en el ámbito de construcción de un almacén de datos centralizado, con gusto te ayudaré.
Contáctame, y juntos crearemos una solución que será perfectamente adaptada a tus necesidades.
También te animo a visitar el servidor Discord - Flow PHP, donde podemos hablar directamente.
