passing kwargs with multiprocessing.pool.map
I would like to pass keyword arguments to my worker-function with Pool.map(). I can't find a clear example of this when searching forums.
Example Code:
import multiprocessing as mp
def worker((x,y), **kwargs):
kwarg_test = kwargs.get('kwarg_test', False)
print("kwarg_test = {}".format(kwarg_test))
if kwarg_test:
print("Success")
return x*y
def wrapper_process(**kwargs):
jobs = []
pool=mp.Pool(4)
for i, n in enumerate(range(4)):
jobs.append((n,i))
pool.map(worker, jobs) #works
pool.map(worker, jobs, kwargs) #how to do this?
def main(**kwargs):
worker((1,2),kwarg_test=True) #accepts kwargs
wrapper_process(kwarg_test=True)
if __name__ == "__main__":
main()
Output:
kwarg_test = True
Success
kwarg_test = False
kwarg_test = False
kwarg_test = False
kwarg_test = False
TypeError: unsupported operand type(s) for //: 'int' and 'dict'
The type error has to do with parsing arguments inside of multiprocessing.Pool or Queue, and I have tried several other syntaxes, like making a list of the kwargs; [kwargs, kwargs, kwargs, kwargs], as well as several attempts to include the kwarg in the jobs list but no luck. I traced the code in multiprocessing.pool from map to map_async and got as far as task_batches = Pool._get_tasks(func, iterable, chunksize)
in pool.py when I encountered the generator structure. I'm happy to learn more about this in future but for now I am just trying to find out:
Is there a simple syntax for allowing the passing of kwargs with pool.map?
If you want to iterate over the other arguments, use @ArcturusB's answer.
If you just want to pass them, having the same value for each iteration, then you can do this:
from functools import partial
pool.map(partial(worker, **kwargs), jobs)
Partial 'binds' arguments to a function. Old versions of Python cannot serialize partial objects though.
The multiprocessing.pool.Pool.map
doc states:
A parallel equivalent of the map() built-in function ( it supports only one iterable argument though ). It blocks until the result is ready.
We can only pass one iterable argument. End of the story. But we can luckilly think of a workaround: define worker_wrapper
function that takes a single argument, unpacks it to args and kwargs, and passes them to worker
:
def worker_wrapper(arg):
args, kwargs = arg
return worker(*args, **kwargs)
In your wrapper_process
, you need to construct this single argument from jobs
(or even directly when constructing jobs) and call worker_wrapper
:
arg = [(j, kwargs) for j in jobs]
pool.map(worker_wrapper, arg)
Here is a working implementation, kept as close as possible to your original code:
import multiprocessing as mp
def worker_wrapper(arg):
args, kwargs = arg
return worker(*args, **kwargs)
def worker(x, y, **kwargs):
kwarg_test = kwargs.get('kwarg_test', False)
# print("kwarg_test = {}".format(kwarg_test))
if kwarg_test:
print("Success")
else:
print("Fail")
return x*y
def wrapper_process(**kwargs):
jobs = []
pool=mp.Pool(4)
for i, n in enumerate(range(4)):
jobs.append((n,i))
arg = [(j, kwargs) for j in jobs]
pool.map(worker_wrapper, arg)
def main(**kwargs):
print("=> calling `worker`")
worker(1, 2,kwarg_test=True) #accepts kwargs
print("=> no kwargs")
wrapper_process() # no kwargs
print("=> with `kwar_test=True`")
wrapper_process(kwarg_test=True)
if __name__ == "__main__":
main()
Which passes the test:
=> calling `worker`
Success
=> no kwargs
Fail
Fail
Fail
Fail
=> with `kwar_test=True`
Success
Success
Success
Success
链接地址: http://www.djcxy.com/p/18182.html
上一篇: 在活动的Docker容器上公开端口