Event Sourcing en Azure: parte 4: Eventos de integración

David Guida
Computación en la nube
Una vez que hayamos actualizado o creado un agregado, ¿cómo hacemos que sus datos estén disponibles para el mundo exterior?

Event Sourcing en Azure: parte 4: Eventos de integración

¡Hola a todos! Bienvenido de nuevo a la cuarta parte de la serie “Event Sourcing” en Azure. Hoy veremos cómo podemos enviar eventos de integración cada vez que algo cambia.

La última vez vimos cómo validar los comandos y asegurarse de que nuestros agrega dos reciben los datos correctos. Por supuesto, esta validación no nos salva de todos los  inconvenientes, pero es la primera línea necesaria.

Las invariantes siempre deben ser “true” para evitar dejar “Aggregates” en un estado no válido.

Ahora, una vez que hemos actualizado o creado un Agregado, ¿cómo hacemos que sus datos estén disponibles para el mundo exterior?

No vamos a exponer que es la representación interna, que los datos están destinados a ser privados. Lo que queremos en su lugar, es crear modelos de consulta específicos que serán expuestos por los diversos extremos GET de nuestro microservicio (o lo que será nuestro protocolo de transporte en los límites).

Queremos desacoplar tanto como sea posible las escrituras de las lecturas y, al mismo tiempo, evitar cualquier posible complicación al conservar los eventos de dominio para cada agregado.

Por lo tanto, aquí está nuestra lista de compras cuando procesamos un nuevo comando:

  1. validación
  2. almacenar los eventos de dominio
  3. para cada evento de dominio enviar un evento de integración

Ahora echemos un vistazo al controlador de comandos CreateCustomer:  

public class CreateCustomerHandler : INotificationHandler

{

      private readonly IEventsService _eventsService;

      private readonly ICustomerEmailsService _customerEmailsRepository;

 

      public async Task Handle(CreateCustomer command, CancellationToken cancellationToken)

      {

            if (await _customerEmailsRepository.ExistsAsync(command.Email)){

                  var error = new ValidationError(nameof(CreateCustomer.Email), $"email '{command.Email}' already exists");

                  throw new ValidationException("Unable to create Customer", error);

            }

 

            var customer = new Customer(command.Id, command.FirstName, command.LastName, command.Email);

            await _eventsService.PersistAsync(customer);

            await _customerEmailsRepository.CreateAsync(command.Email, command.Id);

      }

}

 

 

La clave  aquí  radica en esa  llamada  al   Servicio de   Eventos:  se  encargará   de anexar  el  Evento de Dominio  a la secuencia de Eventos del  Agregado y  publicar un Evento de Integración.   El  código  es en realidad  bastante  simple:

 

public class EventsService : IEventsService where TA : class, IAggregateRoot

{

    public async Task PersistAsync(TA aggregateRoot)

    {

        if (!aggregateRoot.Events.Any())

          return;

 

        await _eventsRepository.AppendAsync(aggregateRoot);

        await _eventProducer.DispatchAsync(aggregateRoot);

       

        aggregateRoot.ClearEvents();

    }

}

 

Como puede ver, al final también nos encargamos de borrar los eventos del Agregado. No queremos que las cosas se procesen más de lo necesario, ¿no es así? Eso, por supuesto, no es suficiente, ya que podríamos incurrir en problemas desagradables como mensajes que se envían / reciben varias veces y así sucesivamente. Pero al menos es un comienzo.

Ahora hablemos sobre la distribución de los eventos de dominio. Cuando los transformamos  en eventos de integración, necesitamos asegurarnos de que cada uno contenga la información mínima para que los suscriptores hagan lo que tengan que hacer.

Esto significa colocar al menos el identificador agregado y el tipo de evento. Dado que somos buenas personas, también usaremos el identificador agregado como identificador de correlación. Nuestro yo futuro nos lo agradecerá más tarde al fregar innumerables entradas de registro.

Cuando hayamos terminado con la asignación, enviaremos de forma masiva estos mensajes a un tema de Service Bus. Cuando hayamos terminado con la asignación, enviaremos de forma masiva estos mensajes a un Service Bus ; De esta manera, cualquier persona interesada puede suscribirse solo a los temas que necesita.

public class EventProducer : IEventProducer where TA : IAggregateRoot

{

    public async Task DispatchAsync(TA aggregateRoot)

    {

        var messages = aggregateRoot.Events.Select(@event =>{

          var eventType = @event.GetType();

            var serialized = _eventSerializer.Serialize(@event);

 

           var message = new Message(serialized){

                  CorrelationId = aggregateRoot.Id.ToString(),

                  UserProperties =

                  {

                        {"aggregate", aggregateRoot.Id.ToString()},

                        {"type", eventType.AssemblyQualifiedName}

                  }

            };

            return message;

      }).ToList();

 

      await _topicClient.SendAsync(messages);

    }

}

 

 

Eso es todo por hoy. La próxima vez veremos cómo suscribirnos a esos eventos y reaccionar ante ellos.

¡Ciao!

Traducido por Rolando Lopez

 

David Guida

Passionate, hard-working, and innovative software engineer with a strong desire for continuous learning and improving.
Over 15 years of experience in different domains, including Healthcare, Finance, and large-scale e-Commerce.

Microsoft MVP for Developer Technologies, confident on back-end and front-end tech stacks including .NET, NodeJs, PHP, Angular, React, as well as extensive experience on SQL, MongoDB, Azure, and GCP.

Everything is "just a tool". The secret is to know which one best suits your needs.

https://www.davidguida.net/

Related Posts

Únete a nuestra Newsletter

Lidera la Conversación en la Nube