从线程模型转换到角色
试图处理如何用角色而不是线程来思考。 我对下面的用例有点困惑:
考虑一个系统,该系统具有创建工作的生产者进程(例如通过读取文件中的数据)以及使用该工作的大量工作进程(例如,通过解析数据并将其写入数据库)。 工作的产生和消耗速度可能会有所不同,并且系统应该保持稳定。 例如,如果工人无法跟上,制片人应该检测到这一点,并最终放慢速度或等待。
这很容易用线程来实现:
val producer:Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
new Thread() {
override def run() = {
while (true) {
try {
// take next unit of work, waiting if necessary
val work = queue.take()
process(work)
}
catch {
case e:InterruptedException => return
}
}
}
}
}
// start the workers
workers.foreach(_.start())
while (producer.hasNext) {
val work = producer.next()
// add new unit of work, waiting if necessary
queue.put(work)
}
while (!queue.isEmpty) {
// wait until queue is drained
queue.wait()
}
// stop the workers
workers.foreach(_.interrupt())
这个模型没有什么错,我已经成功地使用过它。 这个例子可能过于冗长,因为使用Executor或CompletionService可以很好地适应这个任务。 但我喜欢演员的抽象,并且认为在很多情况下推理是比较容易的。 有没有办法使用actors重写这个例子,特别是确保没有缓冲区溢出(例如完整的邮箱,丢弃的消息等)?
由于演员处理“离线”消息(即消息的消费与他们正在接收的消息不相关),因此很难看到你如何能够准确地模拟“制片人等待消费者赶上”。
我能想到的唯一的事情就是消费者要求制片人(使用reply
)的作品:
case object MoreWorkPlease
class Consumer(prod : Producer) extends Actor {
def act = {
prod ! MoreWorkPlease
loop {
react {
case Work(payload) => doStuff(payload); reply(MoreWorkPlease)
}
}
}
}
class Producer extends Actor {
def act = loop {
react {
case MoreWorkPlease => reply(Work(getNextItem))
}
}
}
当然,这并不完美,因为制片人不会“向前阅读”,只有在消费者做好准备后才能开始工作。 用法如下所示:
val prod = new Producer
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start())
prod.start()
链接地址: http://www.djcxy.com/p/3663.html