在阿卡暂停演员
我在阿卡有一名演员,将处理消息以创建某些实体。 这些实体上的某些字段是基于创建时数据库中其他实体的状态计算的。
我想避免创建一个竞争条件,其中参与者处理比数据库能够坚持实体更快。 这可能会导致数据不一致,如下所示:
Foo
并将其发送给其他参与者以供进一步处理和保存 Foo
。 由于第一个尚未保存,所以新的创建是基于DB的旧内容创建的,因此创建了错误的Foo
。 现在,这种可能性非常小,因为Foo
的创建将被手动触发。 但仍然可以想象,在高负载下双击可能会导致问题。 谁知道明天的Foo
是否会自动创建。
因此,我需要的是某种方式告诉演员等待,并在确认Foo
已被保存后才能恢复其操作。
有没有办法让演员进入闲置状态,并告诉它在一段时间后恢复运作?
基本上,我想使用邮箱作为消息队列,并控制队列的处理速度。
不,你不能暂停演员:演员总是尽可能快地从他们的邮箱中取出信息。 这只留下传入请求被隐藏起来的可能性,以便稍后处理:
class A(db: ActorRef) extends Actor with Stash {
def receive = {
case Request =>
doWork()
db ! Persist
context.setReceiveTimeout(5.seconds)
context.become({
case Request => stash()
case Persisted => context.unbecome(); unstashAll()
case ReceiveTimeout => throw new TimeoutException("not persisted")
}, discardOld = false)
}
}
请注意,邮件传递不保证(或数据库可能关闭),因此推荐使用超时。
根本的问题
这个问题主要出现在actor模型和领域模型之间没有很好对齐的情况下:actor是一致性的单位,但在你的用例中,你的一致性图像需要一个最新的外部实体数据库),以便演员做正确的事情。 如果不了解更多关于用例的信息,我不能推荐解决方案,但请考虑到这一点,尝试改造您的问题。
事实证明,这只需要几行。 这是我提出的解决方案,与pagoda_5b建议一致:
class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
import QueueingActor._
def receive = {
case message =>
context.become({
case Resume =>
unstashAll()
context.unbecome()
case _ => stash()
})
nextActor ! message
}
}
object QueueingActor {
case class Resume()
}
链接地址: http://www.djcxy.com/p/21507.html