1. 程式人生 > >Tornado非同步框架理解

Tornado非同步框架理解

一、介紹
這裡直接引用原文:
Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections.
An asynchronous networking library (IOLoop and IOStream), which serve as the building blocks for the HTTP components and can also be used to implement other protocols.
對於幾種網路IO模型的理解可以參考:
http://blog.csdn.net/historyasamirror/article/details/5778378

http://www.cnblogs.com/Anker/p/3254269.html
http://www.ibm.com/developerworks/cn/linux/l-async/
個人覺得前兩篇闡述得比較準確,第三篇有些詞容易產生異議,比如非同步阻塞I/O說的其實就是IO multiplexing,而非同步非阻塞I/O則是Asynchronous I/O,使用後面的術語不易混淆些。這裡引用幾句話:
同步IO與非同步IO:是針對應用程式與核心的互動而言的。同步過程中程序觸發IO操作並等待或者輪詢的去檢視IO操作是否完成。非同步過程中程序觸發IO操作以後,直接返回,做自己的事情,IO交給核心來處理,完成後核心通知程序IO完成。
阻塞IO與非阻塞IO:簡單理解為需要做一件事能不能立即得到返回應答,如果不能立即獲得返回,需要等待,那就阻塞了,否則就可以理解為非阻塞。
這裡有個比較容易疑惑的地方,這裡的非同步說程序觸發IO操作後,直接返回,那麼是否可以理解為非同步IO一定是非阻塞IO呢?是這樣的,這是四種不同的IO模型,它們有各自的定義和特點,不是包含非包含的關係。另外確定是否為非同步,要看作業系統是否將使用者資料主動的拷貝到使用者(伺服器)空間去供使用者使用,而不是使用者(伺服器)主動監聽。

我的理解:tornado基於epoll模型,屬於IO multiplexing,並非非同步IO模型,也不是非阻塞IO模型,事實上tornado裡面說的asynchronous and non-blocking主要是針對函式和連線(socket)而言的:
An asynchronous function returns before it is finished.
In the context of Tornado we generally talk about blocking in the context of network I/O, although all kinds of blocking are to be minimized.
其實我們平常常說的同步非同步並不一定是在說IO模型。

二、它如何工作

我用的tornado版本為3.1.1,python版本為2.7.3,下面是一個簡單的例子:

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler, asynchronous

class MainHandler(RequestHandler):
    @asynchronous
    def get(self):
        self.finish("Hello, world")

if __name__ == "__main__":
    http_server = HTTPServer(Application([(r"/", MainHandler),]))
    http_server.listen(8888)
    IOLoop.instance().start()
這樣一個高效能的web服務就完成了,核心只有一個IOLoop和一個HTTPServer,我們從上往下看,先看HTTPServer。

HTTPServer繼承TCPServer,它只負責處理將接收到的新連線的socket新增到IOLoop中。

def listen(self, port, address=""):
    sockets = bind_sockets(port, address=address)
    self.add_sockets(sockets)

def add_sockets(self, sockets):
    if self.io_loop is None:
        self.io_loop = IOLoop.current()

    for sock in sockets:
        self._sockets[sock.fileno()] = sock
        add_accept_handler(sock, self._handle_connection, io_loop=self.io_loop)
首先將HTTPServer這個監聽型socket新增到IOLoop中,新增完成後在accept_handler接受新連線,接受到新連線後呼叫self._handle_connection。
def add_accept_handler(sock, callback, io_loop=None):
    if io_loop is None:
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                if e.args[0] == errno.ECONNABORTED:
                    continue
                raise
            callback(connection, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
在_handle_connection中建立一個IOStream物件,傳給handle_stream,並且在handle_stream中初始化一個HTTPConnection物件。
def _handle_connection(self, connection, address):
    if self.ssl_options is not None:
        assert ssl, "Python 2.6+ and OpenSSL required for SSL"
        try:
            connection = ssl_wrap_socket(connection,
                                         self.ssl_options,
                                         server_side=True,
                                         do_handshake_on_connect=False)
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_EOF:
                return connection.close()
            else:
                raise
        except socket.error as err:
            if err.args[0] in (errno.ECONNABORTED, errno.EINVAL):
                return connection.close()
            else:
                raise
    try:
        if self.ssl_options is not None:
            stream = SSLIOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)
        else:
            stream = IOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size)
        self.handle_stream(stream, address)
    except Exception:
        app_log.error("Error in connection callback", exc_info=True)

