Akka actor: post a message to the head of the mailbox
Can a Producer actor post a message to another actor for immediate processing, i.e. post it to the head of the Consumer's mailbox instead of the tail?
I know that Akka lets me configure my own mailbox type, but how do I control which types of messages are posted at the head of the mailbox instead of the tail, e.g. TimerMessages? I want precise timer control for a time-window implementation. Messages must be kept for 1000 msec only (say), and if message processing takes time and many messages are pending in the mailbox, I don't want the timer message to be appended to the same queue behind them.
I could use a PriorityMailBox, but the trouble with PriorityMailBox is that, although it can put higher-priority messages (timer messages) at the head of the mailbox, the order of messages with the same priority is not guaranteed to match their order of arrival. So I cannot use the PriorityMailBox either.
Can someone please tell me how I can achieve this behavior?
You can write your own PriorityMailBox that records each message's arrival time and uses it as a secondary priority (for messages with the same "main" priority).
Something like this (not tested):
import akka.dispatch._
import com.typesafe.config.Config
import akka.actor.{ActorRef, PoisonPill, ActorSystem}
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue
// Concrete mailbox: lower numbers are dequeued first.
class MyTimedPriorityMailbox(settings: ActorSystem.Settings, config: Config)
  extends UnboundedTimedPriorityMailbox(
    TimedPriorityGenerator {
      case 'highpriority => 0
      case 'lowpriority  => 2
      case PoisonPill    => 3
      case _             => 1
    })
// Wraps an Envelope together with the time at which it was enqueued.
case class TimedEnvelope(envelope: Envelope) {
  val timestamp: Long = System.nanoTime()
}
class UnboundedTimedPriorityMailbox(final val cmp: Comparator[TimedEnvelope], final val initialCapacity: Int)
  extends MailboxType {
  // 11 is the default initial capacity of PriorityBlockingQueue.
  def this(cmp: Comparator[TimedEnvelope]) = this(cmp, 11)

  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new PriorityBlockingQueue[TimedEnvelope](initialCapacity, cmp)
      with TimedQueueBasedMessageQueue with TimedUnboundedMessageQueueSemantics {
      override def queue: java.util.Queue[TimedEnvelope] = this
    }
}
trait TimedQueueBasedMessageQueue extends MessageQueue {
  def queue: java.util.Queue[TimedEnvelope]
  def numberOfMessages: Int = queue.size
  def hasMessages: Boolean = !queue.isEmpty

  // Drains any remaining messages to dead letters when the mailbox is closed.
  def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
    if (hasMessages) {
      var envelope = dequeue()
      while (envelope ne null) {
        deadLetters.enqueue(owner, envelope)
        envelope = dequeue()
      }
    }
  }
}
trait TimedUnboundedMessageQueueSemantics extends TimedQueueBasedMessageQueue {
  // Stamp each envelope on arrival so the comparator can break priority ties.
  def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add TimedEnvelope(handle)
  // Returns null when the queue is empty.
  def dequeue(): Envelope = Option(queue.poll()).map(_.envelope).orNull
}
object TimedPriorityGenerator {
  // Builds a comparator from a function that maps a message to its priority.
  def apply(priorityFunction: Any => Int): TimedPriorityGenerator = new TimedPriorityGenerator {
    def gen(message: Any): Int = priorityFunction(message)
  }
}
abstract class TimedPriorityGenerator extends java.util.Comparator[TimedEnvelope] {
  def gen(message: Any): Int

  final def compare(thisMessage: TimedEnvelope, thatMessage: TimedEnvelope): Int = {
    // Integer.compare avoids the overflow that subtracting two priorities could cause.
    val result = Integer.compare(gen(thisMessage.envelope.message), gen(thatMessage.envelope.message))
    // Take the sign of the Long difference; truncating the difference with .toInt can flip its sign.
    if (result == 0) java.lang.Long.signum(thisMessage.timestamp - thatMessage.timestamp) else result
  }
}
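The tie-breaking rule can be checked without an ActorSystem. The following is a minimal sketch, not part of the answer: `Stamped`, `StampedOrdering` and `StampedOrderingDemo` are hypothetical names, and the timestamps are passed in explicitly so the result is deterministic. It applies the same two-level comparison (priority first, then timestamp) to a plain `PriorityBlockingQueue`.

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

// Hypothetical stand-in for TimedEnvelope: priority and timestamp are explicit
// so the ordering can be exercised without Akka.
final case class Stamped(priority: Int, timestamp: Long, payload: String)

object StampedOrdering extends Comparator[Stamped] {
  override def compare(a: Stamped, b: Stamped): Int = {
    // Integer.compare returns only a sign, so it cannot overflow.
    val byPriority = Integer.compare(a.priority, b.priority)
    // For equal priorities, use the sign of the Long timestamp difference.
    if (byPriority != 0) byPriority
    else java.lang.Long.signum(a.timestamp - b.timestamp)
  }
}

object StampedOrderingDemo {
  // Enqueues the items and returns their payloads in dequeue order.
  def drain(items: Seq[Stamped]): List[String] = {
    val queue = new PriorityBlockingQueue[Stamped](11, StampedOrdering)
    items.foreach(item => queue.add(item))
    Iterator.continually(queue.poll()).takeWhile(_ != null).map(_.payload).toList
  }
}
```

The sign of the difference is used instead of `(a.timestamp - b.timestamp).toInt` because truncating a `Long` to `Int` can change the sign once two timestamps are more than about 2.1 seconds apart in nanoseconds, which would reorder messages that waited in a busy mailbox.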
The code in the answer works fine.
Just one detail: avoid using System.nanoTime(). In our experience it has problems on multi-core machines, because its value is derived per CPU. Another post discusses this.
As a result, we saw strange message ordering depending on which CPU enqueued the message.
I replaced it with the classic System.currentTimeMillis(). It is less precise, but in our case, if two messages have the same priority and were generated in the same millisecond, we don't care in which order they are processed.
Thanks for the code!
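If ties within the same millisecond do matter, one alternative to reading a clock is to stamp each message with a sequence number. This is a sketch under that assumption, not something proposed in the thread: `Sequenced` and `StablePriorityQueue` are hypothetical names, and the queue is shown standalone rather than wired into a MailboxType.

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical wrapper: `seq` records the order in which the entry was enqueued.
final case class Sequenced[A](priority: Int, seq: Long, value: A)

// A priority queue that breaks ties by enqueue order rather than by a clock reading.
final class StablePriorityQueue[A] {
  private val counter = new AtomicLong(0L)

  private val ordering = new Comparator[Sequenced[A]] {
    override def compare(a: Sequenced[A], b: Sequenced[A]): Int = {
      val byPriority = Integer.compare(a.priority, b.priority)
      if (byPriority != 0) byPriority else java.lang.Long.compare(a.seq, b.seq)
    }
  }

  private val queue = new PriorityBlockingQueue[Sequenced[A]](11, ordering)

  // Lower priority values are dequeued first, as in the answer's generator.
  def enqueue(priority: Int, value: A): Unit = {
    queue.add(Sequenced(priority, counter.getAndIncrement(), value))
    ()
  }

  // Returns None when the queue is empty instead of null.
  def dequeue(): Option[A] = Option(queue.poll()).map(_.value)
}
```

The counter is shared by all senders, so no two entries can tie. With concurrent senders, "arrival order" here means the order in which senders obtained their sequence number. Later Akka releases also document an `UnboundedStablePriorityMailbox` that keeps FIFO order among equal priorities; it may be worth checking whether your version includes it before maintaining a custom mailbox.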