Event Sourcing en Azure: parte 4: Eventos de integración
¡Hola a todos! Bienvenido de nuevo a la cuarta parte de la serie “Event Sourcing” en Azure. Hoy veremos cómo podemos enviar eventos de integración cada vez que algo cambia.
La última vez vimos cómo validar los comandos y asegurarse de que nuestros agrega dos reciben los datos correctos. Por supuesto, esta validación no nos salva de todos los inconvenientes, pero es la primera línea necesaria.
Las invariantes siempre deben ser “true” para evitar dejar “Aggregates” en un estado no válido.
Ahora, una vez que hemos actualizado o creado un Agregado, ¿cómo hacemos que sus datos estén disponibles para el mundo exterior?
No vamos a exponer que es la representación interna, que los datos están destinados a ser privados. Lo que queremos en su lugar, es crear modelos de consulta específicos que serán expuestos por los diversos extremos GET de nuestro microservicio (o lo que será nuestro protocolo de transporte en los límites).
Queremos desacoplar tanto como sea posible las escrituras de las lecturas y, al mismo tiempo, evitar cualquier posible complicación al conservar los eventos de dominio para cada agregado.
Por lo tanto, aquí está nuestra lista de compras cuando procesamos un nuevo comando:
- validación
- almacenar los eventos de dominio
- para cada evento de dominio enviar un evento de integración
Ahora echemos un vistazo al controlador de comandos CreateCustomer:
public class CreateCustomerHandler : INotificationHandler
{
private readonly IEventsService _eventsService;
private readonly ICustomerEmailsService _customerEmailsRepository;
public async Task Handle(CreateCustomer command, CancellationToken cancellationToken)
{
if (await _customerEmailsRepository.ExistsAsync(command.Email)){
var error = new ValidationError(nameof(CreateCustomer.Email), $"email '{command.Email}' already exists");
throw new ValidationException("Unable to create Customer", error);
}
var customer = new Customer(command.Id, command.FirstName, command.LastName, command.Email);
await _eventsService.PersistAsync(customer);
await _customerEmailsRepository.CreateAsync(command.Email, command.Id);
}
}
La clave aquí radica en esa llamada al Servicio de Eventos: se encargará de anexar el Evento de Dominio a la secuencia de Eventos del Agregado y publicar un Evento de Integración. El código es en realidad bastante simple:
public class EventsService : IEventsService where TA : class, IAggregateRoot
{
public async Task PersistAsync(TA aggregateRoot)
{
if (!aggregateRoot.Events.Any())
return;
await _eventsRepository.AppendAsync(aggregateRoot);
await _eventProducer.DispatchAsync(aggregateRoot);
aggregateRoot.ClearEvents();
}
}
Como puede ver, al final también nos encargamos de borrar los eventos del Agregado. No queremos que las cosas se procesen más de lo necesario, ¿no es así? Eso, por supuesto, no es suficiente, ya que podríamos incurrir en problemas desagradables como mensajes que se envían / reciben varias veces y así sucesivamente. Pero al menos es un comienzo.
Ahora hablemos sobre la distribución de los eventos de dominio. Cuando los transformamos en eventos de integración, necesitamos asegurarnos de que cada uno contenga la información mínima para que los suscriptores hagan lo que tengan que hacer.
Esto significa colocar al menos el identificador agregado y el tipo de evento. Dado que somos buenas personas, también usaremos el identificador agregado como identificador de correlación. Nuestro yo futuro nos lo agradecerá más tarde al fregar innumerables entradas de registro.
Cuando hayamos terminado con la asignación, enviaremos de forma masiva estos mensajes a un tema de Service Bus. Cuando hayamos terminado con la asignación, enviaremos de forma masiva estos mensajes a un Service Bus ; De esta manera, cualquier persona interesada puede suscribirse solo a los temas que necesita.
public class EventProducer : IEventProducer where TA : IAggregateRoot
{
public async Task DispatchAsync(TA aggregateRoot)
{
var messages = aggregateRoot.Events.Select(@event =>{
var eventType = @event.GetType();
var serialized = _eventSerializer.Serialize(@event);
var message = new Message(serialized){
CorrelationId = aggregateRoot.Id.ToString(),
UserProperties =
{
{"aggregate", aggregateRoot.Id.ToString()},
{"type", eventType.AssemblyQualifiedName}
}
};
return message;
}).ToList();
await _topicClient.SendAsync(messages);
}
}
Eso es todo por hoy. La próxima vez veremos cómo suscribirnos a esos eventos y reaccionar ante ellos.
¡Ciao!
Traducido por Rolando Lopez