def handle_stream(self, stream, address):
    HTTPConnection(stream, address, self.request_callback, self.no_keep_alive, self.xheaders, self.protocol)
到HTTPConnection初始化時,新的連線已經接受,並初始化了IOStream物件,就可以開始讀請求過來的資料了,讀完之後交給_header_callback,實際是交給_on_headers解析資料。
在_on_handlers解析完請求資料後建立HTTPRequest物件,並將該物件作為引數傳給最初的那個request_callback(即在main方法中傳給HttpServer的Application)的__call__方法,到此整個請求流程就很清晰了。
def __init__(self, stream, address, request_callback, no_keep_alive=False,
                 xheaders=False, protocol=None):
    self.stream = stream
    self.address = address
    self.address_family = stream.socket.family
    self.request_callback = request_callback
    self.no_keep_alive = no_keep_alive
    self.xheaders = xheaders
    self.protocol = protocol
    self._clear_request_state()
    self._header_callback = stack_context.wrap(self._on_headers)
    self.stream.set_close_callback(self._on_connection_close)
    self.stream.read_until(b"\r\n\r\n", self._header_callback)
這裡還囉嗦幾句,Application的__call__方法首先會呼叫該請求對應Handler的父類RequestHandler的_execute方法,這裡的幾個邏輯解釋一下。
首先執行self._when_complete(self.prepare(), self._execute_method),會執行self.prepare(),即正式處理請求之前的邏輯,基類中是空實現,開發者可根據需要在自己的Handler中實現,該方法正常返回一個Future物件。
如果未實現self.prepare()則直接呼叫self._execute_method,反之則通過IOLoop迴圈執行完self.prepare()後再呼叫self._execute_method,即呼叫開發者寫的Handler裡面的get或post等請求邏輯。
開發者邏輯執行完成後執行self.finish()。
def _execute(self, transforms, *args, **kwargs):
    """Executes this request with the given output transforms."""
    self._transforms = transforms
    try:
        if self.request.method not in self.SUPPORTED_METHODS:
            raise HTTPError(405)
        self.path_args = [self.decode_argument(arg) for arg in args]
        self.path_kwargs = dict((k, self.decode_argument(v, name=k))
                                for (k, v) in kwargs.items())
        # If XSRF cookies are turned on, reject form submissions without
        # the proper cookie
        if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
                self.application.settings.get("xsrf_cookies"):
            self.check_xsrf_cookie()
        self._when_complete(self.prepare(), self._execute_method)
    except Exception as e:
        self._handle_request_exception(e)

def _when_complete(self, result, callback):
    try:
        if result is None:
            callback()
        elif isinstance(result, Future):
            if result.done():
                if result.result() is not None:
                    raise ValueError('Expected None, got %r' % result)
                callback()
            else:
                # Delayed import of IOLoop because it's not available
                # on app engine
                from tornado.ioloop import IOLoop
                IOLoop.current().add_future(
                    result, functools.partial(self._when_complete,
                                              callback=callback))
        else:
            raise ValueError("Expected Future or None, got %r" % result)
    except Exception as e:
        self._handle_request_exception(e)

def _execute_method(self):
    if not self._finished:
        method = getattr(self, self.request.method.lower())
        self._when_complete(method(*self.path_args, **self.path_kwargs),
                            self._execute_finish)

def _execute_finish(self):
    if self._auto_finish and not self._finished:
        self.finish()
再看IOLoop,這個模組是非同步機制的核心,它包含了一系列已經開啟的檔案描述符和每個描述符的處理器(handlers)。
針對不同的平臺,tornado提供了多種IOLoop實現方式,包括select、epoll、kqueue,其實就是IO多路複用的實現,這些都繼承PollIOLoop,PollIOLoop是對IOLoop的一個基本封裝。
IOLoop的功能是選擇那些已經準備好讀寫的檔案描述符,然後呼叫它們各自的處理器。
可以通過呼叫add_handler()方法將一個socket加入IOLoop中,上面的HTTPServer監聽socket就是通過add_handler新增到IOLoop中去的:
io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
來具體看下add_handler這個方法,為fd註冊handler來接收event,事件包括READ、WRITE、ERROR三種,預設為ERROR,當註冊的事件觸發時,將會呼叫handler(fd, events)函式。
self._impl是前面說的select、epoll、kqueue其中一種的例項,register函式只是根據事件型別將fd放到不同的事件集合中去。
def add_handler(self, fd, handler, events):
    self._handlers[fd] = stack_context.wrap(handler)
    self._impl.register(fd, events | self.ERROR)
