akka演员发邮件给MailBox的头像
生产者演员是否可以将消息发送给其他演员以进行即时处理? 即将消息发布到消费者邮箱的头部而不是消费者邮箱的尾部?
我知道akka提供了一种配置我自己定义的邮箱类型的方法,但是如何控制是否需要将某些类型的邮件发布到MailBox的头部而不是尾部。 例如TimerMessages
。 我想要一个精确的计时器控制时间窗口实现。 消息必须保持1000毫秒(比方说),如果消息处理消耗时间,并且在邮箱中有很多待处理的消息,我不希望计时器消息被追加到同一个队列中。
我可以用一个PriorityMailBox
,但麻烦PriorityMailBox
是,即使它可以在邮箱的负责人,对于相同优先级的消息提出了更高优先级的消息(计时器消息),消息的邮箱的顺序是不能保证是相同的作为到达顺序。 所以我也不能使用priorityMailBox。
有人可以告诉我如何实现这种行为?
您可以使用自己的PriorityMailBox
来处理消息的到达时间,并将其用作附加优先级(对于具有相同“主”优先级的消息)。
像这样(未测试):
import akka.dispatch._
import com.typesafe.config.Config
import akka.actor.{ActorRef, PoisonPill, ActorSystem}
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue
class MyTimedPriorityMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedTimedPriorityMailbox(
TimedPriorityGenerator {
case 'highpriority ⇒ 0
case 'lowpriority ⇒ 2
case PoisonPill ⇒ 3
case otherwise ⇒ 1
})
case class TimedEnvelope(envelope: Envelope) {
private val _timestamp = System.nanoTime()
def timestamp = _timestamp
}
class UnboundedTimedPriorityMailbox( final val cmp: Comparator[TimedEnvelope], final val initialCapacity: Int) extends MailboxType {
def this(cmp: Comparator[TimedEnvelope]) = this(cmp, 11)
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new PriorityBlockingQueue[TimedEnvelope](initialCapacity, cmp) with TimedQueueBasedMessageQueue with TimedUnboundedMessageQueueSemantics {
override def queue: java.util.Queue[TimedEnvelope] = this
}
}
trait TimedQueueBasedMessageQueue extends MessageQueue {
def queue: java.util.Queue[TimedEnvelope]
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
if (hasMessages) {
var envelope = dequeue()
while (envelope ne null) {
deadLetters.enqueue(owner, envelope)
envelope = dequeue()
}
}
}
}
trait TimedUnboundedMessageQueueSemantics extends TimedQueueBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope) { queue add TimedEnvelope(handle) }
def dequeue(): Envelope = Option(queue.poll()).map(_.envelope).getOrElse(null)
}
object TimedPriorityGenerator {
def apply(priorityFunction: Any ⇒ Int): TimedPriorityGenerator = new TimedPriorityGenerator {
def gen(message: Any): Int = priorityFunction(message)
}
}
abstract class TimedPriorityGenerator extends java.util.Comparator[TimedEnvelope] {
def gen(message: Any): Int
final def compare(thisMessage: TimedEnvelope, thatMessage: TimedEnvelope): Int = {
val result = gen(thisMessage.envelope.message) - gen(thatMessage.envelope.message)
// Int.MaxValue / Int.MinValue check omitted
if(result == 0) (thisMessage.timestamp - thatMessage.timestamp).toInt else result
}
}
上面的代码工作正常。
只有一个细节。 避免使用System.getTimeNano()。 它在多核机器中存在问题,因为它是由per-cpu逻辑定义的
这里另一篇文章
然后,我们在消息顺序Dependending中有一个奇怪的行为。
我用经典的System.currentTimeMillis()改变它。 它不太精确,但在我们的情况下,如果具有相同优先级和相同毫秒生成时间的两条消息,则不关心它们被处理的顺序。
感谢代码!
链接地址: http://www.djcxy.com/p/21509.html