Python multiprocessing and networking on Windows

I'm trying to implement a TCP 'echo server'. Simple stuff:

  • Client sends a message to the server.
  • Server receives the message
  • Server converts message to uppercase
  • Server sends modified message to client
  • Client prints the response.

    It worked well, so I decided to parallelize the server so that it could handle multiple clients at a time. Since most Python interpreters have a GIL, multithreading won't cut it. I had to use multiprocessing... And boy, this is where things went downhill.

    I'm using Windows 10 x64 and the WinPython suite with Python 3.5.2 x64.

    My idea is to create a socket, initialize it (bind and listen), create subprocesses and pass the socket to the children. But for the love of me... I can't make this work; my subprocesses die almost instantly. Initially I had some issues 'pickling' the socket... So I googled a bit and thought this was the issue. So I tried passing my socket through a multiprocessing queue, through a pipe, and my last attempt was 'forkpickling' it and passing it as a bytes object during process creation. Nothing works.

    Can someone please shed some light here? Tell me what's wrong? Maybe the whole idea (sharing sockets) is bad... And if so, PLEASE tell me how I can achieve my initial objective: enabling my server to ACTUALLY handle multiple clients at once (on Windows) (don't tell me about threading, we all know Python's threading won't cut it ¬¬)

    It's also worth noting that no files are created by the debug function. No process lived long enough to run it, I believe.

    The typical output of my server code is (only difference between runs is the process numbers):

    Server is running...
    Degree of parallelism: 4
    Socket created.
    Socket bount to: ('', 0)
    Process 3604 is alive: True
    Process 5188 is alive: True
    Process 6800 is alive: True
    Process 2844 is alive: True
    
    Press ctrl+c to kill all processes.
    
    Process 3604 is alive: False
    Process 3604 exit code: 1
    Process 5188 is alive: False
    Process 5188 exit code: 1
    Process 6800 is alive: False
    Process 6800 exit code: 1
    Process 2844 is alive: False
    Process 2844 exit code: 1
    The children died...
    Why god?
    WHYYyyyyy!!?!?!?
    

    The server code:

    # Imports
    import socket 
    import packet
    import sys
    import os
    from time import sleep
    import multiprocessing as mp
    import pickle
    import io
    
    # Constants
    DEGREE_OF_PARALLELISM = 4
    DEFAULT_HOST = ""
    DEFAULT_PORT = 0
    
    def _parse_cmd_line_args():
        arguments = sys.argv
        if len(arguments) == 1:
            return DEFAULT_HOST, DEFAULT_PORT
        else:
            raise NotImplementedError()
    
    def debug(data):
        pid = os.getpid()
        with open('C:\\Users\\Trauer\\Desktop\\debug'+str(pid)+'.txt', mode='a',
                  encoding='utf8') as file:
            file.write(str(data) + '\n')
    
    def handle_connection(client):
        client_data = client.recv(packet.MAX_PACKET_SIZE_BYTES)
        debug('received data from client: ' + str(len(client_data)))
        response = client_data.upper()
        client.send(response)    
        debug('sent data to client: ' + str(response))
    
    def listen(picklez):    
        debug('started listen function')
    
        pid = os.getpid()
        server_socket = pickle.loads(picklez)
        debug('acquired socket')
    
        while True:
            debug('Sub process {0} is waiting for connection...'.format(str(pid)))
    
            client, address = server_socket.accept()
            debug('Sub process {0} accepted connection {1}'.format(str(pid),
                  str(client)))
    
            handle_connection(client)        
            client.close()
            debug('Sub process {0} finished handling connection {1}'.
                  format(str(pid),str(client)))
    
    if __name__ == "__main__":    
    #   Since most python interpreters have a GIL, multithreading won't cut
    #   it... Oughta bust out some process, yo!
        host_port = _parse_cmd_line_args()
        print('Server is running...')
        print('Degree of parallelism: ' + str(DEGREE_OF_PARALLELISM))
    
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print('Socket created.')
    
        server_socket.bind(host_port)
        server_socket.listen(DEGREE_OF_PARALLELISM)
        print('Socket bount to: ' + str(host_port))        
    
        buffer = io.BytesIO()
        mp.reduction.ForkingPickler(buffer).dump(server_socket)
        picklez = buffer.getvalue()
    
        children = []
        for i in range(DEGREE_OF_PARALLELISM):        
            child_process = mp.Process(target=listen, args=(picklez,))
            child_process.daemon = True
            child_process.start()
            children.append(child_process)
    
            while not child_process.pid:
                sleep(.25)
    
            print('Process {0} is alive: {1}'.format(str(child_process.pid), 
                  str(child_process.is_alive())))     
        print()    
    
        kids_are_alive = True
        while kids_are_alive:
            print('Press ctrl+c to kill all processes.\n')
            sleep(1) 
    
            exit_codes = []
            for child_process in children:
                print('Process {0} is alive: {1}'.format(str(child_process.pid), 
                  str(child_process.is_alive())))
                print('Process {0} exit code: {1}'.format(str(child_process.pid), 
                  str(child_process.exitcode)))
                exit_codes.append(child_process.exitcode)
    
            if all(exit_codes):
                # Why do they die so young? :(
                print('The children died...')
                print('Why god?')
                print('WHYYyyyyy!!?!?!?')
                kids_are_alive = False
    

    edit: fixed the signature of "listen". My processes still die instantly.

    edit2: User cmidi pointed out that this code does work on Linux, so my question is: how can I make this work on Windows?


    You can directly pass a socket to a child process. multiprocessing registers a reduction for this, for which the Windows implementation uses the following DupSocket class from multiprocessing.resource_sharer:

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''
        def __init__(self, sock):
            new_sock = sock.dup()
            def send(conn, pid):
                share = new_sock.share(pid)
                conn.send_bytes(share)
            self._id = _resource_sharer.register(send, new_sock.close)
    
        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)
    

    This calls the Windows socket share method, which returns the protocol info buffer from calling WSADuplicateSocket. It registers with the resource sharer to send this buffer over a connection to the child process. The child in turn calls detach, which receives the protocol info buffer and reconstructs the socket via socket.fromshare.
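To make "pass the socket directly" concrete, here is a minimal sketch, not the asker's full server. It hands the listening socket itself to each mp.Process and leaves the pickling to multiprocessing. BUFFER_SIZE, the 127.0.0.1 address, the worker count of 2 and the throwaway client in main() are assumptions for the demo; the question uses packet.MAX_PACKET_SIZE_BYTES and its own host and port.

```python
import multiprocessing as mp
import socket

BUFFER_SIZE = 4096  # assumed; the question uses packet.MAX_PACKET_SIZE_BYTES


def handle_connection(client):
    """Read one message, send it back in uppercase, close the connection."""
    with client:
        data = client.recv(BUFFER_SIZE)
        client.sendall(data.upper())


def listen(server_socket):
    """Worker loop; server_socket arrives already rebuilt in the child."""
    while True:
        client, _address = server_socket.accept()
        handle_connection(client)


def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    server_socket.listen(5)
    address = server_socket.getsockname()

    # Pass the socket object itself; no manual ForkingPickler step is needed.
    workers = [mp.Process(target=listen, args=(server_socket,), daemon=True)
               for _ in range(2)]
    for worker in workers:
        worker.start()

    # One round trip from a throwaway client, to show a worker answers.
    with socket.create_connection(address, timeout=10) as client:
        client.sendall(b"hello")
        print(client.recv(BUFFER_SIZE))

    for worker in workers:
        worker.terminate()
        worker.join()
    server_socket.close()


if __name__ == "__main__":
    main()
```

Each Process gets its own reduction of the socket when it starts, which is probably why this avoids the trouble of reusing one pre-pickled bytes object for all four children.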

    It's not directly related to your problem, but I recommend that you redesign the server to call accept in the main process instead, which is the way this is normally done (e.g. in Python's socketserver.ForkingTCPServer class). Pass the resulting (conn, address) tuple to the first available worker over a multiprocessing.Queue, which is shared by all of the workers in the process pool. Or consider using a multiprocessing.Pool with apply_async.
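A rough sketch of that redesign, under a few assumptions: the parent accepts and the workers only handle connections. It uses multiprocessing.SimpleQueue instead of multiprocessing.Queue, because SimpleQueue.put pickles the socket before it returns, so the parent can close its own copy right away; with Queue the pickling happens later in a feeder thread. The names worker and serve, BUFFER_SIZE, the None sentinel and the single demo client are all made up for illustration.

```python
import multiprocessing as mp
import socket

BUFFER_SIZE = 4096  # assumed; the question uses packet.MAX_PACKET_SIZE_BYTES


def handle_connection(client):
    """Read one message, send it back in uppercase, close the connection."""
    with client:
        data = client.recv(BUFFER_SIZE)
        client.sendall(data.upper())


def worker(connections):
    """Take (conn, address) tuples off the shared queue until told to stop."""
    while True:
        item = connections.get()
        if item is None:  # sentinel: no more work
            break
        client, _address = item
        handle_connection(client)


def serve(server_socket, connections, max_clients):
    """Accept in the parent and hand each connection to the workers."""
    for _ in range(max_clients):
        client, address = server_socket.accept()
        connections.put((client, address))  # pickled before put() returns
        client.close()  # the worker now holds its own duplicate


def main():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    server_socket.listen(5)

    connections = mp.SimpleQueue()
    workers = [mp.Process(target=worker, args=(connections,), daemon=True)
               for _ in range(2)]
    for process in workers:
        process.start()

    # The connect completes in the listen backlog, so one thread is enough here.
    with socket.create_connection(server_socket.getsockname(), timeout=10) as client:
        client.sendall(b"hello")
        serve(server_socket, connections, max_clients=1)
        print(client.recv(BUFFER_SIZE))

    for _ in workers:
        connections.put(None)
    for process in workers:
        process.join()
    server_socket.close()


if __name__ == "__main__":
    main()
```

A real server would loop in serve indefinitely and handle shutdown separately; the max_clients cap is only there so the demo terminates.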


    def listen(), the target for your child processes, does not take any arguments, but you are passing it the serialized socket via args=(picklez,). This raises an exception in the child process, which then exits immediately.

    TypeError: listen() takes 0 positional arguments but 1 was given
    

    Changing the signature to def listen(picklez) should solve the problem, since the target of your child processes then accepts the one argument you pass.
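This also explains why the debug files never appeared: the TypeError is raised before the first line of listen runs. As a hedged sketch of one way to catch such errors, the child's target can be wrapped so that any traceback is written to a file. run_logged, the temporary log path and the dummy bytes argument are hypothetical names for this example, not part of the question's code.

```python
import multiprocessing as mp
import os
import tempfile
import traceback


def listen():
    """Stand-in for the original worker, which took no parameters."""


def run_logged(target, args, log_path):
    """Call target(*args); on failure, append the traceback to log_path."""
    try:
        target(*args)
    except Exception:
        with open(log_path, "a", encoding="utf8") as log:
            log.write(traceback.format_exc())
        raise  # re-raise so the child still exits with a nonzero code


def main():
    log_path = os.path.join(tempfile.mkdtemp(), "child_error.txt")
    child = mp.Process(target=run_logged,
                       args=(listen, (b"pickled socket",), log_path))
    child.start()
    child.join()
    print("exit code:", child.exitcode)
    with open(log_path, encoding="utf8") as log:
        print(log.read())


if __name__ == "__main__":
    main()
```

Running the server from a console rather than an IDE should also show the child's traceback on stderr, which is usually the quicker check.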
