Tornado TCP Server / Client process communication

I want to set up communication between a number of Tornado processes, each acting as a web server (i.e. using tornado.web.RequestHandler). The idea is to have a fully meshed network between the processes. I have 4 processes and I want to establish permanent, ongoing communication between them using tornado.tcpserver and tornado.tcpclient:

T1---T2
|   /| 
|  / |
| /  |
T3---T4

I'm new to TCP programming. However, in the example I've seen in the Tornado documentation (http://www.tornadoweb.org/en/stable/iostream.html, under Implementations for class tornado.iostream.IOStream), once a socket is established, all the communication is done and then the socket is closed. The example drives the code through a chain of callbacks, each performing its part of the communication.

However is it possible to open a TCP connection and have the BaseIOStream.read_until_close() idle and called only when the client writes to the server?

In other words the client and server stay connected and when the client writes to the server it somehow interrupts the Tornado IOLoop to call the read()?

Or is my thinking misguided and the way to do this is every time I need the processes to communicate I establish a new TCP connection, do the work and then kill the connection? It just seems like establishing a new connection every time would incur a lot of overhead compared with leaving the connection open.


Here's a basic implementation. (I can't promise it's production-quality!) Save it to a file and execute something like this, each in a different terminal window:

> python myscript.py 10001 10002 10003
> python myscript.py 10002 10003 10001
> python myscript.py 10003 10001 10002

The first argument is the listening port, the remaining arguments are the ports of the other servers.

import argparse
import logging
import os
import random
import socket
import struct

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, StreamClosedError
from tornado.tcpclient import TCPClient
from tornado.tcpserver import TCPServer
from tornado.options import options as tornado_options


parser = argparse.ArgumentParser()
parser.add_argument("port", type=int, help="port to listen on")
parser.add_argument("peers", type=int, nargs="+", help="peers' ports")
opts = parser.parse_args()

# This is just to configure Tornado logging.
tornado_options.parse_command_line()
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)

# Cache this struct definition; important optimization.
int_struct = struct.Struct("<i")
_UNPACK_INT = int_struct.unpack
_PACK_INT = int_struct.pack

tcp_client = TCPClient()


@gen.coroutine
def client(port):
    while True:
        try:
            stream = yield tcp_client.connect('localhost', port)
            logger.info("Connected to %d", port)

            # Set TCP_NODELAY / disable Nagle's Algorithm.
            stream.set_nodelay(True)

            while True:
                msg = ("Hello from port %d" % opts.port).encode()
                length = _PACK_INT(len(msg))
                yield stream.write(length + msg)
                yield gen.sleep(random.random() * 10)

        except StreamClosedError as exc:
            logger.error("Error connecting to %d: %s", port, exc)
            yield gen.sleep(5)


loop = IOLoop.current()

for peer in opts.peers:
    loop.spawn_callback(client, peer)


class MyServer(TCPServer):
    @gen.coroutine
    def handle_stream(self, stream, address):
        logger.info("Connection from peer")
        try:
            while True:
                # Read 4 bytes.
                header = yield stream.read_bytes(4)

                # Decode the little-endian 32-bit length prefix ("<i").
                length = _UNPACK_INT(header)[0]

                msg = yield stream.read_bytes(length)
                logger.info('"%s"', msg.decode())

                del msg  # Dereference msg in case it's big.

        except StreamClosedError:
            logger.error("%s disconnected", address)


server = MyServer()
server.listen(opts.port)

loop.start()

Notice that we don't call read_until_close, so we need some way to know when a message is completely received. I do this with a 32-bit integer at the beginning of each message which encodes the length of the rest of the message.
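
To make the framing idea concrete, here is a minimal standalone sketch of the same length-prefix scheme using only the standard library. The names frame and split_frames are hypothetical helpers for illustration and are not part of Tornado; the sketch assumes the same "<i" layout as the script above.

```python
import struct

# Same layout as the "<i" struct in the script: little-endian signed 32-bit.
LENGTH_PREFIX = struct.Struct("<i")


def frame(payload: bytes) -> bytes:
    """Prepend the 4-byte length prefix to a payload (hypothetical helper)."""
    return LENGTH_PREFIX.pack(len(payload)) + payload


def split_frames(buffer: bytes):
    """Return (complete_messages, leftover_bytes) for a receive buffer."""
    messages = []
    offset = 0
    while len(buffer) - offset >= LENGTH_PREFIX.size:
        (length,) = LENGTH_PREFIX.unpack_from(buffer, offset)
        end = offset + LENGTH_PREFIX.size + length
        if end > len(buffer):
            break  # The message body has not fully arrived yet.
        messages.append(buffer[offset + LENGTH_PREFIX.size:end])
        offset = end
    return messages, buffer[offset:]


data = frame(b"Hello from port 10001") + frame(b"second message")

# Simulate a read that stops three bytes short of the second message.
messages, leftover = split_frames(data[:-3])
```

In the Tornado server the two stream.read_bytes calls do this bookkeeping for you: each one only resolves once the requested number of bytes has arrived, so there is no manual buffer to manage.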

You asked, "when the client writes to the server it somehow interrupts the Tornado IOLoop to call the read()?" This is what Tornado's IOLoop is for, and it's what we mean by "async": many Tornado coroutines or callbacks can wait for network events, and the IOLoop wakes them when the events they're awaiting occur. That's what's happening wherever you see "yield" in the code above.
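
If you want to see the "stay connected and wake up on write" behaviour in isolation, here is a small sketch. Note that it swaps in the standard library's asyncio streams instead of Tornado, purely so that it runs without any dependencies; the idea is the same, with "await" playing the role of "yield". The names handle_peer, main and received are made up for this illustration.

```python
import asyncio
import struct

LENGTH_PREFIX = struct.Struct("<i")
received = []  # Messages the server side has read so far.


async def handle_peer(reader, writer):
    """Serve one long-lived connection; each await idles until bytes arrive."""
    try:
        while True:
            header = await reader.readexactly(LENGTH_PREFIX.size)
            (length,) = LENGTH_PREFIX.unpack(header)
            received.append(await reader.readexactly(length))
    except (asyncio.IncompleteReadError, ConnectionResetError):
        pass  # The peer closed the connection.
    finally:
        writer.close()


async def main():
    # Port 0 asks the OS for any free port (an assumption of this sketch).
    server = await asyncio.start_server(handle_peer, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    for text in (b"first", b"second", b"third"):
        writer.write(LENGTH_PREFIX.pack(len(text)) + text)
        await writer.drain()
        # The connection stays open while nothing is being sent.
        await asyncio.sleep(0.05)

    writer.close()
    await writer.wait_closed()
    await asyncio.sleep(0.05)  # Give the handler time to observe the close.
    server.close()


asyncio.run(main())
```

All three messages travel over one connection. Between writes the handler is simply suspended at readexactly, and the event loop resumes it when the socket becomes readable, which is the same thing the IOLoop does for the coroutine in handle_stream.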


However is it possible to open a TCP connection and have the BaseIOStream.read_until_close() idle and called only when the client writes to the server?

I'm not familiar with Tornado, but as far as TCP is concerned, once the connection is established (both server and client hold it in the ESTABLISHED state), the two sides can exchange data until either one closes the connection or a network failure prevents messages from reaching the other end.

In other words the client and server stay connected and when the client writes to the server it somehow interrupts the Tornado IOLoop to call the read()?

Yes. This should be the case.

Or is my thinking misguided and the way to do this is every time I need the processes to communicate I establish a new TCP connection, do the work and then kill the connection?

No. You don't need to re-establish the TCP connection for every data exchange.
