1. 程式人生 > >Tornado 的 IOStream 簡介與應用

Tornado 的 IOStream 簡介與應用

Tornado的核心原始碼是由ioloop.py和iostream.py這2個檔案組成的。前者提供了一個迴圈,用於處理I/O事件;後者則封裝了一個非阻塞的socket。
有了這2者後,就能搭建起TCP server和HTTP server,實現非同步HTTP客戶端,這便是Tornado的主要內容了。
之前在研究socket時已差不多弄懂了ioloop的邏輯,於是本文就接著研究iostream了。

這裡我並不想逐行分析iostream的原始碼,因為並不是什麼很難懂的程式碼,於是只說說它做了些什麼事。
先看IOStream的__init__()方法,重要的引數只有socket和io_loop這2個。所以很容易猜想到,它只是封裝了這個socket,然後把I/O事件註冊到io_loop上。
讀了原始碼就能很快證實我們的猜測,值得注意的是,封裝socket時還將其設為非阻塞的了。
此後在連線和讀寫時都呼叫了_add_io_state()方法,這個方法就是呼叫io_loop的add_handler()和update_handler()方法來註冊事件的。
此外,似乎沒有開放讀完所有資料的介面,要自行實現的話可以使用_read_to_buffer()方法。

看完原始碼可能還是一頭霧水,它究竟能用來幹啥呢?
最為重要的用處當然是實現TCPServer了。有了ioloop,一個伺服器就可以處理I/O事件了;可是I/O並不能憑空產生啊,它還需要通過socket連線和傳輸。
而正如前面所說,IOStream封裝了socket,把它變成了非阻塞的,每次連線和讀寫這些I/O事件都註冊到io_loop上,這樣就實現了一個完整的TCPServer了。
可惜Tornado原始碼裡的TCPServer並沒有實際功能,HTTPServer又太複雜了,所以我找了一段

echo server的程式碼,一看就知道用了。測試也很簡單,用telnet連上去後,輸入什麼就會回顯什麼。

另一個功能還是和非阻塞有關。
Tornado能夠以單執行緒處理高併發,靠的就是非阻塞。而假如其中的任何一次I/O是非阻塞的,需要消耗數毫秒甚至數秒,那麼每秒能處理的請求數就不可能超過1000了。
所以這些耗時很長I/O訪問必須封裝成非阻塞的,而這早就被IOStream做好了。

此外,為了提高服務的響應速度,也需要用到非同步處理。
以前幾天我寫的聊天室為例,當用戶傳送了一條資訊後,我需要把它廣播給所有的使用者,然後才能結束這次響應。
在這個例子中,使用者可能不需要等待多久。可是假如我還需要做一些複雜的處理,比如關鍵字過濾,分析@使用者名稱、URL,儲存到資料庫,或者傳送email等,我並不想讓使用者的請求一直被阻塞著。
那麼我可以搭建另一個TCPServer,將接收到的資訊通過IOStream傳送給它,然後立即結束響應。在那個TCPServer上,我想做任何耗時的事,都不會拖慢主HTTPServer的響應速度;而一旦完成處理,就可以將結果通過IOStream返回給HTTPServer,讓其完成掃尾工作。

這裡仍然以聊天室為例,當接收到資訊後,我將其傳送給EchoServer,並結束響應。
EchoServer返回輸入的資訊,此時HTTPServer監聽到這個事件,就讀取並廣播資訊。
雖然對使用者來說,效果是一樣的。不過整個系統的擴充套件性就增強了,你可以把這個EchoServer替換成訊息佇列、memcache等各種資料來源。雖然實現上仍有很大差異,但最重要的思路是一致的。

程式碼和上次的聊天室差不多,handler部分都一樣,我就只貼出更改的部分吧:

import logging
import os.path
import socket
import uuid
import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.options
import tornado.web
import tornado.websocket


def broadcast_message(message):
for handler in ChatSocketHandler.socket_handlers:
  try:
   handler.write_message(message)
  except:
   logging.error('Error sending message', exc_info=True)

for callback in ChatHandler.callbacks:
  try:
   callback(message)
  except:
   logging.error('Error in callback', exc_info=True)
ChatHandler.callbacks = set()

def send_message(message):
stream.write((message + '\n').encode('utf-8'))

def read_message_from_echo_server():
def broadcast(message):
  broadcast_message(message[:-1])
  read_message_from_echo_server()
stream.read_until('\n', broadcast)

# ...

if __name__ == '__main__':
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s)
stream.connect(('127.0.0.1', 8888))
read_message_from_echo_server()

main()

要注意的是,連線EchoServer需要在呼叫ioloop的start()方法之前,因為這個方法是個死迴圈,後面的程式碼沒機會執行。