python多处理.Pool kill *特定*长时间运行或挂起进程

我需要执行许多并行数据库连接和查询池。 我想使用一个multiprocessing.Pool或concurrent.futures ProcessPoolExecutor。 Python 2.7.5

在某些情况下,查询请求会花费太长时间或永远不会结束(挂起/僵尸进程)。 我想杀死已经超时的multiprocessing.Pool或concurrent.futures ProcessPoolExecutor的具体过程。

下面是如何杀死/重新生成整个进程池的示例,但理想情况下,我会尽量减少CPU抖动,因为我只想杀死一个特定的长时间运行的进程,该进程在超时秒后没有返回数据。

出于某种原因,下面的代码似乎无法在所有结果返回并完成后终止/加入进程池。 它可能与超时发生时杀死工作进程有关,但是当池被杀死并且结果如预期时,池创建新的工作者。

from multiprocessing import Pool
import time
import numpy as np
from threading import Timer
import thread, time, sys

def f(x):
    time.sleep(x)
    return x

if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)

    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]

    while results:
        try:
            x, result = results.pop(0)
            start = time.time()
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)

        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            for p in pool._pool:
                if p.exitcode is None:
                    p.terminate()

    pool.terminate()
    pool.join()

我不完全了解你的问题。 你说你想停止一个特定的进程,但是,然后,在你的异常处理阶段,你打电话终止所有工作。 不知道你为什么这样做。 另外,我很确定使用来自multiprocessing.Pool内部变量multiprocessing.Pool不是很安全。 说完所有这些,我认为你的问题是为什么当超时发生时这个程序没有完成。 如果这是问题,那么下面的诀窍就是:

from multiprocessing import Pool
import time
import numpy as np
from threading import Timer
import thread, time, sys

def f(x):
    time.sleep(x)
    return x

if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)

    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]

    result = None
    start = time.time()
    while results:
        try:
            x, result = results.pop(0)
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)
        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            for i in reversed(range(len(pool._pool))):
                p = pool._pool[i]
                if p.exitcode is None:
                    p.terminate()
                del pool._pool[i]

    pool.terminate()
    pool.join()

关键是你需要从池中删除项目; 只是打电话给他们终止是不够的。


在你的解决方案中,你正在篡改池本身的内部变量。 该池依靠3个不同的线程才能正确操作,因此干预内部变量并不真正意识到自己在做什么是不安全的。

没有一种干净的方法可以停止标准Python池中的超时进程,但是还有其他可以实现这些功能的实现。

你可以看看下面的库:

卵石

台球


为了避免访问内部变量,可以将执行任务中的multiprocessing.current_process().pid保存到共享内存中。 然后遍历主进程中的multiprocessing.active_children() ,如果存在,则pid目标pid
然而,在工人外部终止之后,他们被重新创建,但是该池变得不可加入,并且还需要在join()之前明确终止 join()

链接地址: http://www.djcxy.com/p/75551.html

上一篇: python multiprocessing.Pool kill *specific* long running or hung process

下一篇: Why does libgcc use global offset table?