Transaction support in MongoDB
First things first: a transaction is a unit of work that should be treated as "a whole." It has to either happen in full or not at all. Transaction support in relational databases like MSSQL or Oracle Database has been around for a long time. For NoSQL databases the situation has been different, until now. As the first NoSQL database (and you may correct me if I'm wrong), MongoDB has implemented support for multi-document ACID transactions as of version 4.0. Just to be clear, MongoDB has always supported ACID transactions on a single document, and you may also want to read about the early days of DynamoDB and how it introduced transactions with some limitations. For a large number of applications, single-document ACID guarantees have proven to be sufficient. Where some kind of multi-document transaction control was needed, it had to be implemented manually.
Transactions are in many cases related to the finance domain and are often illustrated with money being transferred between accounts. If withdrawing from one account fails, the other operations involving the other accounts should be aborted as well. For the money transfer, one can argue that by putting everything in one document the case can easily be solved without transactions. So, when do we need transactions?
Well, let’s have a look at the CQRS and Event Sourcing sphere. If you are not familiar with CQRS or Event Sourcing, you can have a look at Martin Fowler's definition. If you find CQRS interesting, you may also want to look into this collection of resources on the subject. Practicing CQRS usually means that commands and queries are treated separately, which in turn often implies different databases: one for persisting the outcome of commands (events), and one for persisting an optimized read model (current snapshot). Reasons for having two different stores include optimization, separation of concerns, etc.
With great choice comes great responsibility, and one of those responsibilities is consistency. If no attention is paid to it, one may end up with stale data in the read model, a situation known as eventual consistency. As a side note, many NoSQL databases embraced an eventual consistency approach, while MongoDB has supported a strong consistency model for single documents.
If dealing with eventual consistency is not an option, one must make sure that any operation on the event storage happens in sync with the corresponding operation on the read storage. In other words, the operations must occur in a single transaction.
The setup
For this scenario I'm going to use CQRSLite, which is a small library that helps with creating CQRS and Event Sourcing applications in C#. I'll implement a MongoDbEventStore that handles events and updates the read model in one transaction. The business case is a simple user registration API, which supports registering a user and activating the user account.
First of all, I need to set up a MongoDB replica set, because transactions can only run on a replica set. Trying to run transactions on a standalone mongod instance will throw the following exception: MongoError: Transaction numbers are only allowed on a replica set member or mongos. As I already have MongoDB installed on my computer, I'll just add a replica set to it. For my setup I'm using MongoDB v4.0.9. To avoid interfering with the default MongoDB port, 27017, I just pick some other arbitrary ports:
mongod --replSet rs0 --dbpath c:\data\rs1 --port 37017
mongod --replSet rs0 --dbpath c:\data\rs2 --port 37018
mongod --replSet rs0 --dbpath c:\data\rs3 --port 37019
Then I connect to the mongod process on port 37017, which will be the primary, and configure the replica set. More about replica set configuration and usage here:
mongo --port 37017
config = {
_id: "rs0",
members: [
{_id: 0, host: "localhost:37017"},
{_id: 1, host: "localhost:37018"},
{_id: 2, host: "localhost:37019"}
]
}
rs.initiate(config)
If you would rather use Docker to run the replica set, there are plenty of tutorials on how to do that.
When working with a single mongod instance, the database and the collections can be created at runtime. When working with transactions this is not allowed, and an exception will be thrown if an operation inside the transaction tries to create them. So, make sure this is done in a script or similar up front. In a production environment this should even be part of a CI/CD process.
For now, I'll just run the following command to set up my database and collections:
use usersystem
db.createCollection("events")
db.createCollection("userreadmodel")
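The same up-front creation can also be done from application startup code. The following is a minimal sketch, assuming the official MongoDB.Driver package; the class name CollectionBootstrapper and the method EnsureCollections are hypothetical and not part of the original solution. It has to run outside of any transaction.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;

// Hypothetical helper, not part of the original solution.
public static class CollectionBootstrapper
{
    // Creates the collections used in this post if they do not exist yet.
    // Must be called outside of a transaction, e.g. once at startup.
    public static async Task EnsureCollections(
        IMongoDatabase database,
        CancellationToken cancellationToken = default)
    {
        var cursor = await database
            .ListCollectionNamesAsync(cancellationToken: cancellationToken)
            .ConfigureAwait(false);
        var existing = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);

        foreach (var name in new[] { "events", "userreadmodel" })
        {
            if (!existing.Contains(name))
            {
                await database
                    .CreateCollectionAsync(name, cancellationToken: cancellationToken)
                    .ConfigureAwait(false);
            }
        }
    }
}
```

A script run as part of deployment is probably still the cleaner choice, since two application instances starting at the same time could both try to create the same collection.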
To be able to use transactions I need an IClientSessionHandle object, and this object has to be passed to every collection operation that should be part of the transaction. To achieve that I create my own interface, IMongoDbContext, holding the necessary members.
public interface IMongoDbContext
{
IMongoDatabase Database { get; }
IClientSessionHandle Session { get; }
Task<IClientSessionHandle> StartSession(CancellationToken cancellationToken = default);
}
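An implementation of this interface could look roughly like the sketch below. It is an assumption on my part rather than the exact class from the solution: the connection string simply lists the three local members started earlier, and the database name is the one created above. The interface is repeated only so the snippet compiles on its own.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;

public interface IMongoDbContext
{
    IMongoDatabase Database { get; }
    IClientSessionHandle Session { get; }
    Task<IClientSessionHandle> StartSession(CancellationToken cancellationToken = default);
}

public class MongoDbContext : IMongoDbContext
{
    // Assumed values: the local replica set members and database from this post.
    private const string ConnectionString =
        "mongodb://localhost:37017,localhost:37018,localhost:37019/?replicaSet=rs0";
    private const string DatabaseName = "usersystem";

    private readonly IMongoClient _client;

    public MongoDbContext()
    {
        // Creating the client does not open a connection yet.
        _client = new MongoClient(ConnectionString);
        Database = _client.GetDatabase(DatabaseName);
    }

    public IMongoDatabase Database { get; }

    // The most recently started session, shared with the event handlers.
    public IClientSessionHandle Session { get; private set; }

    public async Task<IClientSessionHandle> StartSession(CancellationToken cancellationToken = default)
    {
        Session = await _client
            .StartSessionAsync(cancellationToken: cancellationToken)
            .ConfigureAwait(false);
        return Session;
    }
}
```

Note that a context like this keeps only one current session, so concurrent requests sharing the same instance would overwrite each other's session. For a demo that is fine; in a real API a per-request lifetime is probably the safer choice.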
Then I register IMongoDbContext as a singleton, so that the same session can be used across the different handler classes that are part of the flow.
public static IServiceCollection RegisterServices(this IServiceCollection services)
{
services.AddSingleton<IMongoDbContext>(x => new MongoDbContext());
.....
}
In the MongoDbEventStore I inject IMongoDbContext so I can start a session and a transaction. For the TransactionOptions I use ReadConcern.Snapshot, ReadPreference.Primary and WriteConcern.WMajority. More about read concerns here and write concerns here.
public MongoDbEventStore(IEventPublisher publisher, IMongoDbContext mongoDbContext)
{
_publisher = publisher;
_mongoDbContext = mongoDbContext;
_events = _mongoDbContext.Database.GetCollection<EventData>(EventsCollection);
}
public async Task Save(IEnumerable<IEvent> events, CancellationToken cancellationToken = default)
{
using (var session = await _mongoDbContext.StartSession(cancellationToken))
{
var options = new TransactionOptions(ReadConcern.Snapshot, ReadPreference.Primary, WriteConcern.WMajority);
session.StartTransaction(options);
try
{
var bulkOps = new List<WriteModel<EventData>>();
foreach (var @event in events)
{
var eventData = new EventData
{
Id = Guid.NewGuid(),
StreamId = @event.Id,
TimeStamp = @event.TimeStamp,
AssemblyQualifiedName = @event.GetType().AssemblyQualifiedName,
PayLoad = @event
};
bulkOps.Add(new InsertOneModel<EventData>(eventData));
await _publisher.Publish(@event, cancellationToken);
}
await _events.BulkWriteAsync(session, bulkOps, cancellationToken: cancellationToken).ConfigureAwait(false);
await session.CommitTransactionAsync(cancellationToken).ConfigureAwait(false);
}
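catch (Exception)
{
// Roll back the events and any read model writes made in this session.
await session.AbortTransactionAsync(cancellationToken).ConfigureAwait(false);
// Rethrow with "throw;" so the original stack trace is preserved.
throw;
}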
}
}
By publishing events from MongoDbEventStore, we can have any number of listeners that pick up the events and handle them as needed. One of the handlers, which we need in order to update the read model, is UserReadModelEventHandler. This class handles only the read model. By injecting IMongoDbContext into its constructor, it gets hold of the session that must be passed to InsertOneAsync() so that both collections are updated within the same transaction. The transaction is committed only when the writes to both the events collection and the read model collection have succeeded.
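To make the handler side concrete, here is a rough sketch of what such a handler could look like. The event and read model classes (UserRegistered, UserReadModel) and their properties are hypothetical shapes for illustration, the wiring to CQRSLite's handler interfaces is left out, and IMongoDbContext is the interface defined earlier in this post.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;

// Hypothetical event and read model shapes, for illustration only.
// Guid ids follow the article; newer driver versions may require an
// explicit Guid representation to be configured.
public class UserRegistered
{
    public Guid Id { get; set; }
    public string Email { get; set; }
}

public class UserReadModel
{
    public Guid Id { get; set; }
    public string Email { get; set; }
    public bool IsActive { get; set; }
}

public class UserReadModelEventHandler
{
    private readonly IMongoDbContext _context;
    private readonly IMongoCollection<UserReadModel> _users;

    public UserReadModelEventHandler(IMongoDbContext context)
    {
        _context = context;
        _users = context.Database.GetCollection<UserReadModel>("userreadmodel");
    }

    public Task Handle(UserRegistered message, CancellationToken token = default)
    {
        var user = new UserReadModel
        {
            Id = message.Id,
            Email = message.Email,
            IsActive = false
        };

        // Passing the session enlists this insert in the transaction
        // that the event store started on the same session.
        return _users.InsertOneAsync(_context.Session, user, cancellationToken: token);
    }
}
```

The important detail is the first argument to InsertOneAsync: without the session, the insert would run outside the transaction and would not be rolled back if saving the events fails.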
Looking back at the user registration API, the flow should be as follows:
- A user is registered by posting a request to the "/user" endpoint. The response is the user id.
- By using the user id and sending a PUT request to "user/{userid}/activate", the user is activated.
If we then take a look into the database using MongoDB Compass, we'll find two events: one for registering the user and one for activating the account. By looking at the version property we can see that they occurred in the proper order.
Looking into the userreadmodel collection, we can also see that there is one document with the current state of the user.
The entire solution is available on my GitHub.
Considerations
For this demo I have used a naive approach to implementing a MongoDB event store. By this I mean that each event that occurs on the aggregate results in one document in the event store. With many events on the aggregate, this could slow down the loading of the aggregate. Another approach to consider, when there could be many events per commit, is to store all events from the same commit together in a single document. This way you end up with fewer documents for the same aggregate, which in turn should speed up event replay.
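As an illustration of the commit-based alternative, the sketch below groups all events from one save into a single document. The names CommitData and CommitBuilder and their properties are hypothetical; this is not how the demo solution stores events.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical document shape: one document per commit instead of one per event.
public class CommitData
{
    public Guid Id { get; set; }
    public Guid StreamId { get; set; }
    public int FirstVersion { get; set; }
    public int LastVersion { get; set; }
    public List<object> Events { get; set; } = new List<object>();
}

public static class CommitBuilder
{
    // Wraps all events from a single save operation into one commit document.
    public static CommitData FromEvents(
        Guid streamId,
        IReadOnlyList<(int Version, object Payload)> events)
    {
        if (events.Count == 0)
        {
            throw new ArgumentException("A commit needs at least one event.", nameof(events));
        }

        return new CommitData
        {
            Id = Guid.NewGuid(),
            StreamId = streamId,
            FirstVersion = events.Min(e => e.Version),
            LastVersion = events.Max(e => e.Version),
            // Keep the events in version order so replay is straightforward.
            Events = events.OrderBy(e => e.Version).Select(e => e.Payload).ToList()
        };
    }
}
```

Storing the first and last version on the commit makes it possible to find the commits needed for a replay without opening every document, at the cost of a slightly more involved query when a single event is wanted.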
Be aware of
- The database and collections must be created up front, otherwise an error is thrown.
- Transactions are not supported on sharded clusters in version 4.0. Support is scheduled for version 4.2.
- Transactions cannot run on standalone instances of MongoDB.
- There is an additional cost to using transactions, so use them deliberately and keep them short.
- As a best practice, no more than 1,000 documents should be modified within a single transaction.
Resources used
- https://en.wikipedia.org/wiki/Database_transaction
- https://www.infoq.com/articles/MarkLogic-NoSQL-with-Transactions
- https://martinfowler.com/bliki/CQRS.html
- https://github.com/gautema/CQRSlite
- https://www.mongodb.com/transactions
- https://www.mongodb.com/blog/post/working-with-mongodb-transactions-with-c-and-the-net-framework
- https://blog.yugabyte.com/nosql-databases-becoming-transactional-mongodb-dynamodb-faunadb-cosmosdb/