CQRS Repository/event publisher
I am using CqrsLite for a CQRS-style project. The Save method of the concrete Repository implementation looks like so (with irrelevant lines omitted).
    public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);
        var i = 0;
        foreach (var @event in aggregate.GetUncommittedChanges())
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
            _publisher.Publish(@event);
        }
        aggregate.MarkChangesAsCommitted();
    }
What's troubling me is that this method publishes events to subscribers BEFORE the aggregate is told to mark them as committed. Thus, if an event handler chokes on a given event, the aggregate will not have marked as committed the changes that earlier event handlers have already been notified of.
Why would I not move _publisher.Publish(@event) to after aggregate.MarkChangesAsCommitted(), like so? What am I missing?
    public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);
        var events = aggregate.GetUncommittedChanges();
        foreach (var @event in events)
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
        }
        aggregate.MarkChangesAsCommitted();
        _publisher.Publish(events);
    }
Both approaches are problematic because there might be an error between Save and Publish, no matter in what order the two methods are called. This can lead to unsaved events being published or saved events not being published. The problem of in-memory state corruption (in aggregate objects) exists as well (although that could be handled by simply catching errors produced by event handlers).
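To illustrate the "catching errors produced by event handlers" part only, here is a minimal sketch. SafePublisher and its members are hypothetical names, not part of CqrsLite, and this does nothing about the Save/Publish gap itself; it only keeps a failing handler from interrupting the caller before MarkChangesAsCommitted runs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical publisher that isolates handler failures from the caller.
public sealed class SafePublisher
{
    private readonly List<Action<object>> _handlers = new List<Action<object>>();

    // Failures are collected here instead of being thrown into the caller.
    public List<Exception> Failures { get; } = new List<Exception>();

    public void Subscribe(Action<object> handler)
    {
        _handlers.Add(handler);
    }

    public void Publish(object @event)
    {
        foreach (var handler in _handlers)
        {
            try
            {
                handler(@event);
            }
            catch (Exception ex)
            {
                // Record the failure and keep going so the remaining handlers still run.
                Failures.Add(ex);
            }
        }
    }
}
```

Note that swallowing the exception means the failed handler has simply missed the event, so something still has to retry or report the entries in Failures.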
One solution to this problem would be to use two-phase commit (available, e.g., if your event store is SQL Server-based and the publisher is MSMQ-based). However, this has performance, scalability, and operational implications, and it doesn't allow late subscribers (see below).
The better approach is to allow parties interested in events to pull them out of the event store (ideally, combining this with some sort of notification mechanism or long polling to make it more "reactive"). This moves the responsibility of tracking the last received event to the subscriber, which, among other things, allows late subscribers to catch up on events they missed.
You should find more about this approach when searching for something like "using the event store as a queue", and the video from Greg's answer probably adds a lot to this as well.
A common algorithm is this one:
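As a minimal sketch of what such a pull-based subscriber could look like (not necessarily the exact steps meant here): the store hands out events after a given position, and each subscriber keeps its own checkpoint. InMemoryEventStore, PullingSubscriber, ReadAfter and Checkpoint are all hypothetical names for illustration; a real subscriber would persist its checkpoint and a real store would be durable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical append-only store; an event's position is its 1-based index.
public sealed class InMemoryEventStore
{
    private readonly List<object> _events = new List<object>();
    private readonly object _lock = new object();

    public void Append(object @event)
    {
        lock (_lock) { _events.Add(@event); }
    }

    // Returns (position, event) pairs stored after the given position.
    public IReadOnlyList<KeyValuePair<long, object>> ReadAfter(long position)
    {
        lock (_lock)
        {
            return _events
                .Select((e, i) => new KeyValuePair<long, object>(i + 1, e))
                .Where(p => p.Key > position)
                .ToList();
        }
    }
}

// Hypothetical subscriber that pulls events and tracks what it has handled.
public sealed class PullingSubscriber
{
    private readonly InMemoryEventStore _store;
    private readonly Action<object> _handler;

    // Position of the last event handled successfully (0 = nothing yet).
    public long Checkpoint { get; private set; }

    public PullingSubscriber(InMemoryEventStore store, Action<object> handler)
    {
        _store = store;
        _handler = handler;
    }

    // Call this on a timer, or when a notification says new events exist.
    public int Poll()
    {
        var handled = 0;
        foreach (var pair in _store.ReadAfter(Checkpoint))
        {
            _handler(pair.Value);   // if this throws, the checkpoint is not advanced
            Checkpoint = pair.Key;  // so the same event is retried on the next Poll
            handled++;
        }
        return handled;
    }
}
```

With this shape the repository only has to save; a subscriber created long after the events were stored still sees all of them on its first Poll, and a handler failure leaves the checkpoint where it was.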
I'd like to add that I don't consider event stores that ignore the Save / Publish problem production-ready. For alternatives, see Greg Young's Event Store or the (currently more or less unmaintained) NEventStore.
No matter which order of Commit and Publish you choose, you will have issues if the second one fails.
There are multiple approaches to resolve this issue. Here are your main options:
Use a messaging infrastructure that uses the same database as your application, so that a single transaction can be used. This solution is viable when a very simple messaging infrastructure suffices, and the team decides to build it themselves.
Use two-phase commit. This has a performance impact, which may or may not be relevant for your application.
Manually ensure you don't get into inconsistent states. See this answer of mine for more information; I don't think it makes sense to copy the whole answer over.
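To make the first option a bit more concrete, here is a rough outbox-style sketch. It assumes the events and the outgoing messages live in the same database; the lock below merely stands in for a real database transaction, and InMemoryDatabase, SaveWithOutbox, Dispatch and PendingCount are hypothetical names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one database holding both an event table and an outbox table.
public sealed class InMemoryDatabase
{
    private readonly List<object> _events = new List<object>();
    private readonly Queue<object> _outbox = new Queue<object>();
    private readonly object _lock = new object();

    public int EventCount { get { lock (_lock) { return _events.Count; } } }
    public int PendingCount { get { lock (_lock) { return _outbox.Count; } } }

    // Writes events and their outgoing messages together.
    // In a real system both writes would be part of one database transaction.
    public void SaveWithOutbox(IEnumerable<object> newEvents)
    {
        var batch = newEvents.ToList(); // materialize before touching any state
        lock (_lock)
        {
            _events.AddRange(batch);
            foreach (var e in batch) _outbox.Enqueue(e);
        }
    }

    // Relay step: a message leaves the outbox only after send succeeded.
    // For simplicity send is called while holding the lock.
    public int Dispatch(Action<object> send)
    {
        var sent = 0;
        lock (_lock)
        {
            while (_outbox.Count > 0)
            {
                send(_outbox.Peek()); // if this throws, the message stays queued for retry
                _outbox.Dequeue();
                sent++;
            }
        }
        return sent;
    }
}
```

Because a message is removed only after a successful send, a crash between send and removal can deliver it twice, so handlers on the receiving side should tolerate duplicates.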
This is an anti-pattern and you shouldn't be doing it.
Apparently putting a link prevents me from signing in, as it's a "suspicious request".
https://www.youtube.com/watch?v=GbM1ghLeweU