Event Sourcing en Azure- parte 2: Persistencia de eventos
¡Hola todos! Bienvenido de nuevo a la segunda parte de la serie Event Sourcing on Azure . Hoy vamos a divagar un poco sobre los detalles de implementación y algunas de las elecciones y compensaciones que he hecho. Nos centraremos en cómo he gestionado la persistencia de los eventos y qué herramienta he elegido para ello.
La última vez vimos cómo se vería una arquitectura genérica de abastecimiento de eventos. Como ya escribí, no hay una fórmula mágica. Puede haber ocasiones en las que no pueda simplemente aplicar un patrón de diseño tal como está, pero tendrá que adaptarlo a sus necesidades. Pero es bueno conocer los conceptos básicos, comprender las reglas básicas y divergir si es necesario.
Ahora, tómese un tiempo y explore las innumerables líneas de código de SuperSafeBank , esperaré. Comencé este repositorio para demostrar cómo es posible aprovechar Eventstore y Kafka para construir un sistema de abastecimiento de eventos con DDD. Un buen proyecto para ser justos, aprendí bastante.
Pero aun así, con todo el mundo y su perro usando The Cloud ™ , se sintió natural evolucionar la base de código y pasar a Azure. Todavía tengo que migrar toda la solución, pero la mayor parte está completa.
Hagamos una pausa por un segundo y revisemos cuáles son los requisitos. Nuestro sistema debe ser capaz de
- crear clientes
- obtener los detalles del cliente por id
- crear cuentas para un cliente
- obtener los detalles de la cuenta del cliente por ID de cuenta
- retirar dinero de una cuenta
- depositar dinero en una cuenta
Cada uno de esos puntos corresponderá a REST EndPoint. La implementación local anterior utilizaba una API web .NET para exponer todos los puntos finales. Un Worker en segundo plano fue responsable de manejar la ejecución real del comando y crear las vistas materializadas . Esto se hace escuchando algunos temas de Kafka y reaccionando a los eventos recibidos.
En cambio, la nueva versión de Azure se deshace de Eventstore y Kafka en favor de CosmosDB y ServiceBus , respectivamente.
CosmosDB es responsable de almacenar los eventos, manejar el control de versiones y la coherencia. Aquí está disponible una implementación muy simple , pero básicamente esta es la mayor parte:
var partitionKey = new PartitionKey(aggregateRoot.Id.ToString());
var firstEvent = aggregateRoot.Events.First();
var expectedVersion = firstEvent.AggregateVersion;
var dbVersionResp = await _container.GetItemLinqQueryable>(
requestOptions: new QueryRequestOptions()
{
PartitionKey = partitionKey
}).Select(e => e.AggregateVersion)
.MaxAsync();
if (dbVersionResp.Resource != expectedVersion)
throw new AggregateException($"aggregate version mismatch, expected {expectedVersion} , got {dbVersionResp.Resource}");
var transaction = _container.CreateTransactionalBatch(partitionKey);
foreach (var @event in aggregateRoot.Events)
{
var data = _eventSerializer.Serialize(@event);
var eventType = @event.GetType();
var eventData = EventData.Create(aggregateRoot.Id, aggregateRoot.Version, eventType.AssemblyQualifiedName, data);
transaction.CreateItem(eventData);
}
await transaction.ExecuteAsync();
Primero consultará la última versión de un agregado determinado. Como puede ver, el ID agregado se usa como clave de partición. Si la versión esperada no coincide, entonces alguien ya ha actualizado los datos, por lo que no podemos continuar.
Si todo está bien, abrirá una transacción y escribirá todos los eventos disponibles en el agregado.
Rehidratar un agregado es bastante fácil:
public async Task RehydrateAsync(TKey key)
{
var partitionKey = new PartitionKey(key.ToString());
var events = new List>();
using var setIterator = _container.GetItemQueryIterator>(requestOptions: new QueryRequestOptions { MaxItemCount = 100, PartitionKey = partitionKey });
while (setIterator.HasMoreResults)
{
foreach (var item in await setIterator.ReadNextAsync())
{
var @event = _eventSerializer.Deserialize(item.Type, item.Data);
events.Add(@event);
}
}
if (!events.Any())
return null;
var result = BaseAggregateRoot.Create(events.OrderBy(e => e.AggregateVersion));
return result;
}
Básicamente, consultamos todos los eventos para un agregado determinado, los clasificamos por versión y los reproducimos uno tras otro. Pedazo de pastel.
La próxima vez veremos cómo validar un comando antes de ejecutarlo. ¡Chao!
Traducido por Juan Pablo Vidalit