在使用scala actors时应该如何处理阻塞操作?
大约两天前我开始学习scala actors框架。 为了使思路具体化,我决定实现一个基于TCP的echo服务器,它可以处理多个同时连接。
以下是echo服务器的代码(不包括错误处理):
class EchoServer extends Actor {
private var connections = 0
def act() {
val serverSocket = new ServerSocket(6789)
val echoServer = self
actor { while (true) echoServer ! ("Connected", serverSocket.accept) }
while (true) {
receive {
case ("Connected", connectionSocket: Socket) =>
connections += 1
(new ConnectionHandler(this, connectionSocket)).start
case "Disconnected" =>
connections -= 1
}
}
}
}
基本上,服务器是一个处理“连接”和“断开”消息的Actor。 它委托侦听匿名actor的连接,在serverSocket上调用accept()方法(阻塞操作)。 当连接到达时,它通过“已连接”消息通知服务器,并将其传递给套接字以用于与新连接的客户端进行通信。 ConnectionHandler类的一个实例处理与客户端的实际通信。
这里是连接处理程序的代码(包括一些错误处理):
class ConnectionHandler(server: EchoServer, connectionSocket: Socket)
extends Actor {
def act() {
for (input <- getInputStream; output <- getOutputStream) {
val handler = self
actor {
var continue = true
while (continue) {
try {
val req = input.readLine
if (req != null) handler ! ("Request", req)
else continue = false
} catch {
case e: IOException => continue = false
}
}
handler ! "Disconnected"
}
var connected = true
while (connected) {
receive {
case ("Request", req: String) =>
try {
output.writeBytes(req + "n")
} catch {
case e: IOException => connected = false
}
case "Disconnected" =>
connected = false
}
}
}
close()
server ! "Disconnected"
}
// code for getInputStream(), getOutputStream() and close() methods
}
连接处理程序使用匿名参与者,通过在套接字的输入流上调用readLine()方法(阻塞操作)来等待请求发送到套接字。 当收到一个请求时,一个“请求”消息被发送给处理程序,然后简单地将请求回送给客户端。 如果处理程序或匿名角色遇到底层套接字问题,那么套接字将关闭,并向回显服务器发送“断开连接”消息,指示客户端已与服务器断开连接。
所以,我可以启动回声服务器并让它等待连接。 然后我可以打开一个新的终端并通过telnet连接到服务器。 我可以发送请求,它的响应正确。 现在,如果我打开另一个终端并连接到服务器,服务器将注册连接,但无法启动此新连接的连接处理程序。 当我通过任何现有的连接发送消息时,我无法立即回复。 这是有趣的部分。 当我终止除现有客户端连接之外的所有客户端连接并断开客户端X时,将返回通过客户端X发送的对请求的所有响应。 我做了一些测试,并得出结论认为,即使我在创建连接处理程序时调用start()方法,后续客户端连接上仍未调用act()方法。
我想我在连接处理程序中正确处理阻塞操作。 由于以前的连接是由连接处理程序处理的,该连接处理程序有一个匿名角色阻塞等待请求,所以我认为这个被阻止的actor正在阻止其他actor(连接处理程序)启动。
在使用scala actors时应该如何处理阻塞操作?
任何帮助将不胜感激。
从scaladoc for scala.actors.Actor:
注意:当调用除了Actor特征或其伴随对象提供的线程阻塞方法(比如receive
)时必须小心。 阻止演员内的潜在线程可能导致其他演员饥饿。 这也适用于在调用receive
/ react
之间长时间占用线程的角色。
如果参与者使用阻塞操作(例如阻塞I / O的方法),则有以下几种选择:
actors.corePoolSize
JVM属性)。 Actor
特性的scheduler
方法可以被重写,以返回一个ResizableThreadPoolScheduler
,它调整其线程池的大小以避免由调用任意阻塞方法的actor引起的饥饿。 actors.enableForkJoin
JVM属性可以设置为false,在这种情况下,默认情况下会使用ResizableThreadPoolScheduler
来执行参与者。 上一篇: How should I handle blocking operations when using scala actors?
下一篇: Mapping between Wingdings/Symbol characters and their Unicode equivalents