1. 程式人生 > >深入理解tornado的ioloop

深入理解tornado的ioloop

本文所剖析的tornado原始碼版本為4.4.2

ioloop是tornado的關鍵,是他的最底層。

ioloop就是對I/O多路複用的封裝,它實現了一個單例,將這個單例儲存在IOLoop._instance中

ioloop實現了Reactor模型,將所有要處理的I/O事件註冊到一箇中心I/O多路複用器上,同時主執行緒/程序阻塞在多路複用器上;一旦有I/O事件到來或是準備就緒(檔案描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。

另外,ioloop還被用來集中執行回撥函式以及集中處理定時任務。

一 準備知識:

  1 首先我們要了解Reactor模型:主執行緒呼叫epoll_wait()負責監聽,讀寫資料,接受新的連線,處理客戶端的請求均在工作執行緒完              成!   相對應PoeactorI的I/O操作都交給主執行緒和核心完後,工作執行緒僅僅處理業務邏輯!

  2 其次,我們要了解I/O多路複用,由於本文假設系統為Linux,所以要了解epoll以及Python中的select模組

  3 IOLoop類是Configurable類的子類,而Configurable類是一個工廠類,講解在這

二  建立IOLoop例項

來看IOLoop,它的父類是Configurable類,也就是說:IOLoop是一個直屬配置子類

class IOLoop(Configurable):
    ......

這裡就要結合Configurable類進行講解:

 Configurable中的__new__方法

1 首先例項化一個該直屬配置子類的'執行類物件',也就是呼叫該類的configurable_default方法並返回賦值給impl:

    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):     # 因為我們假設我們的系統為Linux,且支援epoll,所以這裡為True
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop 
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop

2 也就是impl是EPollIOLoop類物件,然後例項化該物件,執行其initialize方法

class EPollIOLoop(PollIOLoop):  # 該類只有這麼短短的幾句,可見主要的方法是在其父類PollIOLoop中實現。
    def initialize(self, **kwargs):
        super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 執行了父類PollIOLoop的initialize方法,並將select.epoll()傳入

  來看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())幹了些啥:

class PollIOLoop(IOLoop):  # 從屬配置子類

    def initialize(self, impl, time_func=None, **kwargs):
        super(PollIOLoop, self).initialize(**kwargs)                # 呼叫IOLoop的initialize方法
        self._impl = impl                               # self._impl = select.epoll()
        if hasattr(self._impl, 'fileno'):               # 檔案描述符的close_on_exec屬性
            set_close_exec(self._impl.fileno())
        self.time_func = time_func or time.time
        self._handlers = {}                             # 檔案描述符對應的fileno()作為key,(檔案描述符物件,處理函式)作為value
        self._events = {}                               # 用來儲存epoll_obj.poll()返回的事件,也就是哪個fd發生了什麼事件{(fd1, event1), (fd2, event2)……}
        self._callbacks = []
        self._callback_lock = threading.Lock()          # 新增執行緒鎖
        self._timeouts = []                             # 儲存定時任務
        self._cancellations = 0
        self._running = False
        self._stopped = False
        self._closing = False
        self._thread_ident = None                       # 獲得當前執行緒識別符號
        self._blocking_signal_threshold = None
        self._timeout_counter = itertools.count()

        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

  首先呼叫了IOLoop.initialize(self,**kwargs)方法:

    def initialize(self, make_current=None):
        if make_current is None:
            if IOLoop.current(instance=False) is None:
                self.make_current()
        elif make_current:
            if IOLoop.current(instance=False) is not None:
                raise RuntimeError("current IOLoop already exists")
            self.make_current()

    @staticmethod
    def current(instance=True):
        current = getattr(IOLoop._current, "instance", None)
        if current is None and instance:
            return IOLoop.instance()
        return current

    def make_current(self):
        IOLoop._current.instance = self

    我們可以看到IOLoop.initialize()主要是對執行緒做了一些支援和操作。

3 返回該例項

三 剖析PollIOLoop

