Tornado TCP服务器/客户端进程通信
我想在使用tornado.web.RequestHandler
各个充当Web服务器的Tornado进程之间建立通信。 这个想法是,我想要一个进程之间的全网状网络。 我有4个进程,我想使用tornado.tcpserver
和tornado.tcpclient
在它们之间建立持续的永久通信:
T1---T2
| /|
| / |
| / |
T3---T4
我是TCP编程的新手,但在我见过的龙卷风文档中的示例中:http://www.tornadoweb.org/en/stable/iostream.html在类tornado.iostream.IOStream
实现下 ,一旦套接字建立了所有的通信完成,然后套接字关闭。 该示例通过回调块来驱动代码,每个回调函数都执行通信任务。
但是,是否有可能打开一个TCP连接并让BaseIOStream.read_until_close()
空闲并仅在客户端写入服务器时调用?
换句话说,客户端和服务器保持连接,当客户端写入服务器时,它会以某种方式中断Tornado IOLoop来调用read()?
或者我的想法是错误的,如何做到这一点是每次我需要进程进行通信时,我建立一个新的TCP连接,完成工作,然后终止连接? 似乎每次建立这个新连接都会包含大量开销,而不是让连接断开......
这是一个基本的实现。 (我不能保证它的生产质量!)将它保存到一个文件并执行这样的事情,每个都在不同的终端窗口中:
> python myscript.py 10001 10002 10003
> python myscript.py 10002 10003 10001
> python myscript.py 10003 10001 10002
第一个参数是侦听端口,其余参数是其他服务器的端口。
import argparse
import logging
import os
import random
import socket
import struct
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, StreamClosedError
from tornado.tcpclient import TCPClient
from tornado.tcpserver import TCPServer
from tornado.options import options as tornado_options
parser = argparse.ArgumentParser()
parser.add_argument("port", type=int, help="port to listen on")
parser.add_argument("peers", type=int, nargs="+", help="peers' ports")
opts = parser.parse_args()
# This is just to configure Tornado logging.
tornado_options.parse_command_line()
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)
# Cache this struct definition; important optimization.
int_struct = struct.Struct("<i")
_UNPACK_INT = int_struct.unpack
_PACK_INT = int_struct.pack
tcp_client = TCPClient()
@gen.coroutine
def client(port):
while True:
try:
stream = yield tcp_client.connect('localhost', port)
logging.info("Connected to %d", port)
# Set TCP_NODELAY / disable Nagle's Algorithm.
stream.set_nodelay(True)
while True:
msg = ("Hello from port %d" % opts.port).encode()
length = _PACK_INT(len(msg))
yield stream.write(length + msg)
yield gen.sleep(random.random() * 10)
except StreamClosedError as exc:
logger.error("Error connecting to %d: %s", port, exc)
yield gen.sleep(5)
loop = IOLoop.current()
for peer in opts.peers:
loop.spawn_callback(client, peer)
class MyServer(TCPServer):
@gen.coroutine
def handle_stream(self, stream, address):
logging.info("Connection from peer")
try:
while True:
# Read 4 bytes.
header = yield stream.read_bytes(4)
# Convert from network order to int.
length = _UNPACK_INT(header)[0]
msg = yield stream.read_bytes(length)
logger.info('"%s"' % msg.decode())
del msg # Dereference msg in case it's big.
except StreamClosedError:
logger.error("%s disconnected", address)
server = MyServer()
server.listen(opts.port)
loop.start()
请注意,我们不会调用read_until_close,所以我们需要一些方法来知道消息何时完全接收。 我在每条消息的开始处用一个32位整数来完成此操作,该消息编码消息其余部分的长度。
你问,“当客户端写入服务器时,它会以某种方式中断Tornado IOLoop来调用read()?” 这就是Tornado的IOLoop的用途,这就是我们所说的“异步”:许多Tornado协程或回调可以等待网络事件,IOLoop在他们正在等待的事件发生时将其唤醒。 无论您在上面的代码中看到“收益率”,都会发生这种情况。
但是,是否有可能打开一个TCP连接并让BaseIOStream.read_until_close()空闲并仅在客户端写入服务器时调用?
不知道龙卷风。 但是,就TCP而言,一旦建立连接(服务器和客户端将状态保持为“ESTABLISHED”),服务器和客户端就可以交换数据,直到任何人希望关闭连接,或者在发生导致发送消息的网络问题不达到另一端。
换句话说,客户端和服务器保持连接,当客户端写入服务器时,它会以某种方式中断Tornado IOLoop来调用read()?
是。 这应该是这种情况。
或者我的想法是错误的,如何做到这一点是每次我需要进程进行通信时,我建立一个新的TCP连接,完成工作,然后终止连接?
每个数据交换都不需要重新启动TCP连接
链接地址: http://www.djcxy.com/p/44777.html