CQRS Repository/event publisher

I am using CqrsLite for a CQRS-style project. The Save method of the concrete Repository implementation looks like so (with irrelevant lines omitted).

    public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);

        var i = 0;
        foreach (var @event in aggregate.GetUncommittedChanges())
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
            _publisher.Publish(@event);
        }
        aggregate.MarkChangesAsCommitted();
    }

What's troubling me is that this method publishes events to subscribers BEFORE the aggregate is told to mark them as committed. Thus, if a handler for a given event throws, the aggregate's changes will not be marked as committed even though earlier handlers have already been notified of them.

Why would I not move _publisher.Publish(@event) to after aggregate.MarkChangesAsCommitted(), like so? What am I missing?

    public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);

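        // Snapshot the uncommitted events first, in case MarkChangesAsCommitted() clears the underlying list.
        var events = aggregate.GetUncommittedChanges().ToList();
        foreach (var @event in events)
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
        }
        aggregate.MarkChangesAsCommitted();

        // Publish only after every event has been saved and marked as committed.
        foreach (var @event in events)
            _publisher.Publish(@event);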
    }

Both approaches are problematic because an error can occur between Save and Publish, no matter in which order the two methods are called. This can lead to unsaved events being published or saved events not being published. The problem of in-memory state corruption (in aggregate objects) exists as well (although that could be handled by simply catching errors produced by event handlers).

One solution to this problem would be to use two-phase commit (available, e.g., if your event store is SQL Server-based and the publisher is MSMQ-based). However, this has performance, scalability, and operations implications, and it doesn't allow late subscribers (see below).

The better approach is to allow parties interested in events to pull them out of the event store (ideally, combining this with some sort of notification mechanism or long polling to make it more "reactive"). This moves the responsibility of tracking the last received event to the subscriber, allowing

  • late subscribers (joining in long after an event was stored) to receive old events as well as new ones,
  • reliability without two-phase commits.

You should find more about this approach when searching for something like "using the event store as a queue", and the video from Greg's answer probably adds a lot to this as well.

A common algorithm is this one:

  1. the event store assigns every saved event a checkpoint token (e.g., a sequence number);
  2. subscribers ask the event store for new events (periodically, based on long polling, reacting to push notifications, etc.) starting from the last checkpoint token they know (if any);
  3. the event store sends newer events starting from that checkpoint token together with a new checkpoint token;
  4. the subscribers handle the events and, if possible, store the new checkpoint token atomically with whatever side effects they produced;
  5. if atomic saving is not possible, they can store the new checkpoint token after producing their side effects, and they need a way to ignore events they've already seen in case there is an error in between (event handling is then said to be "idempotent");
  6. subscribers start again at #2.

I'd like to add that I don't consider event stores that ignore the Save/Publish problem production-ready. For alternatives, see Greg Young's Event Store or the (currently more or less unmaintained) NEventStore.
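
To make the checkpoint-based polling described above a bit more concrete, here is a minimal in-memory sketch. All type and member names (InMemoryEventStore, PollingSubscriber, ReadAfter, Poll) are made up for illustration and are not part of CqrsLite, Event Store, or NEventStore. It also assumes the checkpoint token is simply the event's position in the log, and it keeps the checkpoint in memory where a real subscriber would persist it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory event store; illustrative only, not a real library API.
public sealed class InMemoryEventStore
{
    private readonly List<object> _events = new List<object>();
    private readonly object _gate = new object();

    // The checkpoint token of an event is its 1-based position in the log.
    public long Append(object @event)
    {
        lock (_gate)
        {
            _events.Add(@event);
            return _events.Count;
        }
    }

    // Returns the events stored after the given checkpoint, plus the new checkpoint.
    public (IReadOnlyList<object> Events, long Checkpoint) ReadAfter(long checkpoint)
    {
        lock (_gate)
        {
            var newer = _events.Skip((int)checkpoint).ToList();
            return (newer, _events.Count);
        }
    }
}

// Hypothetical subscriber that tracks its own position in the store.
public sealed class PollingSubscriber
{
    private readonly InMemoryEventStore _store;
    private readonly Action<object> _handler;

    // Assumption: kept in memory here; a real subscriber would persist it,
    // ideally atomically with the side effects of its handler.
    public long LastCheckpoint { get; private set; }

    public PollingSubscriber(InMemoryEventStore store, Action<object> handler)
    {
        _store = store;
        _handler = handler;
    }

    // Asks for new events, handles them, then advances the checkpoint.
    // Returns the number of events handled in this round.
    public int Poll()
    {
        var (events, checkpoint) = _store.ReadAfter(LastCheckpoint);
        foreach (var @event in events)
        {
            // If the handler throws, the checkpoint is not advanced and the whole
            // batch is delivered again on the next Poll(), so handlers must be idempotent.
            _handler(@event);
        }
        LastCheckpoint = checkpoint;
        return events.Count;
    }
}
```

A subscriber created long after events were appended starts with a checkpoint of 0, so its first Poll() replays everything stored so far. That is the "late subscriber" property mentioned above. Calling Poll() from a timer or in response to a push notification is left out of the sketch.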


No matter which order of Commit and Publish you choose, you will have issues if the second one fails.

There are multiple approaches to resolve this issue. Here are your main options:

  • Use a messaging infrastructure that uses the same database as your application, so that a single transaction can be used. This solution is viable when a very simple messaging infrastructure suffices, and the team decides to build it themselves.

  • Use two-phase commits. This has a performance impact, which may or may not be relevant for your application.

  • Manually ensure you don't get into inconsistent states. See this answer of mine for more information; I don't think it makes sense to copy the whole answer over.
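
As a rough illustration of the first option (messaging that lives in the same database as the events, an arrangement often called a transactional outbox), here is an in-memory sketch. Everything in it is hypothetical: SingleDatabase stands in for a real database, and the lock only imitates a transaction's all-or-nothing visibility. A real implementation would write both tables inside one database transaction.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for one database holding an event table and an outbox table.
public sealed class SingleDatabase
{
    private readonly object _gate = new object();
    private readonly List<object> _eventTable = new List<object>();
    private readonly Queue<object> _outboxTable = new Queue<object>();

    public int StoredEventCount { get { lock (_gate) { return _eventTable.Count; } } }
    public int PendingMessageCount { get { lock (_gate) { return _outboxTable.Count; } } }

    // Imitates one transaction: each event and its outgoing message are recorded together,
    // so there is never a saved event without a pending message, or vice versa.
    public void SaveWithOutbox(IReadOnlyCollection<object> events)
    {
        lock (_gate)
        {
            foreach (var @event in events)
            {
                _eventTable.Add(@event);
                _outboxTable.Enqueue(@event);
            }
        }
    }

    // Run later by a dispatcher. A message leaves the outbox only after delivery succeeded.
    // For simplicity, delivery happens while the lock is held.
    public int DispatchPending(Action<object> deliver)
    {
        var delivered = 0;
        lock (_gate)
        {
            while (_outboxTable.Count > 0)
            {
                // If deliver throws, the message stays queued and is retried on the next call.
                deliver(_outboxTable.Peek());
                _outboxTable.Dequeue();
                delivered++;
            }
        }
        return delivered;
    }
}
```

Because a failed delivery is retried, a handler may see the same message more than once, so the idempotency concern does not go away with this option either.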


This is an anti-pattern and you shouldn't be doing it.

Apparently putting a link prevents me from signing in as it's a "suspicious request":

https://www.youtube.com/watch?v=GbM1ghLeweU