1 處理I/O事件以及其對應handler的相關屬性以及方法

    使用self._handlers用來儲存fd與handler的對應關係,檔案描述符對應的fileno()作為key,元組(檔案描述符物件,處理函式)作為value

  self._events 用來儲存epoll_obj.poll()返回的事件,也就是哪個fd發生了什麼事件{(fd1, event1), (fd2, event2)……}

    add_handler方法用來新增handler

  update_handle方法用來更新handler

    remove_handler方法用來移除handler

    def add_handler(self, fd, handler, events):
        # 向epoll中註冊事件 , 並在self._handlers[fd]中為該檔案描述符新增相應處理函式
        fd, obj = self.split_fd(fd)   # fd.fileno(),fd
        self._handlers[fd] = (obj, stack_context.wrap(handler))
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        fd, obj = self.split_fd(fd)
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

2 處理回撥函式的相關屬性以及方法

  self._callbacks用來儲存回撥函式

  add_callback方法用來直接添加回調函式

  add_future方法用來間接的添加回調函式,future物件詳解在這

    def add_callback(self, callback, *args, **kwargs):
        # 因為Python的GIL的限制,導致Python執行緒並不算高效。加上tornado實現了多程序 + 協程的模式,所以我們略過原始碼中的部分執行緒相關的一些操作
        if self._closing:
            return
        self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs))
    def add_future(self, future, callback):
        # 為future物件新增經過包裝後的回撥函式,該回調函式會在future物件被set_done後新增至_callbacks中
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

3 處理定時任務的相關屬性以及方法

  self._timeouts用來儲存定時任務

  self.add_timeout用來新增定時任務(self.call_later   self.call_at都是間接呼叫了該方法)

def add_timeout(self, deadline, callback, *args, **kwargs):
        """
            ``deadline``可能是一個數字,表示相對於當前時間的時間(與“IOLoop.time”通常為“time.time”相同的大小),或者是datetime.timedelta物件。 
            自從Tornado 4.0以來,`call_later`是一個比較方便的替代方案,因為它不需要timedelta物件。

        """
        if isinstance(deadline, numbers.Real):
            return self.call_at(deadline, callback, *args, **kwargs)
        elif isinstance(deadline, datetime.timedelta):
            return self.call_at(self.time() + timedelta_to_seconds(deadline),
                                callback, *args, **kwargs)
        else:
            raise TypeError("Unsupported deadline %r" % deadline)

4 啟動io多路複用器

  啟動也一般就意味著開始迴圈,那麼迴圈什麼呢?

    1 執行回撥函式

    2 執行時間已到的定時任務

    3 當某個檔案描述法發生事件時,執行該事件對應的handler

  使用start方法啟動ioloop,看一下其簡化版(去除執行緒相關,以及一些相對不重要的細節):

def start(self):
        try:
            while True:    
                callbacks = self._callbacks
                self._callbacks = []
                due_timeouts = []
                # 將時間已到的定時任務放置到due_timeouts中,過程省略
                for callback in callbacks:          # 執行callback
                    self._run_callback(callback)
                for timeout in due_timeouts:        # 執行定時任務
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)       
                callbacks = callback = due_timeouts = timeout = None    # 釋放記憶體
                # 根據情況設定poll_timeout的值,過程省略
                if not self._running:    # 終止ioloop執行時,在執行完了callback後結束迴圈
                    breaktry:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    if errno_from_exception(e) == errno.EINTR:  # 系統呼叫被訊號處理函式中斷,進行下一次迴圈
                        continue
                    else:
                        raise 
                self._events.update(event_pairs)
                while self._events: 
                    fd, events = self._events.popitem()             # 獲取一個fd以及對應事件
                    try:
                        fd_obj, handler_func = self._handlers[fd]   # 獲取該fd對應的事件處理函式
                        handler_func(fd_obj, events)                # 執行該事件處理函式
                    except (OSError, IOError) as e:         
                        if errno_from_exception(e) == errno.EPIPE:     # 當客戶端關閉連線時會產生EPIPE錯誤                         
                            pass
                        # 其他異常處理已經省略
                fd_obj = handler_func = None       # 釋放記憶體空間          

 start完整版

5 關閉io多路複用器

def close(self, all_fds=False):
        with self._callback_lock:
            self._closing = True
        self.remove_handler(self._waker.fileno())
        if all_fds:    # 該引數若為True,則表示會關閉所有檔案描述符
            for fd, handler in self._handlers.values():
                self.close_fd(fd)
        self._waker.close()
        self._impl.close() 
        self._callbacks = None
        self._timeouts = None