1. 程式人生 > 實用技巧 >Tornado 應用筆記04 淺析原始碼

Tornado 應用筆記04 淺析原始碼

目錄

本節內容以日常開發中常見的非同步場景為基礎, 給出Tornado定義的協程和非同步示例, 其中的程式碼稍加修改就可以用到實際專案中. 另外, 本節內容不會對其中原理做進一步說明, 原理分析將放到下一節.

非阻塞 sleep

# 下面三種方法實現的功能都是, 非同步sleep 2秒, 然後輸出 "i sleep 2s"
 
# 推薦的寫法, `.gen.sleep`是`tornado.gen`對`IOLoop`操作的封裝
class NonBlockSleep(tornado.web.RequestHandler):
    """Reply "i sleep 2s" after a two-second pause driven by `tornado.gen.sleep`."""

    @tornado.gen.coroutine
    def get(self):
        """Wait on the sleep future for 2 seconds, then finish the response."""
        delay_seconds = 2
        # The coroutine is suspended on the yielded future until the timer fires.
        yield tornado.gen.sleep(delay_seconds)
        self.finish("i sleep 2s")
 
 
# 本質上和第一個方法幾乎沒差別, 相當於上面的原始版
class NonBlockSleep(tornado.web.RequestHandler):
    """Reply "i sleep 2s" after a two-second timer scheduled through `gen.Task`.

    This is the hand-written equivalent of `tornado.gen.sleep(2)`: the
    coroutine yields a `gen.Task` wrapping `IOLoop.add_timeout` and resumes
    when the timeout callback fires.
    """

    @tornado.gen.coroutine
    def get(self):
        """Schedule a 2-second timeout on the current IOLoop and wait for it."""
        io_loop = tornado.ioloop.IOLoop.current()
        # The deadline must be on the same scale as `IOLoop.time()`; reading the
        # loop's own clock stays correct even if the loop is configured with a
        # clock other than `time.time`.
        yield tornado.gen.Task(io_loop.add_timeout, io_loop.time() + 2)
        self.finish("i sleep 2s")
 
 
# 採用非同步回撥
class NonBlockSleep(tornado.web.RequestHandler):
    """Reply "i sleep 2s" after a two-second timer, using a plain callback."""

    @tornado.web.asynchronous
    def get(self):
        """Register `awake` to run 2 seconds from now; the request stays open."""
        # `IOLoop.current()` matches the other examples in this file, and the
        # deadline is taken from the loop's own clock (`IOLoop.time()`), which
        # is the scale `add_timeout` expects.
        io_loop = tornado.ioloop.IOLoop.current()
        io_loop.add_timeout(io_loop.time() + 2, callback=self.awake)

    def awake(self):
        """Timeout callback: finish the pending response."""
        self.finish("i sleep 2s")

用執行緒池處理阻塞操作

這裡需要用到一個新的包futures, 通過pip install futures安裝即可 (僅Python 2需要; Python 3.2+的標準庫已內建`concurrent.futures`, 無須另外安裝).

單任務, 無回撥, 需要用到阻塞操作結果

兩種方式實現非阻塞計算, 完成計算後輸出結果(不需要操作結果時, 把`yield`和`@coroutine`去掉即可)

# 使用 submit, 較原始的方式, 未經過Tornado封裝 
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    """Run a blocking calculation on a thread pool via `executor.submit`.

    This is the "raw" approach: the `concurrent.futures` future returned by
    `submit` is yielded directly from the coroutine, without any Tornado
    wrapper such as `run_on_executor`.
    """

    # A single pool shared by all requests. Building the executor inside a
    # property would create a brand-new pool (and new worker threads) on every
    # access, so the limit of 2 workers would never apply and the pools would
    # never be shut down.
    # Per the original note, `tornado.concurrent.futures` and
    # `concurrent.futures` refer to the same thing.
    executor = tornado.concurrent.futures.ThreadPoolExecutor(2)

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        """Submit `_calculate(1)` to the pool and report result and elapsed time."""
        s = time.time()
        result = yield self.executor.submit(self._calculate, 1)
        used_time = time.time() - s
        self.finish('calculate completed, used %.3f s, result is %s' % (used_time, result))

    def _calculate(self, num=0):
        """Blocking busy loop: add 1 to `num` 100000000 times and return it."""
        for i in xrange(100000000):
            num += 1
        return num
 
 
# 使用 run_on_executor , 更推薦這種做法
class CoroutineWithThreadPool(tornado.web.RequestHandler):
    """Run a blocking calculation on a thread pool via `run_on_executor`."""

    # Pool that `run_on_executor` submits the decorated method to.
    executor = concurrent.futures.ThreadPoolExecutor(2)

    @tornado.concurrent.run_on_executor
    def _calculate(self, num=0):
        """Blocking busy loop: add 1 to `num` 100000000 times and return it."""
        for _ in xrange(100000000):
            num += 1
        return num

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        """Wait for `_calculate(1)` and report its result and the elapsed time."""
        started_at = time.time()
        total = yield self._calculate(1)
        elapsed = time.time() - started_at
        message = 'calculate completed, used %.3f s, result is %s' % (elapsed, total)
        self.finish(message)

