深入理解tornado的ioloop
本文所剖析的tornado原始碼版本為4.4.2
ioloop是tornado的關鍵,是他的最底層。
ioloop就是對I/O多路複用的封裝,它實現了一個單例,將這個單例儲存在IOLoop._instance中
ioloop實現了Reactor模型,將所有要處理的I/O事件註冊到一箇中心I/O多路複用器上,同時主執行緒/程序阻塞在多路複用器上;一旦有I/O事件到來或是準備就緒(檔案描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。
另外,ioloop還被用來集中執行回撥函式以及集中處理定時任務。
一 準備知識:
1 首先我們要了解Reactor模型:主執行緒呼叫epoll_wait()負責監聽,讀寫資料,接受新的連線,處理客戶端的請求均在工作執行緒完 成! 相對應PoeactorI的I/O操作都交給主執行緒和核心完後,工作執行緒僅僅處理業務邏輯!
2 其次,我們要了解I/O多路複用,由於本文假設系統為Linux,所以要了解epoll以及Python中的select模組
3 IOLoop類是Configurable類的子類,而Configurable類是一個工廠類,講解在這。
二 建立IOLoop例項
來看IOLoop,它的父類是Configurable類,也就是說:IOLoop是一個直屬配置子類
class IOLoop(Configurable): ......
這裡就要結合Configurable類進行講解:
Configurable中的__new__方法
1 首先例項化一個該直屬配置子類的'執行類物件',也就是呼叫該類的configurable_default方法並返回賦值給impl:
@classmethod def configurable_default(cls): if hasattr(select, "epoll"): # 因為我們假設我們的系統為Linux,且支援epoll,所以這裡為True from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
2 也就是impl是EPollIOLoop類物件,然後例項化該物件,執行其initialize方法
class EPollIOLoop(PollIOLoop): # 該類只有這麼短短的幾句,可見主要的方法是在其父類PollIOLoop中實現。 def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 執行了父類PollIOLoop的initialize方法,並將select.epoll()傳入
來看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())幹了些啥:
class PollIOLoop(IOLoop): # 從屬配置子類 def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) # 呼叫IOLoop的initialize方法 self._impl = impl # self._impl = select.epoll() if hasattr(self._impl, 'fileno'): # 檔案描述符的close_on_exec屬性 set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} # 檔案描述符對應的fileno()作為key,(檔案描述符物件,處理函式)作為value self._events = {} # 用來儲存epoll_obj.poll()返回的事件,也就是哪個fd發生了什麼事件{(fd1, event1), (fd2, event2)……} self._callbacks = [] self._callback_lock = threading.Lock() # 新增執行緒鎖 self._timeouts = [] # 儲存定時任務 self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None # 獲得當前執行緒識別符號 self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ)
首先呼叫了IOLoop.initialize(self,**kwargs)方法:
def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current() @staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self
我們可以看到IOLoop.initialize()主要是對執行緒做了一些支援和操作。
3 返回該例項
三 剖析PollIOLoop
1 處理I/O事件以及其對應handler的相關屬性以及方法
使用self._handlers用來儲存fd與handler的對應關係,檔案描述符對應的fileno()作為key,元組(檔案描述符物件,處理函式)作為value
self._events 用來儲存epoll_obj.poll()返回的事件,也就是哪個fd發生了什麼事件{(fd1, event1), (fd2, event2)……}
add_handler方法用來新增handler
update_handle方法用來更新handler
remove_handler方法用來移除handler
def add_handler(self, fd, handler, events): # 向epoll中註冊事件 , 並在self._handlers[fd]中為該檔案描述符新增相應處理函式 fd, obj = self.split_fd(fd) # fd.fileno(),fd self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
2 處理回撥函式的相關屬性以及方法
self._callbacks用來儲存回撥函式
add_callback方法用來直接添加回調函式
add_future方法用來間接的添加回調函式,future物件詳解在這
def add_callback(self, callback, *args, **kwargs): # 因為Python的GIL的限制,導致Python執行緒並不算高效。加上tornado實現了多程序 + 協程的模式,所以我們略過原始碼中的部分執行緒相關的一些操作 if self._closing: return self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs)) def add_future(self, future, callback): # 為future物件新增經過包裝後的回撥函式,該回調函式會在future物件被set_done後新增至_callbacks中 assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
3 處理定時任務的相關屬性以及方法
self._timeouts用來儲存定時任務
self.add_timeout用來新增定時任務(self.call_later self.call_at都是間接呼叫了該方法)
def add_timeout(self, deadline, callback, *args, **kwargs): """ ``deadline``可能是一個數字,表示相對於當前時間的時間(與“IOLoop.time”通常為“time.time”相同的大小),或者是datetime.timedelta物件。 自從Tornado 4.0以來,`call_later`是一個比較方便的替代方案,因為它不需要timedelta物件。 """ if isinstance(deadline, numbers.Real): return self.call_at(deadline, callback, *args, **kwargs) elif isinstance(deadline, datetime.timedelta): return self.call_at(self.time() + timedelta_to_seconds(deadline), callback, *args, **kwargs) else: raise TypeError("Unsupported deadline %r" % deadline)
4 啟動io多路複用器
啟動也一般就意味著開始迴圈,那麼迴圈什麼呢?
1 執行回撥函式
2 執行時間已到的定時任務
3 當某個檔案描述法發生事件時,執行該事件對應的handler
使用start方法啟動ioloop,看一下其簡化版(去除執行緒相關,以及一些相對不重要的細節):
def start(self): try: while True: callbacks = self._callbacks self._callbacks = [] due_timeouts = [] # 將時間已到的定時任務放置到due_timeouts中,過程省略 for callback in callbacks: # 執行callback self._run_callback(callback) for timeout in due_timeouts: # 執行定時任務 if timeout.callback is not None: self._run_callback(timeout.callback) callbacks = callback = due_timeouts = timeout = None # 釋放記憶體 # 根據情況設定poll_timeout的值,過程省略 if not self._running: # 終止ioloop執行時,在執行完了callback後結束迴圈 breaktry: event_pairs = self._impl.poll(poll_timeout) except Exception as e: if errno_from_exception(e) == errno.EINTR: # 系統呼叫被訊號處理函式中斷,進行下一次迴圈 continue else: raise self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() # 獲取一個fd以及對應事件 try: fd_obj, handler_func = self._handlers[fd] # 獲取該fd對應的事件處理函式 handler_func(fd_obj, events) # 執行該事件處理函式 except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # 當客戶端關閉連線時會產生EPIPE錯誤 pass # 其他異常處理已經省略 fd_obj = handler_func = None # 釋放記憶體空間
start完整版
5 關閉io多路複用器
def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: # 該引數若為True,則表示會關閉所有檔案描述符 for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None