-
Notifications
You must be signed in to change notification settings - Fork 79
A Beginner's Guide To Implementing CQRS ES Part 2: Handling Commands and Events
This is Part 2 of a four-part series describing how to build an application in .NET using the Command-Query Responsibility Segregation and Event Sourcing patterns, as well as the [CQRS.NET]. Click here for Part 1.
Now that we've got our Aggregate Roots and Entities defined, it's time to start thinking about what kinds of functionality we want our application to handle. We can begin define this functionality by establishing what Commands the end users can send to our system.
Commands are requests sent by the end user to the application, instructing the app to do something. When our application receives a command, a Command Handler will process it to figure out what the end-user wanted the app to do, and then raise the appropriate Events to accomplish that task. All Commands will need to make some modification to the state of the data in our app (any command which unintentionally makes no change to the state of the data is an improper command, and any command which intentionally makes no change to the state of the data should be a query). Even an event indicating the email address for a new user is already in use by another user is a state change, in thinking this way you can start to see how a beneficial side-effect is that our application is self auditing and self logging. In other words: commands are instructions from the end-user about what modifications to make to the data stored in our app.
So we have aggregate roots for Movies and Movie Reviews. Let's list out the kinds of actions a user could take against those objects. A user could:
- Add a movie
- Edit an existing movie
- Delete a movie
- Add a review
- Edit an existing review
- Delete a review
Remember that "Get a Movie" or a "Get a Review" are not commands, they are queries and will be handled separately. For now, let's create a command for adding a movie to the database.
The command object to start off with is a skeleton of the class looks like this:
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Cqrs.Commands;
using Cqrs.Domain;
[Serializable]
[DataContract]
public class CreateMovieCommand : ICommandWithIdentity<string>
{
#region Implementation of ICommand
/// <summary>
/// The Id of the <see cref="ICommand{TAuthenticationToken}" /> itself, not the object being create. This helps identify one command from another if two are sent with the same information.
/// </summary>
[DataMember]
public Guid Id { get; set; }
/// <summary>
/// The version number you expect this command to shift the <see cref="AggregateRoot{TAuthenticationToken}">aggregate root</see> to. I.E. +1 from what you know it's current version to be.
/// </summary>
[DataMember]
public int ExpectedVersion { get; set; }
#endregion
#region Implementation of IMessageWithAuthenticationToken<string>
/// <summary>
/// The authentication token used to identify the requester.
/// </summary>
[DataMember]
public string AuthenticationToken { get; set; }
#endregion
#region Implementation of IMessage
/// <summary>
/// The correlation id used to group together events and notifications.
/// </summary>
[DataMember]
public Guid CorrelationId { get; set; }
/// <summary>
/// The originating framework this message was sent from.
/// </summary>
public string OriginatingFramework { get; set; }
/// <summary>
/// The frameworks this <see cref="T:Cqrs.Messages.IMessage"/> has been delivered to/sent via already.
/// </summary>
public IEnumerable<string> Frameworks { get; set; }
#endregion
#region Implementation of ICommandWithIdentity
/// <summary>
/// The Rsn of the <see cref="AggregateRoot{TAuthenticationToken}">aggregate root</see> being created.
/// </summary>
[DataMember]
public Guid Rsn { get; set; }
#endregion
}
Notice that the command needs to implement ICommand which is defined by CQRS.NET. There are some require properties that CQRS.NET will use when working with remote networks such as Azure or AWS for you such as authentication information.
The remaining properties of this command will be the data submitted to the application by the end user. In the case of a "create" command, the properties are all those required to display the information back to the user. Our command now looks like this:
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Cqrs.Commands;
using Cqrs.Domain;
[Serializable]
[DataContract]
public class CreateMovieCommand : ICommandWithIdentity<string>
{
#region Implementation of ICommand
/// <summary>
/// The Id of the <see cref="ICommand{TAuthenticationToken}" /> itself, not the object being create. This helps identify one command from another if two are sent with the same information.
/// </summary>
[DataMember]
public Guid Id { get; set; }
/// <summary>
/// The version number you expect this command to shift the <see cref="AggregateRoot{TAuthenticationToken}">aggregate root</see> to. I.E. +1 from what you know it's current version to be.
/// </summary>
[DataMember]
public int ExpectedVersion { get; set; }
#endregion
#region Implementation of IMessageWithAuthenticationToken<string>
/// <summary>
/// The authentication token used to identify the requester.
/// </summary>
[DataMember]
public string AuthenticationToken { get; set; }
#endregion
#region Implementation of IMessage
/// <summary>
/// The correlation id used to group together events and notifications.
/// </summary>
[DataMember]
public Guid CorrelationId { get; set; }
/// <summary>
/// The originating framework this message was sent from.
/// </summary>
public string OriginatingFramework { get; set; }
/// <summary>
/// The frameworks this <see cref="T:Cqrs.Messages.IMessage"/> has been delivered to/sent via already.
/// </summary>
public IEnumerable<string> Frameworks { get; set; }
#endregion
#region Implementation of ICommandWithIdentity
/// <summary>
/// The Rsn of the <see cref="AggregateRoot{TAuthenticationToken}">aggregate root</see> being created.
/// </summary>
[DataMember]
public Guid Rsn { get; set; }
#endregion
[DataMember]
public string Title { get; set; }
[DataMember]
public DateTime ReleaseDate { get; set; }
[DataMember]
public int RunningTimeMinutes { get; set; }
public CreateMovieCommand(string title, DateTime releaseDate, int runningTime, Guid? rsn = null)
{
Id = Guid.NewGuid();
// This allows the user to specify the key, required when working with existing systems or external data sources.
Rsn = rsn == null ? Guid.NewGuid() : rsn.Value;
Title = title;
ReleaseDate = releaseDate;
RunningTimeMinutes = runningTime;
}
}
Notice that the command has no property for a collection of MovieReview objects. This is because we are treating the creation of a MovieReview as a distinctly separate Command, which will be processed separately.
Now that we have the command we will be using, let's create a corresponding Command Handler class that will process the command. The skeleton for our command handler will look like this:
using Cqrs.Commands;
using Cqrs.Domain;
public class CreateMovieCommandHandler : ICommandHandler<string, CreateMovieCommand>
{
protected IUnitOfWork<string> UnitOfWork { get; private set; }
public CreateMovieCommandHandler(IUnitOfWork<string> unitOfWork)
{
UnitOfWork = unitOfWork;
}
}
A Command Handler must implement from ICommandHandler<TAuthenticationToken, TCommand>, where TCommand is the type of the command being handled.
The IUnitOfWork is an interface defined by CQRS.NET that handles the transactions and interactions between your code and the event store itself. We won't actually need to implement that repository, we just need to set it up, and we'll do this in a later step.
The next step in setting up this command handler is to implement a Handle method for the command being handled. We do this like so:
using System;
using Cqrs.Commands;
using Cqrs.Domain;
public class CreateMovieCommandHandler : ICommandHandler<string, CreateMovieCommand>
{
protected IUnitOfWork<string> UnitOfWork { get; private set; }
public CreateMovieCommandHandler(IUnitOfWork<string> unitOfWork)
{
UnitOfWork = unitOfWork;
}
#region Implementation of ICommandHandler<in CreateMovieCommand>
public void Handle(CreateMovieCommand command)
{
Movie item = new Movie(command.Rsn == Guid.Empty ? Guid.NewGuid() : command.Rsn);
UnitOfWork.Add(item);
UnitOfWork.Commit();
}
#endregion
}
The real work comes in the UnitOfWork.Commit() method call, which takes all the pending events in the aggregate root and commits them to the event store before publishing them on the event bus.
However We haven't yet created any events and as mentioned previously all commands should raise an event.
Events are changes made to the state of the system that result from the end-user issuing Commands. They are serialised and stored in the Event Store, so that they can be recalled and re-executed at a later time (remember that this is a primary benefit of using Event Sourcing). Any number of Events may be raised from a single Command. In more concrete terms, events are classes.
We've already created a command for the creation of a Movie, so let's create an event for that command:
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Cqrs.Domain;
using Cqrs.Events;
using Cqrs.Messages;
public class MovieCreatedEvent : IEventWithIdentity<string>
{
#region Implementation of IEvent
/// <summary>
/// The Id of the <see cref="IEvent{TAuthenticationToken}" /> itself, not the object that was created. This helps identify one event from another if two are sent with the same information.
/// </summary>
[DataMember]
public Guid Id { get; set; }
/// <summary>
/// The version number the <see cref="AggregateRoot{TAuthenticationToken}">aggregate root</see> shifted to as a result of the request.
/// </summary>
[DataMember]
public int Version { get; set; }
/// <summary>
/// The time the event was generated. Application of the event may happen at a different time.
/// </summary>
[DataMember]
public DateTimeOffset TimeStamp { get; set; }
#endregion
#region Implementation of IMessageWithAuthenticationToken<Guid>
/// <summary>
/// The authentication token used to identify the requester.
/// </summary>
[DataMember]
public string AuthenticationToken { get; set; }
#endregion
#region Implementation of IMessage
/// <summary>
/// The correlation id used to group together events and notifications.
/// </summary>
[DataMember]
public Guid CorrelationId { get; set; }
/// <summary>
/// The originating framework this message was sent from.
/// </summary>
[DataMember]
public string OriginatingFramework { get; set; }
/// <summary>
/// The frameworks this <see cref="IMessage"/> has been delivered to/sent via already.
/// </summary>
[DataMember]
public IEnumerable<string> Frameworks { get; set; }
#endregion
#region Implementation of IEventWithIdentity
/// <summary>
/// The Rsn of the <see cref="AggregateRoot{TAuthenticationToken}">aggregate root</see> create.
/// </summary>
[DataMember]
public Guid Rsn { get; set; }
#endregion
}
Notice the inheritance from IEventWithIdentity; this is so our events conform to the basic requirements of the Event Store, and when working with remote networks such as Azure or AWS for you such as authentication information.
The rest of the event is defined like so:
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Cqrs.Domain;
using Cqrs.Events;
using Cqrs.Messages;
public class MovieCreatedEvent : IEventWithIdentity<string>
{
#region Implementation of IEvent
/// <summary>
/// The Id of the <see cref="IEvent{TAuthenticationToken}" /> itself, not the object that was created. This helps identify one event from another if two are sent with the same information.
/// </summary>
[DataMember]
public Guid Id { get; set; }
/// <summary>
/// The version number the <see cref="AggregateRoot{TAuthenticationToken}">aggregate root</see> shifted to as a result of the request.
/// </summary>
[DataMember]
public int Version { get; set; }
/// <summary>
/// The time the event was generated. Application of the event may happen at a different time.
/// </summary>
[DataMember]
public DateTimeOffset TimeStamp { get; set; }
#endregion
#region Implementation of IMessageWithAuthenticationToken<Guid>
/// <summary>
/// The authentication token used to identify the requester.
/// </summary>
[DataMember]
public string AuthenticationToken { get; set; }
#endregion
#region Implementation of IMessage
/// <summary>
/// The correlation id used to group together events and notifications.
/// </summary>
[DataMember]
public Guid CorrelationId { get; set; }
/// <summary>
/// The originating framework this message was sent from.
/// </summary>
[DataMember]
public string OriginatingFramework { get; set; }
/// <summary>
/// The frameworks this <see cref="IMessage"/> has been delivered to/sent via already.
/// </summary>
[DataMember]
public IEnumerable<string> Frameworks { get; set; }
#endregion
#region Implementation of IEventWithIdentity
/// <summary>
/// The Rsn of the <see cref="AggregateRoot{TAuthenticationToken}">aggregate root</see> create.
/// </summary>
[DataMember]
public Guid Rsn { get; set; }
#endregion
[DataMember]
public string Title { get; set; }
[DataMember]
public DateTime ReleaseDate { get; set; }
[DataMember]
public int RunningTimeMinutes { get; set; }
public MovieCreatedEvent(Guid rsn, string title, DateTime releaseDate, int runningTime)
{
Rsn = rsn;
Title = title;
ReleaseDate = releaseDate;
RunningTimeMinutes = runningTime;
}
}
Don't forget that this class represents the object that will be serialised and stored in the event store, and as such it should have all the information stored within it that is needed to populate the entities that can be changed as part of this event.
Now that we have this event, we need to update the AggregateRoot we created back in Part 1:
public class Movie : AggregateRoot<string>
{
/// <summary>
/// The identifier of this movie.
/// </summary>
public Guid Rsn
{
get { return Id; }
private set { Id = value; }
}
public Movie(Guid rsn)
{
Rsn = rsn;
}
public void Create(string title, DateTime releaseDate, int runningTimeMinutes)
{
MovieCreatedEvent movieCreatedEvent = new MovieCreatedEvent(Rsn, title, releaseDate, runningTimeMinutes);
ApplyChange(movieCreatedEvent);
}
}
The ApplyChange() method stores an Event in the AggregateRoot's local collection of pending changes, so that it can later be persisted to the Event Store properly. This makes sense: any time we create a Movie, we need to record an event stating that the Movie was created with these values. All this method does is implement that idea. Again, since there are currently no business rules to enforce (invariants) or business processes to execute no state needs to be held in memory via properties.
Now we update the command handler to call this method
#region Implementation of ICommandHandler<in CreateMovieCommand>
public void Handle(CreateMovieCommand command)
{
Movie item = new Movie(command.Rsn == Guid.Empty ? Guid.NewGuid() : command.Rsn);
UnitOfWork.Add(item);
item.Create(command.Title, command.ReleaseDate, command.RunningTimeMinutes);
UnitOfWork.Commit();
}
#endregion
There's one more step we need: implementing an Event Handler that deserialises the event and adds the newly created Movie to the datastore.
For the Event Handler, we need a class that implements IEventHandler<TAuthenticationToken, TEvent>:
using System;
using Cqrs.Events;
public class UpdateMovieEntityEventHandler : IEventHandler<string, MovieCreatedEvent>
{
public void Handle(MovieCreatedEvent createdEvent)
{
throw new NotImplementedException();
}
}
A single Event Handler can implement many IEventHandlers. For now, let's create a simple implementation to save the newly create Movie into the datastore.
using System.Configuration;
using System.Data.Linq;
using System.Transactions;
using Cqrs.Events;
public class UpdateMovieEntityEventHandler : IEventHandler<string, MovieCreatedEvent>
{
public void Handle(MovieCreatedEvent createdEvent)
{
using (var transaction = new TransactionScope())
{
using (DataContext dbDataContext = new DataContext(ConfigurationManager.ConnectionStrings["MoviesDataStore"].ConnectionString))
{
Table<MovieEntity> table = dbDataContext.GetTable<MovieEntity>();
var entity = new MovieEntity
{
Rsn = createdEvent.Rsn,
Title = createdEvent.Title,
ReleaseDate = createdEvent.ReleaseDate,
RunningTimeMinutes = createdEvent.RunningTimeMinutes
};
table.InsertOnSubmit(entity);
dbDataContext.SubmitChanges();
}
transaction.Complete();
}
}
}
For the sake of this sample we're using regular Entity Framework code (Remember to add the nuget package). The issue with this is that it makes future changes rather complicated, and CQRS.NET has some advanced features for handling working with databases, but for this simple sample this won't cause us any issues.
When creating commands and command handlers:
- The command object must implement ICommand.
- The command handler class must inherit from ICommandHandler<>.
- The command handler must implement a
Handle()
method for each command it handles.
When creating events and event handlers:
- The event object must inherit from IEvent.
- The event handler must implement IEventHandler<>, and can implement as many as necessary, with each one having a
Handle()
method implementation. - The aggregate roots don't need to hold all their property values in memory if they aren't needed.
Now we've created a CQRS/ES backend that we can start using! But how does an end-user call it? We'll discuss that in the next part of this series, which covers creating the end-user interface in the form of a console application and the order of execution for this system.