CQRS知识库/事件发布者

我正在将CqrsLite用于CQRS风格的项目。 具体Repository实现的Save方法看起来像这样(省略了不相关的行)。

    public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);

        var i = 0;
        foreach (var @event in aggregate.GetUncommittedChanges())
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
            _publisher.Publish(@event);
        }
        aggregate.MarkChangesAsCommitted();
    }

令我感到困扰的是,这种方法正在将事件提交给订阅者发布,之后才告知它们将其标记为已提交。 因此,如果观察给定事件的事件处理程序阻塞,那么聚集将不会提交先前事件处理程序可能已收到通知的更改。

为什么我不会将_publisher.Publish(@event)移动到aggregate.MarkChangesAsCommitted()之后,就像这样。 我错过了什么?

    public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);

        var events = aggregate.GetUncommittedChanges();
        foreach (var @event in events)
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
        }
        aggregate.MarkChangesAsCommitted();
        _publisher.Publish(events);
    }

这两种方法都是有问题的,因为在SavePublish之间可能存在错误,无论这两种方法被调用的顺序如何。 这可能会导致发布未保存的事件或未发布已保存的事件。 内存状态腐败(在聚合对象中)的问题也存在(尽管可以通过捕获事件处理程序产生的错误来处理)。

解决此问题的一种方法是使用两阶段提交(例如,如果您的事件存储是基于SQL Server的,而发布者是基于MSMQ的)。 但是,这具有性能,可扩展性和运营方面的影响,并且不允许后期用户(见下文)。

更好的方法是允许对事件感兴趣的各方将它们从事件存储库中取出(理想情况下,将其与某种通知机制或长时间轮询相结合以使其更具“被动性”)。 这将跟踪最后收到的事件的责任移动给订户,允许

  • 晚订阅者(在存储事件后长时间加入)接收旧事件以及新事件,
  • 没有两阶段提交的可靠性。
  • 在搜索诸如“使用事件存储作为队列”之类的东西时,您应该更多地了解这种方法,Greg的答案中的视频可能也会增加很多。

    一个常见的算法是:

  • 事件存储为每个保存的事件分配一个检查点标记(例如,一个序列号);
  • 订阅者从事先知道的最后一个检查点令牌(如果有的话)开始,向事件存储库询问新事件(定期地,基于长轮询,对推送通知作出反应等)
  • 事件存储从该检查点标记开始发送更新的事件以及新的检查点标记,
  • 订阅者处理这些事件,并且如果可能的话,以新的检查点标记原子地存储它们产生的任何副作用;
  • 如果原子保存不可行,他们可以在产生副作用后存储新的检查点标记,并且他们需要一种方法来忽略它们已经看到的事件,以防中间存在错误(事件处理被称为“幂等“);
  • 用户从#2开始。
  • 我想补充一点,我不认为忽略Save / Publish问题生产就绪的事件存储。 有关替代方法,请参阅Greg Young的活动商店或(目前或多或少没有维护的)NEventStore。


    无论您选择哪种CommitPublish顺序,如果第二个失败,您都会遇到问题。

    有多种方法可以解决此问题。 以下是您的主要选项:

  • 使用与您的应用程序使用相同数据库的消息传递基础结构,以便可以使用单个事务。 当一个非常简单的消息传递基础架构足够时,这个解决方案是可行的,并且团队决定自己构建它。

  • 使用2阶段提交。 这会对性能产生影响,这可能与您的应用程序相关,也可能不相关。

  • 手动确保你不会陷入不一致的状态。 有关更多信息,请参阅我的答案,我认为将整个答案复制一遍是没有意义的。


  • 这是一种反模式,你不应该这样做。

    显然,通过链接阻止我登录,因为它的“可疑请求”

    https://www.youtube.com/watch?v=GbM1ghLeweU

    链接地址: http://www.djcxy.com/p/88815.html

    上一篇: CQRS Repository/event publisher

    下一篇: Is there any way to use MySQL Temp Tables in Go?