在Tornado中运行异步后台任务

阅读Tornado文档,非常清楚如何调用异步函数来返回响应:

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")

缺乏的是如何调用一个与当前请求无关的后台任务:

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def _background_task():
        pass  # do lots of background stuff

    @gen.coroutine
    def get(self):
        _dont_care = yield self._background_task()
        self.render("template.html")

这段代码应该可以工作,除了它同步运行并且请求等待它完成之前。

异步调用此任务的正确方法是什么,同时立即返回当前请求?


不幸的是它有点棘手。 您需要将后台任务从当前请求中分离出来(这样后台任务中的失败并不会导致抛出请求的随机异常)并确保某些内容正在侦听后台任务的结果(记录其错误如果没有别的)。 这意味着这样的事情:

from tornado.ioloop import IOLoop
from tornado.stack_context import run_in_stack_context, NullContext
IOLoop.current().add_future(run_in_stack_context(NullContext(), self._background_task),
                            lambda f: f.result())

类似这样的东西可能会在未来加入龙卷风本身。

更新:自Tornado 4.0以来,上述功能在IOLoop.spawn_callback方法中可用。


我建议使用toro。 它提供了一个相对简单的机制来设置任务的后台队列。

下面的代码(例如放在queue.py中)启动一个简单的“worker()”,它只是等待,直到队列中有某些东西。 如果调用queue.add(function,async,*args,**kwargs) ,则会向队列中添加一个项目,以唤醒worker(),然后启动该任务。

我添加了async参数,以便它可以支持包装在@ gen.coroutine中的后台任务以及不包含这些任务的后台任务。

import toro,tornado.gen
queue = toro.Queue()
@tornado.gen.coroutine
def add(function,async,*args,**kwargs):
   item = dict(function=function,async=async,args=args,kwargs=kwargs)
   yield queue.put(item)

@tornado.gen.coroutine
def worker():
   while True:
      print("worker() sleeping until I get next item")
      item = yield queue.get()
      print("worker() waking up to process: %s" % item)
      try:
         if item['async']:
            yield item['function'](*item['args'],**item['kwargs'])
         else:
            item['function'](*item['args'],**item['kwargs'])
      except Exception as e:
         print("worker() failed to run item: %s, received exception:n%s" % (item,e))

@tornado.gen.coroutine
def start():
   yield worker()

在你的主龙卷风应用程序中:

import queue
queue.start()

现在你可以很简单地安排一个背景任务:

def my_func(arg1,somekwarg=None):
   print("in my_func() with %s %s" % (arg1,somekwarg))

queue.add(my_func,False,somearg,somekwarg=someval)

我有一个耗时的任务请求,可能需要超过30分钟,但客户需要立即返回结果。

首先,我使用了IOLoop.current()。spawn_callback。 有用! 但! 如果第一个请求任务正在运行,则第二个请求任务被阻止! 由于所有任务都在主事件循环中使用spawn_callback时,因此一个任务是同步执行,其他任务被阻塞。

最后,我使用tornado.concurrent。 例:

import datetime
import time

from tornado.ioloop import IOLoop
import tornado.web
from tornado import concurrent

executor = concurrent.futures.ThreadPoolExecutor(8)


class Handler(tornado.web.RequestHandler):

    def get(self):
        def task(arg):
            for i in range(10):
                time.sleep(1)
                print(arg, i)

        executor.submit(task, datetime.datetime.now())
        self.write('request accepted')


def make_app():
    return tornado.web.Application([
        (r"/", Handler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8000, '0.0.0.0')
    IOLoop.current().start()

并访问http://127.0.0.1:8000,你可以看到它运行正常:

2017-01-17 22:42:10.983632 0
2017-01-17 22:42:10.983632 1
2017-01-17 22:42:10.983632 2
2017-01-17 22:42:13.710145 0
2017-01-17 22:42:10.983632 3
2017-01-17 22:42:13.710145 1
2017-01-17 22:42:10.983632 4
2017-01-17 22:42:13.710145 2
2017-01-17 22:42:10.983632 5
2017-01-17 22:42:16.694966 0
2017-01-17 22:42:13.710145 3
2017-01-17 22:42:10.983632 6
2017-01-17 22:42:16.694966 1
2017-01-17 22:42:13.710145 4
2017-01-17 22:42:10.983632 7
2017-01-17 22:42:16.694966 2
2017-01-17 22:42:13.710145 5
2017-01-17 22:42:10.983632 8
2017-01-17 22:42:16.694966 3
2017-01-17 22:42:13.710145 6
2017-01-17 22:42:19.790646 0
2017-01-17 22:42:10.983632 9
2017-01-17 22:42:16.694966 4
2017-01-17 22:42:13.710145 7
2017-01-17 22:42:19.790646 1
2017-01-17 22:42:16.694966 5
2017-01-17 22:42:13.710145 8
2017-01-17 22:42:19.790646 2
2017-01-17 22:42:16.694966 6
2017-01-17 22:42:13.710145 9
2017-01-17 22:42:19.790646 3
2017-01-17 22:42:16.694966 7
2017-01-17 22:42:19.790646 4
2017-01-17 22:42:16.694966 8
2017-01-17 22:42:19.790646 5
2017-01-17 22:42:16.694966 9
2017-01-17 22:42:19.790646 6
2017-01-17 22:42:19.790646 7
2017-01-17 22:42:19.790646 8
2017-01-17 22:42:19.790646 9

想要帮助每个人!

链接地址: http://www.djcxy.com/p/27851.html

上一篇: Running an async background task in Tornado

下一篇: Tornado streaming HTTP response as AsyncHTTPClient receives chunks