Windows上的Python多处理和联网
我试图实现一个tcp'回显服务器'。 简单的东西:
它运行良好,所以我决定并行化服务器; 使其能够在时间处理多个客户端。 由于大多数Python解释器都有GIL,所以多线程不会削减它。 我不得不使用多处理器......而男孩,这是事情进展缓慢的地方。
我使用Windows 10 x64和WinPython与Python 3.5.2 x64配合使用。
我的想法是创建一个套接字,初始化它(绑定和监听),创建子进程并将套接字传递给子节点。 但对于我的爱......我无法做到这一点,我的子流程几乎立即死亡。 最初,我有一些问题“酸洗”插座...所以我GOOGLE了一下,并认为这是问题。 所以我尝试通过一个多处理队列通过我的套接字,通过管道和我的最后一次尝试是'forkpickling'并在处理创建期间将它作为字节对象传递。 什么都没有
有人可以在这里谈一谈吗? 告诉我出了什么事? 也许整个想法(共享套接字)是不好的......如果是这样,请告诉我如何实现我的最初目标:使我的服务器能够一次处理多个客户端(在Windows上) (不要告诉我有关线程,我们都知道python的线程不会削减它¬¬)
还值得注意的是,没有文件是由调试功能创建的。 我相信,没有任何程序可以运行足够长的时间来运行。
我的服务器代码的典型输出是(只有运行之间的区别是进程号):
Server is running...
Degree of parallelism: 4
Socket created.
Socket bount to: ('', 0)
Process 3604 is alive: True
Process 5188 is alive: True
Process 6800 is alive: True
Process 2844 is alive: True
Press ctrl+c to kill all processes.
Process 3604 is alive: False
Process 3604 exit code: 1
Process 5188 is alive: False
Process 5188 exit code: 1
Process 6800 is alive: False
Process 6800 exit code: 1
Process 2844 is alive: False
Process 2844 exit code: 1
The children died...
Why god?
WHYYyyyyy!!?!?!?
服务器代码:
# Imports
import socket
import packet
import sys
import os
from time import sleep
import multiprocessing as mp
import pickle
import io
# Constants
DEGREE_OF_PARALLELISM = 4
DEFAULT_HOST = ""
DEFAULT_PORT = 0
def _parse_cmd_line_args():
arguments = sys.argv
if len(arguments) == 1:
return DEFAULT_HOST, DEFAULT_PORT
else:
raise NotImplemented()
def debug(data):
pid = os.getpid()
with open('C:UsersTrauerDesktopdebug'+str(pid)+'.txt', mode='a',
encoding='utf8') as file:
file.write(str(data) + 'n')
def handle_connection(client):
client_data = client.recv(packet.MAX_PACKET_SIZE_BYTES)
debug('received data from client: ' + str(len(client_data)))
response = client_data.upper()
client.send(response)
debug('sent data from client: ' + str(response))
def listen(picklez):
debug('started listen function')
pid = os.getpid()
server_socket = pickle.loads(picklez)
debug('acquired socket')
while True:
debug('Sub process {0} is waiting for connection...'.format(str(pid)))
client, address = server_socket.accept()
debug('Sub process {0} accepted connection {1}'.format(str(pid),
str(client)))
handle_connection(client)
client.close()
debug('Sub process {0} finished handling connection {1}'.
format(str(pid),str(client)))
if __name__ == "__main__":
# Since most python interpreters have a GIL, multithreading won't cut
# it... Oughta bust out some process, yo!
host_port = _parse_cmd_line_args()
print('Server is running...')
print('Degree of parallelism: ' + str(DEGREE_OF_PARALLELISM))
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('Socket created.')
server_socket.bind(host_port)
server_socket.listen(DEGREE_OF_PARALLELISM)
print('Socket bount to: ' + str(host_port))
buffer = io.BytesIO()
mp.reduction.ForkingPickler(buffer).dump(server_socket)
picklez = buffer.getvalue()
children = []
for i in range(DEGREE_OF_PARALLELISM):
child_process = mp.Process(target=listen, args=(picklez,))
child_process.daemon = True
child_process.start()
children.append(child_process)
while not child_process.pid:
sleep(.25)
print('Process {0} is alive: {1}'.format(str(child_process.pid),
str(child_process.is_alive())))
print()
kids_are_alive = True
while kids_are_alive:
print('Press ctrl+c to kill all processes.n')
sleep(1)
exit_codes = []
for child_process in children:
print('Process {0} is alive: {1}'.format(str(child_process.pid),
str(child_process.is_alive())))
print('Process {0} exit code: {1}'.format(str(child_process.pid),
str(child_process.exitcode)))
exit_codes.append(child_process.exitcode)
if all(exit_codes):
# Why do they die so young? :(
print('The children died...')
print('Why god?')
print('WHYYyyyyy!!?!?!?')
kids_are_alive = False
编辑:修正了“听”的签名。 我的程序仍然会立即死亡。
edit2:用户cmidi指出,这段代码在Linux上可以工作; 所以我的问题是:我如何'在Windows上完成这项工作'?
您可以直接将套接字传递给子进程。 多处理注册一个减少,为此,Windows实现使用multiprocessing.resource_sharer
的以下DupSocket
类:
class DupSocket(object):
'''Picklable wrapper for a socket.'''
def __init__(self, sock):
new_sock = sock.dup()
def send(conn, pid):
share = new_sock.share(pid)
conn.send_bytes(share)
self._id = _resource_sharer.register(send, new_sock.close)
def detach(self):
'''Get the socket. This should only be called once.'''
with _resource_sharer.get_connection(self._id) as conn:
share = conn.recv_bytes()
return socket.fromshare(share)
这将调用Windows套接字share
方法,该方法从调用WSADuplicateSocket
返回协议信息缓冲区。 它向资源共享者注册,通过连接将此缓冲区发送给子进程。 孩子反过来调用detach
,它接收协议信息缓冲区并通过socket.fromshare
重建套接字。
它与你的问题没有直接关系,但我建议你重新设计服务器,而不是在主进程中调用accept
,这就是通常的做法(例如在Python的socketserver.ForkingTCPServer
模块中)。 通过一个multiprocessing.Queue
将生成的(conn, address)
元组传递给第一个可用的worker,它由进程池中的所有worker共享。 或者考虑使用multiprocessing.Pool
与apply_async
。
def listen()
您的子进程的目标/开始不会接受任何参数,但是您将序列化套接字作为参数args=(picklez,)
给子进程,这会在子进程中导致异常并立即退出。
TypeError: listen() takes no arguments (1 given)
def listen(picklez)
应该解决问题,这将为您的子进程的目标提供一个参数。
上一篇: Python multiprocessing and networking on Windows
下一篇: Is there a way to force "if" to only accept boolean in TypeScript?