演员模型:一名演员需要来自另一名演员的信息

我不明白的演员模型是这样的。 假设你有两个演员。 他们收集和管理各种来源的数据。 这些资源通过收件箱/邮箱/队列与演员互动。 例如,演员A收集信号,而演员B管理整个系统的信息。

在某个时刻,演员A必须处理其数据。 在处理时,它不能做其他事情。 例如,新信号无法处理。 作为数据处理的一部分,在A可以继续之前,演员A需要来自演员B的信息。

在伪代码中:

functionOfActorA()
{
  internalQueue {
   ...doing stuff with our data
   info = actor_B.getInfo() -> what should happen here?
   ...doing stuf with our data and the obtained info
   }
}

getInfo()//function of actor B
{
  internalQueue {
    ...prepare requested info
    ... -> what should happen here?
  }
} 

如果两个演员都应该在他们自己的队列或线程中独立运作,那么如何根据演员的要求从一个演员获取信息?


演员A可以通过向演员B发送请求来请求信息,但是响应将返回到演员A将在未来某个点接收的消息中。 有两种方法可以保存这些信息:

  • 演员A可以在内部存储信息以等待该响应。

  • 演员A可以将信息附加到它发送给演员B的消息上,演员B返回该上下文(否则它会忽略)。

  • 当演员A从演员B获得信息时,它可以恢复处理。

    下面是使用Thespian Python actor库(http://thespianpy.com)的第一种实现方式的示例:

    class ActorA(Actor):
    
        def __init__(self, *args, **kw):
            super(ActorA, self).__init__(args, kw)
            self.datalist = []
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, ActorAddress):
                self.actorB = msg
    
            elif isinstance(msg, WorkRequest):
                x = got_some_data(msg)
                self.datalist.append(x)
                self.send(self.actorB, NeedInfo())
    
            elif isinstance(msg, Info):
                x = self.datalist.pop()
                process_data(x, msg)
    
    class ActorB(Actor):
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, NeedInfo):
                i = get_info(msg)
                self.send(sender, i)
    

    上面显示了将作品内部存储到演员的基本功能,以便稍后继续。 有几个考虑因素:

  • 如果存在多个要存储的数据项,ActorA需要有某种方式来确定来自ActorB的信息适用于哪个项目。

  • ActorA需要处理它不知道ActorB地址的情况。

  • ActorA应该可能使用某种超时来避免永久保留内部列表上的工作(如果ActorB永远不会响应)。

  • 如果ActorA退出或死亡,它应该在退出之前对仍在datalist上的项目执行适当的操作。

  • 以下是第二种实施方式的相应简单示例:

    class ActorA(Actor):
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, ActorAddress):
                self.actorB = msg
    
            elif isinstance(msg, WorkRequest):
                x = got_some_data(msg)
                self.send(self.actorB, NeedInfo(x))
    
            elif isinstance(msg, Info):
                x = msg.orig_request.data
                process_data(x, msg)
    
    class ActorB(Actor):
    
        def receiveMessage(self, msg, sender):
    
            if isinstance(msg, NeedInfo):
                i = getinfo(msg)
                i.orig_request = msg
                self.send(sender, i)
    

    在第二个示例中,创建NeedInfo的调用将数据存储在NeedInfo对象的.data字段中,并且由ActorB传回的Info具有附加的原始NeedInfo消息。 这种风格的考虑因素是:

  • 无需将原始WorkRequest消息独立关联到相应的Info,因为它们彼此相连。

  • ActorA更多依赖于ActorB的实现,期望ActorB将原始消息附加到其响应中,以允许ActorA恢复此上下文。

  • ActorA仍然需要处理它不知道ActorB地址的情况。

  • 如果ActorA退出或死亡,它没有这个杰出工作的记录在清除之前采取任何行动(尽管死信处理程序可以执行此角色)。

  • 上面的例子相当简单,其他Actor模型库实现会有一些变化,但是这些通用技术应该广泛适用于任何库/语言。 这两种实现风格都遵循以下一般Actor指导原则:

  • 收到消息后,做一些可行的工作

  • 根据需要更新内部状态以处理下一条消息

  • 从消息处理程序退出

  • 虽然可以编写一个执行阻止操作的Actor,但该操作将干扰Actor的响应能力和处理其他消息的能力(如您正确指出的那样),因此尽可能使用消息驱动的继续而不是阻塞调用。

    链接地址: http://www.djcxy.com/p/21519.html

    上一篇: actor model: one actor requires information from another actor

    下一篇: Understanding Akka Actors' threading