接下來IOLoop就要開始工作了,看start()方法(程式碼比較長,只保留了主要部分):
def start(self):
    [...]
    self._running = True
    [...]
    while True:
        poll_timeout = 3600.0
        with self._callback_lock:
            callbacks = self._callbacks
            self._callbacks = []
        for callback in callbacks:
            self._run_callback(callback)

        [...通過_timeouts來優化poll_timeout...]

        if self._callbacks:
            poll_timeout = 0.0

        if not self._running:
            break

        [...]

        try:
            event_pairs = self._impl.poll(poll_timeout)#取出資料已準備好的事件,當poll有結果時才會返回,否則一直阻塞,直到poll_timeout
        except Exception as e:
            if (getattr(e, 'errno', None) == errno.EINTR or
                (isinstance(getattr(e, 'args', None), tuple) and
                 len(e.args) == 2 and e.args[0] == errno.EINTR)):
                continue
            else:
                raise

        [...]

        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)#執行handler,即執行netutil中的accept_handler方法,接著會接受socket,呼叫TCPServer中的_handle_connection方法,該方法會建立一個IOStream例項進行非同步讀寫
            except (OSError, IOError) as e:
                if e.args[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    app_log.error("Exception in I/O handler for fd %s",
                                  fd, exc_info=True)
            except Exception:
                app_log.error("Exception in I/O handler for fd %s",
                              fd, exc_info=True)
    [...]

這裡看下SelectIOLoop的_impl(即_Select)的poll:

def poll(self, timeout):
    readable, writeable, errors = select.select(self.read_fds, self.write_fds, self.error_fds, timeout)#有結果才返回,否則一直阻塞,直到poll_timeout
    events = {}
    for fd in readable:
        events[fd] = events.get(fd, 0) | IOLoop.READ
    for fd in writeable:
        events[fd] = events.get(fd, 0) | IOLoop.WRITE
    for fd in errors:
        events[fd] = events.get(fd, 0) | IOLoop.ERROR
    return events.items()

至此,可以清晰得看到tornado是如何工作的了!核心就兩點:使用epoll模型,保證高併發時接受請求的高效性;將可能阻塞的方法都放到IOLoop裡面去迴圈執行,即程式上的非同步,保證CPU的高利用率。這樣高併發時,tornado一直在接受請求並一直在努力順暢的工作,效能自然就上去了。

參考:

http://www.tornadoweb.org/en/stable/
http://blog.csdn.net/historyasamirror/article/details/5778378
http://www.cnblogs.com/Anker/p/3254269.html

相關推薦

Tornado非同步框架理解

一、介紹 這裡直接引用原文: Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-b

python高效能非同步框架 Tornado精簡教程

第一個demo:,這和其它框架幾乎一樣,所以不作解釋 import tornado.webimport tornado.ioloop######################################################class IndexHandler(tornado.web.Req

tornado web非同步框架 和 nodejs 的非同步對比

最近專案中用到tornado 也是新接觸看到裡面的非同步實現 事件驅動實現是由IOLoop PollIOLoop 這兩個類來實現 IOLoop執行在主程序裡面 以前看過nodejs的事件迴圈機制 通過eventloop 保持主執行緒一直處於迴圈狀態 對回撥函式事件處理

Tornado Web 框架

str bre 服務器端 xtend erro 錯誤 需要 django title 一、簡介   Tornado 是 FriendFeed 使用的可擴展的非阻塞式 web 服務器及其相關工具的開源版本。這個 Web 框架看起來有些像web.py 或者 Google 的 w

Hibernate 框架理解

pin persist 編寫 映射 一個 無需 單位 hiberna 包裝   Hibernate框架簡化了java應用程序與數據庫交互的開發。Hibernate是一個開源,輕量級的ORM(對象關系映射)工具。   ORM工具簡化瀏覽數據的創建,數據處理和數據訪問。它是將對

Scrapy 框架理解

mage down 沒有 包括 構圖 domain 信息 控制 dom scrapy的架構圖: 組件 Scrapy Engine 引擎負責控制數據流在系統中所有組件中流動,並在相應動作發生時觸發事件。 調度器(Scheduler) 調度器從引擎接受reque

ssm框架理解

配置 數據 ESS 相同 共同點 清空 pat ioc 對比 SpringSpring就像是整個項目中裝配bean的大工廠,在配置文件中可以指定使用特定的參數去調用實體類的構造方法來實例化對象。Spring的核心思想是IoC(控制反轉),即不再需要程序員去顯式地new一個對

SpringMVC框架理解

ola ans sub character XML toad this 轉發 org JavaEE體系結構包括四層,從上到下分別是應用層、Web層、業務層、持久層。Struts和SpringMVC是Web層的框架,Spring是業務層的框架,Hibernate和MyBati

tornado非同步原理(1)--非同步事件

tornado非同步原理 tornado有四類非同步事件:立即事件,定時器非同步事件,io非同步事件,Future非同步事件。 tornado 的ioloop管理所有的非同步事件,並在適當的時機呼叫非同步事件的回掉函式。 四類非同步事件均在ioloop的start函式中排程。 立即事件: 場景:當前

tornado非同步原理(2)——高併發

如下程式碼所示,當tornado web server 的TCPSever接收到客戶端的socket時,會將該soket新增至ioloop監聽列表, 當socket可讀時,ioloop會呼叫回掉函式tcpserver._handle_connection()函式 —— tornado io非同步事件

驅動框架理解

概述 API在某個標頭檔案中定義,被封裝在某個DLL中,而這個DLL會進一步被封裝在ntdll.dll中(它裡面的API叫native api),比如,ReadFile在ntdll.dll中就對應著ntReadFile;然後這個API會通過sysenter的方式進入核心層。 那麼,比如對於Cre

tornado 非同步化以及協程化

非同步化: import tornado.web import tornado.ioloop import tornado.httpclient class MainHandler(tornado.web.RequestHandler): @tornado.web.asynchoro

真正的 Tornado 非同步非阻塞

原文出處https://hexiangyu.me/posts/15 其中 Tornado 的定義是 Web 框架和非同步網路庫,其中他具備有非同步非阻塞能力,能解決他兩個框架請求阻塞的問題,在需要併發能力時候就應該使用 Tornado。 但是在實際使用過程中很容易把 Tornado

tornado非同步web請求

1.為什麼要使用非同步web服務 使用非同步非阻塞請求,併發處理更高效。 2.同步與非同步請求比較 同步請求時,web伺服器程序是阻塞的,也就是說當一個請求被處理時,伺服器程序會被掛起直至請求完成。 非同步請求時,web伺服器程序在等待請求處理過程中,讓I/O迴圈開啟,以便服務於其他請求,請

Android非同步框架RxJava 1.x系列(二)

前言 在介紹 RxJava 1.x 執行緒排程器之前,首先引入一個重要的概念 - 事件序列轉換。RxJava 提供了對事件序列進行轉換的支援,這是它的核心功能之一。 正文 1. 事件序列轉換定義 所謂轉換,就是將事件序列中的物件或整個序列進行加工處理,轉換成

Android非同步框架RxJava 1.x系列(一)

前言 RxJava 是一款基於 Java VM 實現的響應式程式設計擴充套件庫 - 基於觀察者模式的非同步和事件處理框架。RxJava 官方目前同時維護了兩個版本,分別是 1.x 和 2.x,區別是它們使用不同的 group id 和 namespaces。

使用Tornado非同步接入第三方(支付寶)支付

目前國內比較流行的第三方支付主要有支付寶和微信支付,博主最近研究了下如何用Python接入支付寶支付,這裡我以Tornado作為web框架,接入支付寶構造支付介面。 使用Tornado非同步接入支付寶支付流程: 1. 進入螞蟻金服開放平臺填寫開發者資訊、應用資訊 2. 配置RSA256金鑰,生

python3 web框架之Django(二、關於web框架理解

我們在瀏覽網站不同頁面的時候後面url也會變,不是我們這樣不管誰來訪問都是“ hello web” 那我們想要實現這樣的功能呢?看程式碼: from wsgiref.simple_server import make_server def handle_reque

Python Tornado 非同步處理實現

本文將敘述如何利用執行緒池的方式實現Tornado的非同步處理。 1. 非同步處理方案        在處理請求應用上加上@tornado.web.asynchronous和@tornado.gen.engine裝飾器,即可實現非同步方法配合實現非阻塞請求處理。請求上加了

通俗易懂SpringMVC整體框架理解

最近又重新溫習了一下前臺SpringMVC框架,能夠從整體上對SpringMVC有一個全域性的認識。在這裡也總結一下,為那些即將學習SpringMVC的親們,做一個很好的開端吧!  1. SpringMVC整體框架圖  學習某一種技術的話,要能夠從巨集觀上把握一下這