CQRS知识库/事件发布者
我正在将CqrsLite用于CQRS风格的项目。 具体Repository实现的Save方法看起来像这样(省略了不相关的行)。
public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
{
if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
throw new ConcurrencyException(aggregate.Id);
var i = 0;
foreach (var @event in aggregate.GetUncommittedChanges())
{
// ... [irrelevant code removed] ...
_eventStore.Save(typeof(T), @event);
_publisher.Publish(@event);
}
aggregate.MarkChangesAsCommitted();
}
令我感到困扰的是,这种方法正在将事件提交给订阅者发布,之后才告知它们将其标记为已提交。 因此,如果观察给定事件的事件处理程序阻塞,那么聚集将不会提交先前事件处理程序可能已收到通知的更改。
为什么我不会将_publisher.Publish(@event)移动到aggregate.MarkChangesAsCommitted()之后,就像这样。 我错过了什么?
public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
{
if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
throw new ConcurrencyException(aggregate.Id);
var events = aggregate.GetUncommittedChanges();
foreach (var @event in events)
{
// ... [irrelevant code removed] ...
_eventStore.Save(typeof(T), @event);
}
aggregate.MarkChangesAsCommitted();
_publisher.Publish(events);
}
这两种方法都是有问题的,因为在Save
和Publish
之间可能存在错误,无论这两种方法被调用的顺序如何。 这可能会导致发布未保存的事件或未发布已保存的事件。 内存状态腐败(在聚合对象中)的问题也存在(尽管可以通过捕获事件处理程序产生的错误来处理)。
解决此问题的一种方法是使用两阶段提交(例如,如果您的事件存储是基于SQL Server的,而发布者是基于MSMQ的)。 但是,这具有性能,可扩展性和运营方面的影响,并且不允许后期用户(见下文)。
更好的方法是允许对事件感兴趣的各方将它们从事件存储库中取出(理想情况下,将其与某种通知机制或长时间轮询相结合以使其更具“被动性”)。 这将跟踪最后收到的事件的责任移动给订户,允许
在搜索诸如“使用事件存储作为队列”之类的东西时,您应该更多地了解这种方法,Greg的答案中的视频可能也会增加很多。
一个常见的算法是:
我想补充一点,我不认为忽略Save
/ Publish
问题生产就绪的事件存储。 有关替代方法,请参阅Greg Young的活动商店或(目前或多或少没有维护的)NEventStore。
无论您选择哪种Commit
和Publish
顺序,如果第二个失败,您都会遇到问题。
有多种方法可以解决此问题。 以下是您的主要选项:
使用与您的应用程序使用相同数据库的消息传递基础结构,以便可以使用单个事务。 当一个非常简单的消息传递基础架构足够时,这个解决方案是可行的,并且团队决定自己构建它。
使用2阶段提交。 这会对性能产生影响,这可能与您的应用程序相关,也可能不相关。
手动确保你不会陷入不一致的状态。 有关更多信息,请参阅我的答案,我认为将整个答案复制一遍是没有意义的。
这是一种反模式,你不应该这样做。
显然,通过链接阻止我登录,因为它的“可疑请求”
https://www.youtube.com/watch?v=GbM1ghLeweU
链接地址: http://www.djcxy.com/p/88815.html