用基于生成器的协程看似无限递归

以下内容来自David Beazley关于发电机的幻灯片(这里是对任何感兴趣的人)。

定义了一个Task类,它包装一个产生期货的生成器,即Task类,完全(没有错误处理),如下所示:

class Task:
    def __init__(self, gen):
        self._gen = gen

    def step(self, value=None):
        try:
            fut = self._gen.send(value)
            fut.add_done_callback(self._wakeup)
        except StopIteration as exc:
            pass

    def _wakeup(self, fut):
        result = fut.result()
        self.step(result)

在一个例子中,还定义了下面的递归函数:

from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=8)

def recursive(n):
   yield pool.submit(time.sleep, 0.001)
   print("Tick :", n)
   Task(recursive(n+1)).step()

以下两种情况会发生:

  • 从Python REPL中,如果我们定义这些(或者如果我们将它们放在一个文件中,则导入它们),然后用下面的代码跳转启动递归:

    Task(recursive(0)).step()
    

    它开始打印,看起来已经超过了递归极限。 它显然没有超过它,但是打印堆栈级别表明它在整个执行过程中保持不变。 还有其他事情正在发生,我不太明白。

    注意 :如果你像这样执行它,你需要杀死python进程。

  • 如果我们将所有内容( Taskrecursive )放在一个文件中,并附带:

    if __name__ == "__main__":
        Task(recursive(0)).step()
    

    然后用python myfile.py运行它,它停止在7 (看起来是max_workers的数量)。


  • 我的问题是,它是如何超越递归限制的,以及它如何根据执行方式的不同而有所不同?

    这个行为出现在Python 3.6.2和Python 3.5.4上(我猜也会在3.63.5系列中的其他人)。


    你显示的recursive生成器实际上并不是递归的,这会导致系统递归限制的问题。

    理解为什么当recursive生成器的代码运行时需要注意。 与普通函数不同,只调用recursive(0)不会导致它立即运行其代码并进行额外的递归调用。 相反,调用recursive(0)立即返回一个生成器对象。 只有当你send()到生成器时,代码才会运行,并且只有在你第二次send()它之后才会启动另一个调用。

    让我们在代码运行时检查调用堆栈。 在顶层,我们运行Task(recursive(0)).step() 。 这在一个序列中做了三件事:

  • recursive(0)这个调用立即返回一个生成器对象。
  • Task(_) Task对象已创建,其__init__方法存储对第一步中创建的生成器对象的引用。
  • _.step()调用该任务的方法。 这是行动真正开始的地方! 我们来看看调用过程中会发生什么:

  • fut = self._gen.send(value)在这里我们通过发送一个值来实际启动发生器运行。 让我们深入一下,看看生成器代码的运行:
  • yield pool.submit(time.sleep, 0.001)这将安排在另一个线程中完成的任务。 我们不等待它发生。 相反,我们会得到一个Future ,我们可以使用它在完成时收到通知。 我们立即回到以前的代码水平。
  • fut.add_done_callback(self._wakeup)这里我们要求在未来准备就绪时调用_wakeup()方法。 这总是立即返回!
  • step方法现在结束。 没错,我们完成了(暂时)! 这对你问题的第二部分很重要,我将在稍后讨论。
  • 我们所做的调用结束了,所以如果我们交互式运行,控制流将返回到REPL。 如果我们以脚本的形式运行,解释器会到达脚本的末尾并开始关闭(我将在下面进一步讨论)。 然而,由线程池控制的其他线程仍在运行,并且在某个时刻,其中一个线程将会执行一些我们关心的事情! 让我们看看那是什么。

  • 当预定的函数( time.sleep )完成运行时,它所运行的线程将调用我们在Future对象上设置的回调函数。 也就是说,它会在我们之前创建的Task对象上调用Task._wakup() (我们在顶层没有引用,但Future保留了引用,因此它仍然存在)。 让我们看看这个方法:

  • result = fut.result()存储延迟调用的结果。 这在这种情况下是无关紧要的,因为我们从来没有看过结果(它无论如何None )。
  • self.step(result)再一次! 现在我们回到我们关心的代码。 让我们来看看它这次的作用:
  • fut = self._gen.send(value)再次发送到生成器,因此它接管。 它已经放弃了一次,所以这次我们刚刚开始yield
  • print("Tick :", n)这很简单。
  • Task(recursive(n+1)).step()这是事情变得有趣的地方。 这条线就像我们开始的那样。 所以,像以前一样,这将运行上面列出的逻辑1-4(包括他们的子步骤)。 但是,如果step()方法返回,它不会返回到REPL或结束脚本,而是返回到此处。
  • recursive()生成器(原来的,而不是我们刚刚创建的新生成器)已经到了最后。 因此,就像任何到达代码末尾的生成器一样,它会引发StopIteration
  • StopIterationtry / except块捕获并忽略,并且step()方法结束。
  • _wakup()方法也结束了,所以回调完成了。
  • 最终还会调用在早期回调中创建的Task的回调。 所以我们回去重复第5步,一遍又一遍,永远(如果我们是交互式运行的话)。
  • 上面的调用堆栈解释了为什么交互式外壳永远打印。 主线程返回到REPL(如果可以看到其他线程的输出,则可以使用它来做其他事情)。 但在游泳池中,每个线程都会从自己的工作回调中安排另一份工作。 当下一份工作完成时,它的回调会安排另一份工作,等等。

    那么,当你将代码作为脚本运行时,为什么只有8个打印输出呢? 答案在上面的步骤4中暗示。 当以非交互方式运行时,在第一次调用Task.step返回后,主线程会在脚本末尾运行。 这会提示口译员尝试关闭。

    concurrent.futures.thread模块(其中定义了ThreadPoolExecutor )具有一些花哨的逻辑,当执行程序仍处于活动状态时,程序关闭时会尝试清理干净。 它应该停止任何空闲线程,并发出任何仍在运行的信号,以便在当前作业完成时停止。

    清理逻辑的确切实现以非常奇怪的方式与我们的代码交互(这可能会或可能不会有问题)。 结果是第一个线程不断给自己做更多的工作,而其他生成的工作线程在产生后立即退出。 第一个工作者最终退出时,执行者已经开始使用许多线程(在我们的例子中为8)。

    据我了解,这是一系列事件。

  • 我们导入(间接) concurrent.futures.thread模块,它使用atexit来告诉解释器在解释器关闭之前运行一个名为_python_exit的函数。
  • 我们创建一个最大线程数为8的ThreadPoolExecutor 。它不会立即产生其工作线程,但会在每次安排作业时创建一个,直到它具有全部8个。
  • 我们安排我们的第一份工作(在上一份清单中第3步的深度嵌套部分)。
  • 执行程序将作业添加到其内部队列中,然后通知它没有最大数量的工作线程并启动一个新线程。
  • 新线程将作业从队列中弹出并开始运行。 但是, sleep呼叫比其余步骤花费的时间要长得多,所以线程将在此停留一段时间。
  • 主线程完成(已到达上一个列表中的第4步)。
  • _python_exit函数被解释器调用,因为解释器想要关闭。 该功能设置全局_shutdown模块中的变量,并发送一个None执行人的内部队列(它发送一个None每个线程,但只是到目前为止创建一个线程,所以它只是发送一个None )。 然后它阻塞主线程,直到它知道的线程退出。 这会延误解释器的关闭。
  • 工作线程调用time.sleep返回。 它调用在作业的Future注册的回调函数,该函数调度另一个作业。
  • 就像在这个列表的第4步中一样,执行程序排队工作,并启动另一个线程,因为它还没有所需的编号。
  • 新线程尝试从内部队列中取出一个作业,但从步骤7获取None值,这是可能完成的信号。 它看到_shutdown全球已经确定,所以它就退出了。 但在它之前,它会向队列中添加另一个“ None ”。
  • 第一个工作线程完成其回调。 它寻找一份新工作,并在步骤8中找到它自己排队的一份工作。它开始运行工作,并且像步骤5中那样需要一段时间。
  • 因为第一个工作人员是目前唯一活动的线程(主线程被阻塞,等待第一个工人死亡,另一个工人关闭),但没有其他任何事情发生。
  • 我们现在重复步骤8-12几次。 第一个工作线程排队第三到第八个作业,并且执行程序每次都会产生相应的线程,因为它没有完整的集合。 但是,每个线程立即死亡,因为它从作业队列中取得了一个None ,而不是完成一个实际的作业。 第一个工作线程结束了所有的实际工作。
  • 最后,在第八份工作之后,某种工作方式会有所不同。 这一次,当回调调度另一个作业时,不会产生额外的线程,因为执行程序知道它已经启动了所请求的8个线程(它不知道7已经关闭)。
  • 所以这一次,第一个工作人员(而不是实际的工作)会将内部工作队列头的None取走。 这意味着它关闭,而不是做更多的工作。
  • 当第一个工作人员关闭时,主线程(一直等待它退出)最终可以解除阻塞并且_python_exit函数完成。 这让翻译完全关闭。 我们完成了!
  • 这解释了我们看到的输出! 我们得到8个输出,全部来自相同的工作线程(第一个产生)。

    我认为在那个代码中可能存在竞争条件。 如果步骤11在步骤10之前发生,事情可能会中断。 如果第一个工人None排队,另一个新产生的工人得到了真正的工作,那么交换的角色(第一个工人会死亡,另一个工人会完成剩下的工作,除去更多的竞争条件这些步骤的更高版本)。 但是,一旦第一名工人死亡,主线程就会被解除阻塞。 由于它不知道其他线程(因为当它使线程的列表等待时它们不存在),它会过早地关闭解释器。

    我不确定这场比赛是否有可能发生。 我猜这是不太可能的,因为新线程开始和它从队列中获取作业之间的代码路径的长度比现有线程完成回调的路径短得多(排队后的部分新工作),然后在队列中寻找另一份工作。

    我怀疑这是一个ThreadPoolExecutor让我们在将代码作为脚本运行时干净地退出的错误。 排队新作业的逻辑除了执行程序自己的self._shutdown属性以外,还应该检查全局_shutdown标志。 如果是这样,在主线程完成后尝试排队另一个作业会引发异常。

    通过在with语句中创建ThreadPoolExecutor ,您可以复制我认为更为理智的行为:

    # create the pool below the definition of recursive()
    with ThreadPoolExecutor(max_workers=8) as pool:
        Task(recursive(0)).step()
    

    在主线程从step()调用返回后,这将很快崩溃。 它看起来像这样:

    exception calling callback for <Future at 0x22313bd2a20 state=finished returned NoneType>
    Traceback (most recent call last):
      File "S:python36libconcurrentfutures_base.py", line 324, in _invoke_callbacks
        callback(self)
      File ".task_coroutines.py", line 21, in _wakeup
        self.step(result)
      File ".task_coroutines.py", line 14, in step
        fut = self._gen.send(value)
      File ".task_coroutines.py", line 30, in recursive
        Task(recursive(n+1)).step()
      File ".task_coroutines.py", line 14, in step
        fut = self._gen.send(value)
      File ".task_coroutines.py", line 28, in recursive
        yield pool.submit(time.sleep, 1)
      File "S:python36libconcurrentfuturesthread.py", line 117, in submit
        raise RuntimeError('cannot schedule new futures after shutdown')
    RuntimeError: cannot schedule new futures after shutdown
    

    让我们从7号开始吧。 这就是你已经提到过的工人数量,标记从[0..7]Task类需要通过recursive形式传递函数标识符。

    Task(recursive).step(n) 
    

    代替

    Task(recursive(n)).step()
    

    这是因为, 递归函数需要在pool环境中recursive而在当前情况下, recursive是在主线程本身中计算的。 time.sleep是当前代码中在任务池中评估的唯一函数。

    代码存在主要问题的关键方面是递归。 池中的每个线程都依赖于内部函数,将执行上限限制为可用工作者的数量。 该功能无法完成,因此新的功能无法执行。 因此,它在达到递归极限之前终止。

    链接地址: http://www.djcxy.com/p/53245.html

    上一篇: Seemingly infinite recursion with generator based coroutines

    下一篇: "async with" in Python 3.4