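Python: using multiprocessing on a pandas dataframe
I want to use multiprocessing on a large dataset to find the distance between two GPS points. I built a test set, but I have been unable to get multiprocessing to work on it.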
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                   'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                   'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                   'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})

def calc_dist(x):
    return pd.DataFrame(
        [ [grp,
           df.loc[c[0]].ser_no,
           df.loc[c[1]].ser_no,
           vincenty(df.loc[c[0], x],
                    df.loc[c[1], x])
          ]
          for grp,lst in df.groupby('co_nm').groups.items()
          for c in combinations(lst, 2)
        ],
        columns=['co_nm','machineA','machineB','distance'])

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()
I get the error below. I am using Python 2.7.11 and IPython 4.1.2 with Anaconda 2.5.0 64-bit on Windows 7 Professional.
runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
Traceback (most recent call last):
  File "", line 1, in <module>
    runfile('C:/.../Desktop/multiprocessing test.py', wdir='C:/.../Desktop')
  File "C:\...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 699, in runfile
    execfile(filename, namespace)
  File "C:\...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 74, in execfile
    exec(compile(scripttext, filename, 'exec'), glob, loc)
  File "C:/..../multiprocessing test.py", line 33, in <module>
    pool.map(calc_dist, ['lat','lon'])
  File "C:\...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "C:\...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 567, in get
    raise self._value
TypeError: Failed to create Point instance from 1.
def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value
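What's wrong
This line in your code:
pool.map(calc_dist, ['lat','lon'])
creates two tasks in the worker pool: one runs calc_dist('lat') and the other runs calc_dist('lon'). Compare the first example in the documentation. (Basically, pool.map(f, [1,2,3]) calls f three times, once with each argument from the list: f(1), f(2), and f(3).) If I'm not mistaken, your function calc_dist can only meaningfully be called as calc_dist(['lat', 'lon']), because vincenty needs both coordinates of each point; given a single column name it receives a bare number, hence the Point error. Called that way, though, it leaves nothing to process in parallel.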
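To make the pool.map behaviour described above concrete, here is a minimal sketch. The names square and run_demo are made up for illustration, and the with form assumes Python 3.

```python
import multiprocessing as mp


def square(x):
    # Defined at module top level so worker processes can import and pickle it.
    return x * x


def run_demo():
    # pool.map takes each element of the list as one task: the pool evaluates
    # square(1), square(2) and square(3), then returns results in input order.
    with mp.Pool(processes=2) as pool:
        return pool.map(square, [1, 2, 3])


if __name__ == '__main__':
    # The guard matters on Windows, where child processes re-import this module.
    print(run_demo())  # [1, 4, 9]
```

The result is the same list that the built-in map(square, [1, 2, 3]) would produce; only the place where each call runs differs.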
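Solution
I believe you want to split the work between processes, probably by sending each tuple (grp, lst) to a separate process. The following code does exactly that.
First, let's prepare the split: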
grp_lst_args = list(df.groupby('co_nm').groups.items())
print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
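We will send each of these tuples (there are three here) as an argument to a function running in a separate process. We need to rewrite the function; let's call it calc_dist2. For convenience, its argument is a single tuple, as in calc_dist2(('aa',[0,1,2])).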
def calc_dist2(arg):
    grp, lst = arg
    return pd.DataFrame(
        [ [grp,
           df.loc[c[0]].ser_no,
           df.loc[c[1]].ser_no,
           vincenty(df.loc[c[0], ['lat','lon']],
                    df.loc[c[1], ['lat','lon']])
          ]
          for c in combinations(lst, 2)
        ],
        columns=['co_nm','machineA','machineB','distance'])
Now the multiprocessing:
pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)
results is a list of the results (here, dataframes) of the calls calc_dist2((grp,lst)) for each (grp,lst) in grp_lst_args. The elements of results are then concatenated into one dataframe.
print(results_df)
  co_nm  machineA  machineB          distance
0    aa         1         2  156.876149391 km
1    aa         1         3  313.705445447 km
2    aa         2         3  156.829329105 km
0    cc         8         9  156.060165391 km
1    cc         8         0  311.910998169 km
2    cc         9         0  155.851498134 km
0    bb         4         5  156.665641837 km
1    bb         4         6  313.214333025 km
2    bb         4         7  469.622535339 km
3    bb         5         6  156.548897414 km
4    bb         5         7  312.957597466 km
5    bb         6         7   156.40899677 km
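The index in the printed result restarts at 0 for every group because each worker returns a dataframe with its own index. If one running index is preferred, pd.concat accepts ignore_index=True. A small sketch with two made-up partial results (part_a and part_b are hypothetical stand-ins for the frames returned by calc_dist2):

```python
import pandas as pd

# Two hypothetical per-group results, shaped like the frames from calc_dist2
part_a = pd.DataFrame({'co_nm': ['aa', 'aa'], 'machineA': [1, 1], 'machineB': [2, 3]})
part_b = pd.DataFrame({'co_nm': ['cc'], 'machineA': [8], 'machineB': [9]})

# Default behaviour: each frame keeps its own index, so labels repeat (0, 1, 0)
kept = pd.concat([part_a, part_b])

# ignore_index=True discards the old labels and numbers the rows 0, 1, 2
renumbered = pd.concat([part_a, part_b], ignore_index=True)

print(renumbered)
```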
By the way, in Python 3 we could use a with construction:
with mp.Pool() as pool:
    results = pool.map(calc_dist2, grp_lst_args)
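Update
I have tested this code only on Linux. On Linux, the read-only dataframe df can be accessed by the child processes and is not copied into their memory space, but I'm not sure how exactly this works on Windows. You may consider splitting df into chunks (grouped by co_nm) and sending these chunks as arguments to some other version of calc_dist.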
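The chunking idea from the update could look roughly like this. It is a sketch under several assumptions: Python 3; calc_dist_chunk, pairwise_distances and haversine_km are hypothetical names; and a haversine great-circle formula on a sphere of radius 6371 km stands in for geopy's vincenty so that the example has no geopy dependency. The distances will therefore differ slightly from the Vincenty figures shown earlier.

```python
import math
import multiprocessing as mp
from itertools import combinations

import pandas as pd


def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance on a sphere (assumption: radius 6371 km); not Vincenty.
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * 6371.0 * math.asin(math.sqrt(a))


def calc_dist_chunk(arg):
    # Worker: gets (group name, sub-dataframe) as its argument,
    # so it does not rely on a global df being visible in the child process.
    grp, chunk = arg
    rows = [
        [grp, a.ser_no, b.ser_no, haversine_km(a.lat, a.lon, b.lat, b.lon)]
        for a, b in combinations(chunk.itertuples(index=False), 2)
    ]
    return pd.DataFrame(rows, columns=['co_nm', 'machineA', 'machineB', 'distance_km'])


def pairwise_distances(df, processes=2):
    # groupby yields (name, sub-dataframe) pairs; each pair becomes one task.
    chunks = list(df.groupby('co_nm'))
    with mp.Pool(processes=processes) as pool:
        results = pool.map(calc_dist_chunk, chunks)
    return pd.concat(results, ignore_index=True)


if __name__ == '__main__':
    df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                       'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                       'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                       'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
    print(pairwise_distances(df))
```

Passing the sub-dataframes explicitly costs some pickling overhead per task, but it behaves the same whether child processes are forked (Linux) or spawned (Windows).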
Strange. It seems to work under python2 but not python3.
This is a minimally modified version that prints the output:
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                   'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                   'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                   'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})

def calc_dist(x):
    ret = pd.DataFrame(
        [ [grp,
           df.loc[c[0]].ser_no,
           df.loc[c[1]].ser_no,
           vincenty(df.loc[c[0], x],
                    df.loc[c[1], x])
          ]
          for grp,lst in df.groupby('co_nm').groups.items()
          for c in combinations(lst, 2)
        ],
        columns=['co_nm','machineA','machineB','distance'])
    print(ret)
    return ret

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()
This is the output from python2 (lines from the two worker processes are interleaved):
0 aa 1 2 110.723608682 km
1 aa 1 3 221.460709525 km
2 aa 2 3 110.737100843 km
3 cc 8 9 110.827576495 km
4 cc 8 0 221.671650552 km
co_nm machineA machineB distance
5 cc 9 0 110.844074057 km
0 aa 1 2 110.575064814 km
1 aa 1 3 221.151481337 km
6 bb 4 5 110.765515243 km
2 aa 2 3 110.576416524 km
7 bb 4 6 221.5459187 km
3 cc 8 9 110.598565514 km
4 cc 8 0 221.203121352 km
8 bb 4 7 332.341640771 km
5 cc 9 0 110.604555838 km
6 bb 4 5 110.58113908 km
9 bb 5 6 110.780403457 km
7 bb 4 6 221.165643396 km
10 bb 5 7 221.576125528 km
8 bb 4 7 331.754177186 km
9 bb 5 6 110.584504316 km
10 bb 5 7 221.173038106 km
11 bb 6 7 110.795722071 km
11 bb 6 7 110.58853379 km
And this is the stack trace from python3:
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 123, in __new__
    seq = iter(arg)
TypeError: 'numpy.int64' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "gps.py", line 29, in calc_dist
    for grp, lst in df.groupby('co_nm').groups.items()
  File "gps.py", line 30, in <listcomp>
    for c in combinations(lst, 2)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 322, in __init__
    super(vincenty, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 115, in __init__
    kilometers += self.measure(a, b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/distance.py", line 342, in measure
    a, b = Point(a), Point(b)
  File "/usr/local/lib/python3.4/dist-packages/geopy/point.py", line 126, in __new__
    "Failed to create Point instance from %r." % (arg,)
TypeError: Failed to create Point instance from 8.
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "gps.py", line 38, in <module>
    pool.map(calc_dist, ['lat', 'lon'])
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
TypeError: Failed to create Point instance from 8.
I know this isn't an answer, but maybe it helps...