分别在并行进程中更改不同的python对象

简而言之

我想同时更改复杂的python对象,每个对象只能由一个进程处理。 我怎样才能做到这一点(效率最高)? 实施某种酸洗支持会有帮助吗? 这会有效吗?

充分的问题

我有一个Python数据结构ArrayDict ,基本上由一个numpy数组和一个字典组成,并将任意索引映射到数组中的行。 在我的情况下,所有的键都是整数。

a = ArrayDict()

a[1234] = 12.5
a[10] = 3

print(a[1234])                               #12.5
print(a[10])                                 # 3.0

print(a[1234] == a.array[a.indexDict[1234]]) #true

现在我有多个这样的ArrayDict并希望将它们填充到myMethod(arrayDict, params) 。 由于myMethod价格昂贵,我想并行运行它。 请注意, myMethod可能会将许多行添加到arrayDict 。 每个进程都会改变它自己的ArrayDict 。 我不需要并发访问ArrayDict

myMethod ,我更改了arrayDict中的arrayDict (即更改了内部numpy数组),我向arrayDict添加了条目( arrayDict字典中添加了另一个索引并在内部数组中写入一个新值)。 最后,我希望能够在arrayDict的内部numpy数组变得太小时交换数组。 这不会经常发生,如果没有更好的解决方案,我可以在我的程序的非平行部分执行此操作。 即使没有阵列交换,我自己的尝试也不会成功。

我花了几天时间研究共享内存和python的多处理模块。 由于我最终会在linux上工作,所以这个任务似乎很简单:系统调用fork()允许有效地处理参数的副本。 然后,我的想法是在其自己的进程中更改每个ArrayDict ,返回对象的更改版本,并覆盖原始对象。 为了节省内存并保存复制工作,我还使用了sharedmem数组来存储ArrayDict的数据。 我知道字典必须被复制。

from sharedmem import sharedmem
import numpy as np

n = ...                   # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False

while not done:
    consideredData = ...  # numpy boolean array of length
                          # n with True at the index of
                          # considered data
    args = ...            # numpy array containing arguments
                          # for myMethod

    with sharedmem.MapReduce() as pool:
        results = pool.map(myMethod, 
                           list(zip(myData[considered], 
                                    args[considered])),
                           star=True)
        myData[considered] = results

    done = ...             # depends on what happens in
                           # myMethod

我得到的是分段错误错误。 我能够通过创建ArrayDict对myMethodArrayDict并将它们保存到myData来绕过这个错误。 我不明白为什么这是必要的,并且经常复制我的(可能非常大的)数组(while循环花费很长时间)对我来说似乎并不高效。 但是,至少它在一定程度上起作用。 不过,由于共享内存的原因,我的程序在第3次迭代时出现了一些错误行为。 因此,我认为我的方式并不是最佳。

我在这里和这里读到,可以使用multiprocessing.Array将共享内存中的多个numpy数组保存在共享内存中。 但是,我仍然需要共享整个ArrayDict ,其中特别包含一个字典,这反过来又是不可取的。

我怎样才能以有效的方式实现我的目标? 以某种方式让我的对象被选中是否可能(并且有效)?

所有的解决方案都必须在python 3上运行,并且必须在64bit Linux上运行numpy / scipy。

编辑

我在这里发现,使用Multiprocessing“Manager”类和用户定义的代理类可以共享任意对象。 这会有效吗? 我想利用我不需要并发访问对象,即使它们不在主进程中处理。 为我想要处理的每个对象创建一个管理器会是一个选项吗? (我对管理者的工作方式可能仍存在一些误解。)


这看起来像一个相当复杂的类,我不能完全预测此解决方案是否适用于您的情况。 对这样一个复杂类的简单折衷就是使用ProcessPoolExecutor

如果这不能回答你的问题,那么以最小的工作为例,这将是一件好事。

from concurrent.futures import ProcessPoolExecutor

import numpy as np

class ArrayDict ():
  keys = None
  vals = None

  def __init__ (self):
    self.keys = dict ()
    self.vals = np.random.rand (1000)

  def __str__ (self):
    return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())

def myMethod (ad, args):
  print ("starting:", ad)


if __name__ == '__main__':
  l     = [ArrayDict() for _ in range (5)]
  args  = [2, 3, 4, 1, 3]

  with ProcessPoolExecutor (max_workers = 2) as ex:

    d = ex.map (myMethod, l, args)

对象在发送到子进程时被克隆,您需要返回结果(因为对对象的更改不会传播回主进程)并处理您想要如何存储它们。

请注意,对类变量的更改会传播到同一进程中的其他对象,例如,如果您有比进程多的任务,则将在同一进程中运行的实例之间共享对类变量的更改。 这通常是不受欢迎的行为。

这是并行化的高级接口。 ProcessPoolExecutor使用multiprocessing模块,只能与可选对象一起使用。 我怀疑ProcessPoolExecutor性能类似于“在进程之间共享状态”。 在底层, ProcessPoolExecutor正在使用multiprocessing.Process ,并且应该表现出与Pool相似的性能(除了在地图上使用非常长的迭代器时)。 ProcessPoolExecutor似乎是Python中并发任务的未来API。

如果可以的话,使用ThreadPoolExecutor (可以仅为ProcessPoolExecutor进行交换)通常会更快。 在这种情况下,对象在进程之间共享,并且更新到一个将传播回主线程​​。

如上所述,最快的选择可能是重新构造ArrayDict以便它只使用可由multiprocessing.ValueArray表示的对象。

如果ProcessPoolExecutor不起作用,并且无法优化ArrayDict ,则可能会ArrayDict使用Manager 。 这里有很好的例子说明如何做到这一点。

myMethod可能会找到最大的性能提升。 而且,正如我所提到的,使用线程的开销要小于进程的开销。

链接地址: http://www.djcxy.com/p/37609.html

上一篇: Altering different python objects in parallel processes, respectively

下一篇: embed purchase funnel from a website in Ionic app