Running an async background task in Tornado

Reading the Tornado documentation, it's very clear how to call an async function to return a response:

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")

What's lacking is how should a call be made asynchronously to a background task that has no relevance to the current request:

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def _background_task():
        pass  # do lots of background stuff

    @gen.coroutine
    def get(self):
        _dont_care = yield self._background_task()
        self.render("template.html")

This code would be expected to work, except that it runs synchronously and the request waits on it until it's finished.

What is the right way to asynchronously call this task, while immediately returning the current request?


Unfortunately it's kind of tricky. You need to both detach the background task from the current request (so that a failure in the background task doesn't result in a random exception thrown into the request) and ensure that something is listening to the background task's result (to log its errors if nothing else). This means something like this:

from tornado.ioloop import IOLoop
from tornado.stack_context import run_in_stack_context, NullContext
IOLoop.current().add_future(run_in_stack_context(NullContext(), self._background_task),
                            lambda f: f.result())

Something like this will probably be added to tornado itself in the future.

Update: Since Tornado 4.0, the above functionality is available in the IOLoop.spawn_callback method.


I recommend using toro. It provides a relatively simple mechanism for setting up a background queue of tasks.

The following code (put in queue.py for example), starts a simple "worker()" that simply waits until there is something in his queue. If you call queue.add(function,async,*args,**kwargs) this adds an item to the queue which will wake up worker() which then kicks off the task.

I added the async parameter so that this can support background tasks wrapped in @gen.coroutine and those without.

import toro,tornado.gen
queue = toro.Queue()
@tornado.gen.coroutine
def add(function,async,*args,**kwargs):
   item = dict(function=function,async=async,args=args,kwargs=kwargs)
   yield queue.put(item)

@tornado.gen.coroutine
def worker():
   while True:
      print("worker() sleeping until I get next item")
      item = yield queue.get()
      print("worker() waking up to process: %s" % item)
      try:
         if item['async']:
            yield item['function'](*item['args'],**item['kwargs'])
         else:
            item['function'](*item['args'],**item['kwargs'])
      except Exception as e:
         print("worker() failed to run item: %s, received exception:n%s" % (item,e))

@tornado.gen.coroutine
def start():
   yield worker()

In your main tornado app:

import queue
queue.start()

And now you can schedule a back ground task quite simply:

def my_func(arg1,somekwarg=None):
   print("in my_func() with %s %s" % (arg1,somekwarg))

queue.add(my_func,False,somearg,somekwarg=someval)

I have a time-consuming task in post request, maybe more than 30 minutes need, but client required return a result immediately.

First, I used IOLoop.current().spawn_callback. It works! but! If the first request task is running, second request task blocked! Because all tasks are in main event loop when use spawn_callback, so one task is synchronous execution, other tasks blocked.

Last, I use tornado.concurrent. Example:

import datetime
import time

from tornado.ioloop import IOLoop
import tornado.web
from tornado import concurrent

executor = concurrent.futures.ThreadPoolExecutor(8)


class Handler(tornado.web.RequestHandler):

    def get(self):
        def task(arg):
            for i in range(10):
                time.sleep(1)
                print(arg, i)

        executor.submit(task, datetime.datetime.now())
        self.write('request accepted')


def make_app():
    return tornado.web.Application([
        (r"/", Handler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8000, '0.0.0.0')
    IOLoop.current().start()

and visit http://127.0.0.1:8000, you can see it's run ok:

2017-01-17 22:42:10.983632 0
2017-01-17 22:42:10.983632 1
2017-01-17 22:42:10.983632 2
2017-01-17 22:42:13.710145 0
2017-01-17 22:42:10.983632 3
2017-01-17 22:42:13.710145 1
2017-01-17 22:42:10.983632 4
2017-01-17 22:42:13.710145 2
2017-01-17 22:42:10.983632 5
2017-01-17 22:42:16.694966 0
2017-01-17 22:42:13.710145 3
2017-01-17 22:42:10.983632 6
2017-01-17 22:42:16.694966 1
2017-01-17 22:42:13.710145 4
2017-01-17 22:42:10.983632 7
2017-01-17 22:42:16.694966 2
2017-01-17 22:42:13.710145 5
2017-01-17 22:42:10.983632 8
2017-01-17 22:42:16.694966 3
2017-01-17 22:42:13.710145 6
2017-01-17 22:42:19.790646 0
2017-01-17 22:42:10.983632 9
2017-01-17 22:42:16.694966 4
2017-01-17 22:42:13.710145 7
2017-01-17 22:42:19.790646 1
2017-01-17 22:42:16.694966 5
2017-01-17 22:42:13.710145 8
2017-01-17 22:42:19.790646 2
2017-01-17 22:42:16.694966 6
2017-01-17 22:42:13.710145 9
2017-01-17 22:42:19.790646 3
2017-01-17 22:42:16.694966 7
2017-01-17 22:42:19.790646 4
2017-01-17 22:42:16.694966 8
2017-01-17 22:42:19.790646 5
2017-01-17 22:42:16.694966 9
2017-01-17 22:42:19.790646 6
2017-01-17 22:42:19.790646 7
2017-01-17 22:42:19.790646 8
2017-01-17 22:42:19.790646 9

Want to help everyone!

链接地址: http://www.djcxy.com/p/27852.html

上一篇: 与Tornado同时使用Python生成器

下一篇: 在Tornado中运行异步后台任务