Windows上的Python多处理和联网

我试图实现一个tcp'回显服务器'。 简单的东西:

  • 客户端发送消息到服务器。
  • 服务器收到消息
  • 服务器将消息转换为大写
  • 服务器将修改的消息发送给客户
  • 客户打印回应。
  • 它运行良好,所以我决定并行化服务器; 使其能够在时间处理多个客户端。 由于大多数Python解释器都有GIL,所以多线程不会削减它。 我不得不使用多处理器......而男孩,这是事情进展缓慢的地方。

    我使用Windows 10 x64和WinPython与Python 3.5.2 x64配合使用。

    我的想法是创建一个套接字,初始化它(绑定和监听),创建子进程并将套接字传递给子节点。 但对于我的爱......我无法做到这一点,我的子流程几乎立即死亡。 最初,我有一些问题“酸洗”插座...所以我GOOGLE了一下,并认为这是问题。 所以我尝试通过一个多处理队列通过我的套接字,通过管道和我的最后一次尝试是'forkpickling'并在处理创建期间将它作为字节对象传递。 什么都没有

    有人可以在这里谈一谈吗? 告诉我出了什么事? 也许整个想法(共享套接字)是不好的......如果是这样,请告诉我如何实现我的最初目标:使我的服务器能够一次处理多个客户端(在Windows上) (不要告诉我有关线程,我们都知道python的线程不会削减它¬¬)

    还值得注意的是,没有文件是由调试功能创建的。 我相信,没有任何程序可以运行足够长的时间来运行。

    我的服务器代码的典型输出是(只有运行之间的区别是进程号):

    Server is running...
    Degree of parallelism: 4
    Socket created.
    Socket bount to: ('', 0)
    Process 3604 is alive: True
    Process 5188 is alive: True
    Process 6800 is alive: True
    Process 2844 is alive: True
    
    Press ctrl+c to kill all processes.
    
    Process 3604 is alive: False
    Process 3604 exit code: 1
    Process 5188 is alive: False
    Process 5188 exit code: 1
    Process 6800 is alive: False
    Process 6800 exit code: 1
    Process 2844 is alive: False
    Process 2844 exit code: 1
    The children died...
    Why god?
    WHYYyyyyy!!?!?!?
    

    服务器代码:

    # Imports
    import socket 
    import packet
    import sys
    import os
    from time import sleep
    import multiprocessing as mp
    import pickle
    import io
    
    # Constants
    DEGREE_OF_PARALLELISM = 4
    DEFAULT_HOST = ""
    DEFAULT_PORT = 0
    
    def _parse_cmd_line_args():
        arguments = sys.argv
        if len(arguments) == 1:
            return DEFAULT_HOST, DEFAULT_PORT
        else:
            raise NotImplemented()
    
    def debug(data):
        pid = os.getpid()
        with open('C:UsersTrauerDesktopdebug'+str(pid)+'.txt', mode='a',
                  encoding='utf8') as file:
            file.write(str(data) + 'n')
    
    def handle_connection(client):
        client_data = client.recv(packet.MAX_PACKET_SIZE_BYTES)
        debug('received data from client: ' + str(len(client_data)))
        response = client_data.upper()
        client.send(response)    
        debug('sent data from client: ' + str(response))
    
    def listen(picklez):    
        debug('started listen function')
    
        pid = os.getpid()
        server_socket = pickle.loads(picklez)
        debug('acquired socket')
    
        while True:
            debug('Sub process {0} is waiting for connection...'.format(str(pid)))
    
            client, address = server_socket.accept()
            debug('Sub process {0} accepted connection {1}'.format(str(pid),
                  str(client)))
    
            handle_connection(client)        
            client.close()
            debug('Sub process {0} finished handling connection {1}'.
                  format(str(pid),str(client)))
    
    if __name__ == "__main__":    
    #   Since most python interpreters have a GIL, multithreading won't cut
    #   it... Oughta bust out some process, yo!
        host_port = _parse_cmd_line_args()
        print('Server is running...')
        print('Degree of parallelism: ' + str(DEGREE_OF_PARALLELISM))
    
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print('Socket created.')
    
        server_socket.bind(host_port)
        server_socket.listen(DEGREE_OF_PARALLELISM)
        print('Socket bount to: ' + str(host_port))        
    
        buffer = io.BytesIO()
        mp.reduction.ForkingPickler(buffer).dump(server_socket)
        picklez = buffer.getvalue()
    
        children = []
        for i in range(DEGREE_OF_PARALLELISM):        
            child_process = mp.Process(target=listen, args=(picklez,))
            child_process.daemon = True
            child_process.start()
            children.append(child_process)
    
            while not child_process.pid:
                sleep(.25)
    
            print('Process {0} is alive: {1}'.format(str(child_process.pid), 
                  str(child_process.is_alive())))     
        print()    
    
        kids_are_alive = True
        while kids_are_alive:
            print('Press ctrl+c to kill all processes.n')
            sleep(1) 
    
            exit_codes = []
            for child_process in children:
                print('Process {0} is alive: {1}'.format(str(child_process.pid), 
                  str(child_process.is_alive())))
                print('Process {0} exit code: {1}'.format(str(child_process.pid), 
                  str(child_process.exitcode)))
                exit_codes.append(child_process.exitcode)
    
            if all(exit_codes):
                # Why do they die so young? :(
                print('The children died...')
                print('Why god?')
                print('WHYYyyyyy!!?!?!?')
                kids_are_alive = False
    

    编辑:修正了“听”的签名。 我的程序仍然会立即死亡。

    edit2:用户cmidi指出,这段代码在Linux上可以工作; 所以我的问题是:我如何'在Windows上完成这项工作'?


    您可以直接将套接字传递给子进程。 多处理注册一个减少,为此,Windows实现使用multiprocessing.resource_sharer的以下DupSocket类:

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''
        def __init__(self, sock):
            new_sock = sock.dup()
            def send(conn, pid):
                share = new_sock.share(pid)
                conn.send_bytes(share)
            self._id = _resource_sharer.register(send, new_sock.close)
    
        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)
    

    这将调用Windows套接字share方法,该方法从调用WSADuplicateSocket返回协议信息缓冲区。 它向资源共享者注册,通过连接将此缓冲区发送给子进程。 孩子反过来调用detach ,它接收协议信息缓冲区并通过socket.fromshare重建套接字。

    它与你的问题没有直接关系,但我建议你重新设计服务器,而不是在主进程中调用accept ,这就是通常的做法(例如在Python的socketserver.ForkingTCPServer模块中)。 通过一个multiprocessing.Queue将生成的(conn, address)元组传递给第一个可用的worker,它由进程池中的所有worker共享。 或者考虑使用multiprocessing.Poolapply_async


    def listen()您的子进程的目标/开始不会接受任何参数,但是您将序列化套接字作为参数args=(picklez,)给子进程,这会在子进程中导致异常并立即退出。

    TypeError: listen() takes no arguments (1 given)
    

    def listen(picklez)应该解决问题,这将为您的子进程的目标提供一个参数。

    链接地址: http://www.djcxy.com/p/37617.html

    上一篇: Python multiprocessing and networking on Windows

    下一篇: Is there a way to force "if" to only accept boolean in TypeScript?