利用tornado使請求實現異步非阻塞
基本IO模型
網上搜了很多關於同步異步,阻塞非阻塞的說法,理解還是不能很透徹,有必要買書看下。
參考:使用異步 I/O 大大提高應用程序的性能
怎樣理解阻塞非阻塞與同步異步的區別?
-
同步和異步:主要關註消息通信機制(重點在B?)。
同步:A調用B,B處理直到獲得結果,才返回給A。
異步:A調用B,B直接返回。無需等待結果,B通過狀態,通知等來通知A或回調函數來處理。 -
阻塞和非阻塞:主要關註程序等待(重點在A?)。
阻塞:A調用B,A被掛起直到B返回結果給A,A繼續執行。
非阻塞:A調用B,A不會被掛起,A可以執行其他操作(但可能A需要輪詢檢查B是否返回)。 -
同步阻塞:A調用B,A掛起,B處理直到獲得結果,返回給A,A繼續執行。
-
同步非阻塞:A調用B,A繼續執行,B處理直到獲得結果,處理的同時A輪詢檢查B是否返回結果。
-
異步阻塞:異步阻塞 I/O 模型的典型流程 (select)。
-
異步非阻塞:A調用B,B立即返回,A繼續執行,B得到結果後通過狀態,通知等通知A或回調函數處理。
tornado實現異步非阻塞
參考:
- 使用tornado讓你的請求異步非阻塞
- 官方文檔
利用異步方法(回調)和@tornado.web.asynchronous
@tornado.web.asynchronous
並不能將一個同步方法變成異步,所以修飾在同步方法上是無效的,只是告訴框架,這個方法是異步的,且只能適用於HTTP verb方法(get、post、delete、put等)。@tornado.web.asynchronous
@tornado.gen.coroutine
來修飾。對於用@tornado.web.asynchronous
修飾的異步方法,需要主動self.finish()
來結束該請求,普通的方法(get()
等)會自動結束請求在方法返回的時候。最基本的使用callback-style的例子,直接使用異步方法,並定義callback方法。
# sync blocking
class SleepHandler(BaseHandler):
# no effective
@tornado.web.asynchronous
def get(self):
time.sleep(5)
self.write(‘sleep for 5s‘)
class SleepHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 5, callback=self.on_response)
def on_response(self):
self.write(‘sleep for 5s‘)
self.finish()
# call back
class MyRequestHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
http = httpclient.AsyncHTTPClient()
http.fetch(‘http://www.baidu.com‘, self._on_download)
def _on_download(self, response):
self.write(response.body)
self.finish()
利用ThreadPoolExecutor
利用ThreadPoolExecutor的submit和future對象的add_done_callback
方法,將一個同步方法變成異步。如下例所示,若沒有無參可以不用partial()
:
class SleepHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
sleep_time = 5
def callback(future):
self.write(future.result())
self.finish()
EXECUTOR.submit(partial(self.get_sleep, sleep_time)).add_done_callback(
lambda future: tornado.ioloop.IOLoop.instance().add_callback(
partial(callback, future)))
def get_sleep(self, sleep_time):
time.sleep(sleep_time)
return "Awake! %s" % time.time()
分解一下:
future = EXECUTOR.submit(partial(self.get_sleep, sleep_time))
返回了一個future
對象。future.add_done_callback()
給future
添加一個完成回調函數callback_func
。- 實際上這個回調函數是
callback_func = lambda future: tornado.ioloop.IOLoop.instance().add_callback(partial(callback, future))```
4. 結合起來就是:
class SleepHandler(BaseHandler):
@tornado.web.asynchronous
def get(self):
sleep_time = 5
def final_callback(future_param):
self.write(future_param.result())
self.finish()
def executor_callback(future_param):
tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future_param))
future = EXECUTOR.submit(partial(self.get_sleep, sleep_time))
future.add_done_callback(executor_callback)
def get_sleep(self, sleep_time):
time.sleep(sleep_time)
return "Awake! %s" % time.time()
5. 沒看文檔,源碼前本認為為什麽要多此一舉進行兩次callback,直接
tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future))
不就ok了嗎,為何還要再加上一層
future.add_done_callback(executor_callback)
但發現直接這樣是會阻塞IOLoop的。查閱相關文檔後發現,```
tornado.ioloop.IOLoop.instance().add_callback
``` 這個方法會將控制權從其他線程轉到IOLoop線程上,直接用
tornado.ioloop.IOLoop.instance().add_callback(partial(final_callback, future))
時,
final_callback()中獲取
future_param.result()仍然會阻塞,所以需要
future.add_done_callback()```,在該線程完成獲取結果後在回callback給IOLoop。
另一種類似的寫法,將上例的方法寫成了一個裝飾器,見下例,但個人認為利用這種方式異步時沒有必要單獨寫一個裝飾器吧,而且也不通用吧?
def unblock(f):
@tornado.web.asynchronous
@wraps(f)
def wrapper(*args, **kwargs):
self = args[0]
def callback(future):
self.write(future.result())
self.finish()
EXECUTOR.submit(
partial(f, *args, **kwargs)
).add_done_callback(
lambda future: tornado.ioloop.IOLoop.instance().add_callback(
partial(callback, future)))
return wrapper
class SleepHandler(BaseHandler):
@unblock
def get(self):
time.sleep(5)
return "Awake! %s" % time.time()
使用tornado.concurrent.run_on_executor
簡化
上述的兩例都相當於開啟了新的線程池?
除此之外還可以利用@run_on_executor
裝飾器將同步阻塞函數變成異步(或者說被tornado的裝飾器理解和識別)。首先閱讀一下@run_on_executor
源碼:
def run_on_executor(*args, **kwargs):
"""Decorator to run a synchronous method asynchronously on an executor.
The decorated method may be called with a ``callback`` keyword
argument and returns a future.
The `.IOLoop` and executor to be used are determined by the ``io_loop``
and ``executor`` attributes of ``self``. To use different attributes,
pass keyword arguments to the decorator::
@run_on_executor(executor=‘_thread_pool‘)
def foo(self):
pass
.. versionchanged:: 4.2
Added keyword arguments to use alternative attributes.
"""
def run_on_executor_decorator(fn):
executor = kwargs.get("executor", "executor")
io_loop = kwargs.get("io_loop", "io_loop")
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
callback = kwargs.pop("callback", None)
future = getattr(self, executor).submit(fn, self, *args, **kwargs)
if callback:
getattr(self, io_loop).add_future(
future, lambda future: callback(future.result()))
return future
return wrapper
if args and kwargs:
raise ValueError("cannot combine positional and keyword args")
if len(args) == 1:
return run_on_executor_decorator(args[0])
elif len(args) != 0:
raise ValueError("expected 1 argument, got %d", len(args))
return run_on_executor_decorator
可以很快發現在不存在callback
關鍵字參數時,該裝飾器返回了一個future對象,由此並結合上述兩例子可以很快的利用@run_on_executor
寫出,感覺相當於簡化了上述兩例的代碼並使之變得通用:
class SleepHandler(BaseHandler):
executor = ThreadPoolExecutor(2)
@tornado.web.asynchronous
def get(self):
def callback(future_param):
self.write(‘sleep %ss‘ % future_param.result())
self.finish()
future = self.sleep()
future.add_done_callback(lambda f: tornado.ioloop.IOLoop.instance().add_callback(
partial(callback, f)))
@run_on_executor
def sleep(self):
time.sleep(5)
return 5
當然也可以利用callback參數:
class SleepHandler(BaseHandler):
executor = ThreadPoolExecutor(2)
io_loop = tornado.ioloop.IOLoop.instance()
@tornado.web.asynchronous
def get(self):
def callback(res):
self.write(‘sleep %ss‘ % res)
self.finish()
self.sleep(callback=callback)
@run_on_executor
def sleep(self):
time.sleep(5)
return 5
協程方式:@tornado.gen.coroutine
和yield
除了上述的用利用@tornado.web.asynchronous
和callback的方式,還可以用@tornado.gen.coroutine
和yield
,如下例子:
class GenRequestHandler(BaseHandler):
@tornado.gen.coroutine
def get(self):
http = httpclient.AsyncHTTPClient()
res = yield http.fetch(‘http://www.baidu.com‘)
self.write(res.body)
參考官方文檔
Most asynchronous functions in Tornado return a Future
; yielding this object returns its result
.
利用tornado.gen.Task
修飾一個callback型的異步函數使其能夠與yield
使用。
gen.Task is now a function that returns a Future
看一下例子(對於sleep可以直接用yield tornado.gen.sleep(5)
):
# gen.Task
class SleepHandler(BaseHandler):
@tornado.gen.coroutine
def get(self):
yield tornado.gen.Task(tornado.ioloop.IOLoop.instance().add_timeout, time.time() + 5)
# yield tornado.gen.sleep(5)
self.write(‘sleep for 5s‘)
還可以用結合celery使用(但並不是最好的方案)。
Last
https://github.com/tornadoweb/tornado/wiki/Links tornado的wiki上有很多異步相關支持的庫。
作者:蔣狗
鏈接:https://www.jianshu.com/p/ef01c1386933
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並註明出處。
利用tornado使請求實現異步非阻塞