为什么我可以将实例方法传递给multiprocessing.Process,而不是multiprocessing.Pool?

我正在尝试编写一个应用程序,该应用程序与multiprocessing.Pool同时应用一个函数。 我想这个函数是一个实例方法(所以我可以在不同的子类中定义它)。 这似乎不可能; 正如我在其他地方所了解到的,显然绑定的方法不能被腌制。 那么为什么要开始一个multiprocessing.Process 。一个绑定方法作为一个目标工作的过程? 以下代码:

import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

产生这个输出:

Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:Python27Libthreading.py", line 551, in __bootstrap_inner
    self.run()
  File "C:Python27Libthreading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:Python27Libmultiprocessingpool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

为什么进程可以处理绑定方法,但不处理池?


pickle模块通常不能腌制实例方法:

>>> import pickle
>>> class A(object):
...  def z(self): print "hi"
... 
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

但是, multiprocessing模块具有自定义Pickler ,它添加了一些代码来启用此功能:

#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)

您可以使用copy_reg模块复制它以查看它是否适用于您自己:

>>> import copy_reg
>>> def _reduce_method(m):
...     if m.im_self is None:
...         return getattr, (m.im_class, m.im_func.func_name)
...     else:
...         return getattr, (m.im_self, m.im_func.func_name)
... 
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__ngetattrnp0n(ccopy_regn_reconstructornp1n(c__main__nAnp2nc__builtin__nobjectnp3nNtp4nRp5nS'z'np6ntp7nRp8n."

当你使用Process.start在Windows上产生一个新的进程时,它使用这个自定义的ForkingPickler来传递你传递给子进程的所有参数:

#
# Windows
#

else:
    # snip...
    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            ...
            # start process
            ...

            # set attributes of self
            ...

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

请注意“将信息发送给孩子”部分。 它使用dump函数,它使用ForkingPickler来腌制数据,这意味着您的实例方法可以被腌制。

现在,当您在multiprocessing.Pool上使用方法将方法发送给子进程时,它使用multiprocessing.Pipe来挑选数据。 在Python 2.7中, multiprocessing.Pipe在C中实现,并直接调用pickle_dumps ,所以它没有利用ForkingPickler 。 这意味着酸洗实例方法不起作用。

但是,如果使用copy_reg注册instancemethod类型,而不是自定义Pickler ,则所有酸洗尝试都将受到影响。 所以你可以使用它来启用酸洗实例方法,即使通过Pool

import multiprocessing
import copy_reg
import types

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)

def test1():
    print("Hello, world 1")

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print(answer)
        print
        for answer in pool.imap(self.square, range(10)):
            print(answer)

    def test2(self):
        print("Hello, world 2")

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

输出:

Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
 1GOT (0, 5, (True, 6))

GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
 GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10

GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
 GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
 GOT (1, 6, (True, 36))
36
 GOT (1, 7, (True, 49))
49
 GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None

另外请注意,在Python 3.x中, pickle可以在本地使用实例方法类型,所以这些东西都不再重要。 :)


以下是我有时使用的一种替代方法,它适用于Python2.x:

您可以为实例方法创建一个排序的顶级“别名”,它接受一个对象,该对象的实例方法要在池中运行,并让它为您调用实例方法:

import functools
import multiprocessing

def _instance_method_alias(obj, arg):
    """
    Alias for instance method that allows the method to be called in a 
    multiprocessing pool
    """
    obj.instance_method(arg)
    return

class MyClass(object):
    """
    Our custom class whose instance methods we want to be able to use in a 
    multiprocessing pool
    """

    def __init__(self):
        self.my_string = "From MyClass: {}"

    def instance_method(self, arg):
        """
        Some arbitrary instance method
        """

        print(self.my_string.format(arg))
        return

# create an object of MyClass
obj = MyClass()

# use functools.partial to create a new method that always has the 
# MyClass object passed as its first argument
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj)

# create our list of things we will use the pool to map
l = [1,2,3]

# create the pool of workers
pool = multiprocessing.Pool()

# call pool.map, passing it the newly created function
pool.map(_bound_instance_method_alias, l)

# cleanup
pool.close()
pool.join()

这段代码产生这个输出:

来自MyClass:1
来自MyClass:2
来自MyClass:3

一个限制是你不能用它来修改对象的方法。 每个进程都会获取其调用方法的对象的副本,因此更改不会传播回主进程。 如果你不需要从你调用的方法中修改对象,这可以是一个简单的解决方案。


这是Python 2中更简单的工作方式,只是包装原始实例方法。 在MacOSX和Linux上运行良好,不适用于Windows,测试过Python 2.7

from multiprocessing import Pool

class Person(object):
    def __init__(self):
        self.name = 'Weizhong Tu'

    def calc(self, x):
        print self.name
        return x ** 5


def func(x, p=Person()):
    return p.calc(x)


pool = Pool()
print pool.map(func, range(10))
链接地址: http://www.djcxy.com/p/55295.html

上一篇: Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?

下一篇: python 3.x