單任務, 帶回調, 需要用到阻塞操作結果, 原始實現

class CoroutineWithThreadPool(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        future = self.executor.submit(self._calculate, *(1,))
        tornado.ioloop.IOLoop.current().add_future(future, self.result_callback)
 
    # 阻塞操作的回撥
    def block_callback(self):
        print 'after block func callback'
 
    # 獲取阻塞操作的結果
    def result_callback(self, future):
        tornado.ioloop.IOLoop.current().add_callback(self.block_callback)
        self.finish('the calculate result is |%s|' % future.result())
 
    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num
 

多工, 帶回調, 需要用到阻塞操作結果

class CoroutineWithThreadPool(tornado.web.RequestHandler):
    @property
    def executor(self):
        return concurrent.futures.ThreadPoolExecutor(2)
 
    @property
    def io_loop(self):
        '''
        使用run_on_executor併為future新增callback的時候, 需要設定`self.io_loop`屬性
        實際上`run_on_executor`也提供了給`io_loop`和`executor`改名的功能, 使用方法:
            @property
            def my_io_loop(self):
                return tornado.ioloop.IOLoop.current()
 
            @property
            def my_executor(self):
                return self.application.executor
 
            @tornado.concurrent.run_on_executor(io_loop='my_io_loop', executor='my_executor')
            def block_func(*args, **kwargs):
                pass
 
        callback直接在呼叫需要執行的函式時, 當做普通引數傳入即可,
        `run_on_executor`這個裝飾器使用後會`pop`掉, 無須擔心報錯
        '''
        return tornado.ioloop.IOLoop.current()
 
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        s = time.time()
 
        calculate_result, sleep_result = yield [
            self._calculate(2, callback=self.executor_callback),
            self._sleep(3),
        ]
        '''
        使用字典實現
        multi_task_result = yield {
            'calculate': self._calculate(1),
            'sleep': self._sleep(3),
        }
 
        calculate_result, sleep_result = multi_task_result['calculate'], multi_task_result['sleep']
        '''
        print sleep_result
        used_time = time.time() - s
        self.finish('calculate and sleep completed used %.3f s, %s, the calculate result is %s' %
                    (used_time, sleep_result, calculate_result))
 
    def executor_callback(self, future_result):
        print 'future is done, and the result is |%s|.' % future_result
 
    @tornado.concurrent.run_on_executor
    def _calculate(self, num=0):
        for i in xrange(100000000):
            num += 1
        return num
 
    @tornado.concurrent.run_on_executor
    def _sleep(self, seconds=0):
        time.sleep(seconds)
        return 'sleep used %s seconds' % seconds
 

非同步HTTP請求

# 非同步回撥
class AsyncFetch(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        http_client = tornado.httpclient.AsyncHTTPClient()
        http_client.fetch("http://www.baidu.com", callback=self.on_response)
 
    def on_response(self, response):
        r = response
        # body, 狀態碼, 請求耗時, headers
        print r.body, r.code, r.request_time
        print {k: v for k, v in r.headers.items()}
        self.finish('fetch completed')
 
 
# 協程
class AsyncFetch(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        http_client = tornado.httpclient.AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        self.on_response(response)
        self.finish('fetch completed')
 
    def on_response(self, response):
        print response
 
 
# 原始實現
class AsyncFetch(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):
        self._auto_finish = False
 
        tornado.httpclient.AsyncHTTPClient.configure(
            None,
            defaults=dict(
                user_agent="MyUserAgent"
            ),
            max_clients=20,
        )
        client = tornado.httpclient.AsyncHTTPClient()
 
        fetch_future = client.fetch('http://www.baidu.com', request_timeout=2)
        # 下面兩種方法均可以實現future done回撥, 不過tornado更推薦`add_future`的做法
        tornado.ioloop.IOLoop.current().add_future(fetch_future, callback=self.on_response)
        # fetch_future.add_done_callback(self.on_response)
 
    def on_response(self, future):
        http_response = future.result()
        print http_response
        result = dict(http_response.headers)
        result.update({'content': http_response.body})
        # raise ValueError  # 異常情況下,
        self.finish(result)
 

IOLoop事件(定時, 回撥)

class IOLoopCallback(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):
        print time.time()
 
        io_loop = tornado.ioloop.IOLoop.current()
 
        # 定時任務, 將任務丟給IOLoop, 3秒後執行
        io_loop.add_timeout(io_loop.time() + 3, callback=functools.partial(self.callback_timeout))
 
        # 回撥任務, 將任務丟給IOLoop, 由下一個Loop呼叫
        io_loop.add_callback(self.callback_next_loop, None)
 
        # sleep 會阻塞 IOLoop, 所以上面的 `IOLoop.add_timeout` 是相對的, 
        # 如果一直阻塞, 就不可能及時響應
        # time.sleep(4) # 阻塞實驗
 
    def callback_timeout(self):
        print 'callback_timeout at the time %s' % time.time()
 
    def callback_next_loop(self, useless=None):
        print 'callback_next_loop at the time %s' % time.time()
 

長連線輸出(RequestHandler.flush)

class Flush(tornado.web.RequestHandler):
    """Send part of the response early with `flush`, then complete it later."""

    @tornado.gen.coroutine
    def get(self):
        """Write and flush a first chunk, pause 2 seconds, then finish."""
        first_chunk = '<h1>sleeping...</h1>'
        last_chunk = '<h1>awake</h1>'

        self.write(first_chunk)
        # Push the buffered chunk out now instead of waiting for finish().
        self.flush()

        yield tornado.gen.sleep(2)
        self.finish(last_chunk)

後臺定時任務

方式1:

@tornado.gen.coroutine
def do_something(func_name):
    print 'from %s n do_something at %s' % (func_name, int(time.time()))
 
 
@tornado.gen.coroutine
def minute_loop1(interval=60):
    """Run `do_something` forever, pausing `interval` seconds after each run.

    The real period is `interval + n` seconds, where n is how long
    `do_something` takes, because the timer only starts after the task has
    finished. So with the default this is not a strict 60-second cycle.

    Args:
        interval: Seconds to sleep between runs. Defaults to 60, matching
            the function name; pass a smaller value for quick experiments.
    """
    while True:
        yield do_something(minute_loop1.__name__)
        yield tornado.gen.sleep(interval)  # start the timer and wait for it
 
 
@tornado.gen.coroutine
def minute_loop2(interval=60):
    """Run `do_something` forever on a stricter `interval`-second cycle.

    The timer is started before the task runs and awaited afterwards, so the
    task's own duration is absorbed into the interval (as long as the task
    finishes before the timer does).

    Args:
        interval: Length of one cycle in seconds. Defaults to 60, matching
            the function name; pass a smaller value for quick experiments.
    """
    while True:
        sleep = tornado.gen.sleep(interval)  # start the timer
        yield do_something(minute_loop2.__name__)  # run the periodic task
        yield sleep  # wait for the timer to finish
 
 
# How to start: hand each loop coroutine to the current IOLoop with
# `spawn_callback`.
tornado.ioloop.IOLoop.current().spawn_callback(minute_loop1)
tornado.ioloop.IOLoop.current().spawn_callback(minute_loop2)

方式2:

# tornado.ioloop.PeriodicCallback(callback, callback_time, io_loop=None)
 
# 需要注意`callback_time`的單位是`毫秒`, 一般`PeriodicCallback`是不執行`協程`任務的,
# 另外如果執行的`callback`耗時比`callback_time`還要長, 那麼
# 應該到點執行的下一次`callback`會被跳過,並放回到執行列表中, 在下一次到點的時候執行
 
COUNT = 0
 
def periodic_callback_print():
    global COUNT
    if COUNT < 3:
        COUNT += 1
        time.sleep(2)
        print 'i have been call back %s times and now is %s' % (COUNT, int(time.time()))
 
# Interval handed to `PeriodicCallback` as its `callback_time`.
ms_loop_time = 1000
 
# How to start: the task has to be created first, and only then started.
# Create the task
periodic_schedules_one = tornado.ioloop.PeriodicCallback(periodic_callback_print, ms_loop_time)
# Start it
periodic_schedules_one.start()
# Check its state
assert periodic_schedules_one.is_running()
# Stop it
periodic_schedules_one.stop()
 

迴圈/迭代

Python 3.5之前, 在協程中實現迭代會比較麻煩, 你需要將迴圈的條件與yield結果分離. 例如下面這個使用Motor(非同步MongoDB驅動)的例子. 不過在Python 3.5+裡面, 新增的async for可以實現非同步迭代.

import motor
db = motor.MotorClient().test
 
# Python 3.5- 實現
@gen.coroutine
def loop_example(collection):
    """Iterate a Motor cursor from a pre-3.5 coroutine.

    The loop condition is split from the yield: `cursor.fetch_next` is yielded
    first, and the document is read only when it produced a true value.
    """
    # NOTE(review): the `collection` parameter is not used; the query goes
    # through the `collection` attribute of `db`. Confirm this is intended.
    cursor = db.collection.find()
    while True:
        has_more = yield cursor.fetch_next
        if not has_more:
            break
        doc = cursor.next_object()
        ...
 
# Python 3.5+ 實現
async def loop_example(collection):
    """Iterate a Motor cursor with `async for` (Python 3.5+)."""
    # NOTE(review): the `collection` parameter is not used; the query goes
    # through the `collection` attribute of `db`. Confirm this is intended.
    cursor = db.collection.find()
    async for doc in cursor:
        ...

本節內容就是這些, 下節內容將分析Tornado協程和非同步實現的部分原始碼.

標籤:Tornado Python