分别在并行进程中更改不同的python对象
简而言之
我想同时更改复杂的python对象,每个对象只能由一个进程处理。 我怎样才能做到这一点(效率最高)? 实施某种酸洗支持会有帮助吗? 这会有效吗?
充分的问题
我有一个Python数据结构ArrayDict
,基本上由一个numpy
数组和一个字典组成,并将任意索引映射到数组中的行。 在我的情况下,所有的键都是整数。
a = ArrayDict()
a[1234] = 12.5
a[10] = 3
print(a[1234]) #12.5
print(a[10]) # 3.0
print(a[1234] == a.array[a.indexDict[1234]]) #true
现在我有多个这样的ArrayDict
并希望将它们填充到myMethod(arrayDict, params)
。 由于myMethod
价格昂贵,我想并行运行它。 请注意, myMethod
可能会将许多行添加到arrayDict
。 每个进程都会改变它自己的ArrayDict
。 我不需要并发访问ArrayDict
。
在myMethod
,我更改了arrayDict
中的arrayDict
(即更改了内部numpy
数组),我向arrayDict
添加了条目( arrayDict
字典中添加了另一个索引并在内部数组中写入一个新值)。 最后,我希望能够在arrayDict
的内部numpy
数组变得太小时交换数组。 这不会经常发生,如果没有更好的解决方案,我可以在我的程序的非平行部分执行此操作。 即使没有阵列交换,我自己的尝试也不会成功。
我花了几天时间研究共享内存和python的多处理模块。 由于我最终会在linux上工作,所以这个任务似乎很简单:系统调用fork()
允许有效地处理参数的副本。 然后,我的想法是在其自己的进程中更改每个ArrayDict
,返回对象的更改版本,并覆盖原始对象。 为了节省内存并保存复制工作,我还使用了sharedmem数组来存储ArrayDict
的数据。 我知道字典必须被复制。
from sharedmem import sharedmem
import numpy as np
n = ... # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False
while not done:
consideredData = ... # numpy boolean array of length
# n with True at the index of
# considered data
args = ... # numpy array containing arguments
# for myMethod
with sharedmem.MapReduce() as pool:
results = pool.map(myMethod,
list(zip(myData[considered],
args[considered])),
star=True)
myData[considered] = results
done = ... # depends on what happens in
# myMethod
我得到的是分段错误错误。 我能够通过创建ArrayDict对myMethod
的ArrayDict
并将它们保存到myData
来绕过这个错误。 我不明白为什么这是必要的,并且经常复制我的(可能非常大的)数组(while循环花费很长时间)对我来说似乎并不高效。 但是,至少它在一定程度上起作用。 不过,由于共享内存的原因,我的程序在第3次迭代时出现了一些错误行为。 因此,我认为我的方式并不是最佳。
我在这里和这里读到,可以使用multiprocessing.Array
将共享内存中的多个numpy数组保存在共享内存中。 但是,我仍然需要共享整个ArrayDict
,其中特别包含一个字典,这反过来又是不可取的。
我怎样才能以有效的方式实现我的目标? 以某种方式让我的对象被选中是否可能(并且有效)?
所有的解决方案都必须在python 3上运行,并且必须在64bit Linux上运行numpy / scipy。
编辑
我在这里发现,使用Multiprocessing“Manager”类和用户定义的代理类可以共享任意对象。 这会有效吗? 我想利用我不需要并发访问对象,即使它们不在主进程中处理。 为我想要处理的每个对象创建一个管理器会是一个选项吗? (我对管理者的工作方式可能仍存在一些误解。)
这看起来像一个相当复杂的类,我不能完全预测此解决方案是否适用于您的情况。 对这样一个复杂类的简单折衷就是使用ProcessPoolExecutor
。
如果这不能回答你的问题,那么以最小的工作为例,这将是一件好事。
from concurrent.futures import ProcessPoolExecutor
import numpy as np
class ArrayDict ():
keys = None
vals = None
def __init__ (self):
self.keys = dict ()
self.vals = np.random.rand (1000)
def __str__ (self):
return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())
def myMethod (ad, args):
print ("starting:", ad)
if __name__ == '__main__':
l = [ArrayDict() for _ in range (5)]
args = [2, 3, 4, 1, 3]
with ProcessPoolExecutor (max_workers = 2) as ex:
d = ex.map (myMethod, l, args)
对象在发送到子进程时被克隆,您需要返回结果(因为对对象的更改不会传播回主进程)并处理您想要如何存储它们。
请注意,对类变量的更改会传播到同一进程中的其他对象,例如,如果您有比进程多的任务,则将在同一进程中运行的实例之间共享对类变量的更改。 这通常是不受欢迎的行为。
这是并行化的高级接口。 ProcessPoolExecutor
使用multiprocessing
模块,只能与可选对象一起使用。 我怀疑ProcessPoolExecutor
性能类似于“在进程之间共享状态”。 在底层, ProcessPoolExecutor
正在使用multiprocessing.Process
,并且应该表现出与Pool
相似的性能(除了在地图上使用非常长的迭代器时)。 ProcessPoolExecutor
似乎是Python中并发任务的未来API。
如果可以的话,使用ThreadPoolExecutor
(可以仅为ProcessPoolExecutor
进行交换)通常会更快。 在这种情况下,对象在进程之间共享,并且更新到一个将传播回主线程。
如上所述,最快的选择可能是重新构造ArrayDict
以便它只使用可由multiprocessing.Value
或Array
表示的对象。
如果ProcessPoolExecutor
不起作用,并且无法优化ArrayDict
,则可能会ArrayDict
使用Manager
。 这里有很好的例子说明如何做到这一点。
myMethod
可能会找到最大的性能提升。 而且,正如我所提到的,使用线程的开销要小于进程的开销。
上一篇: Altering different python objects in parallel processes, respectively