tornado非同步原理(1)--非同步事件
tornado非同步原理
tornado有四類非同步事件:立即事件,定時器非同步事件,io非同步事件,Future非同步事件。
tornado 的ioloop管理所有的非同步事件,並在適當的時機呼叫非同步事件的回掉函式。
四類非同步事件均在ioloop的start函式中排程。
立即事件:
場景:當前函式執行完後,下次ioloop排程時直接排程某函式
用法:ioloop.add_callback(callback, *args, **kwargs)
原理:立即事件全部存放在ioloop._callbacks中,IOLoop每次迴圈都會呼叫這些立即事件的回撥函式
def start(self):while True: ncallbacks = len(self._callbacks) #self._callbacks用於存放所有的立即事件 due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations-= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) #迴圈呼叫所有的立即事件的回撥函式 fortimeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) if self._callbacks: #如果在上面呼叫回撥函式的過程中,又添加了新的立即事件,則將等待IO事件的時間設定為0,以便及時呼叫新的立即事件 poll_timeout = 0.0 elif self._timeouts: poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT event_pairs = self._impl.poll(poll_timeout) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events)
定時器非同步事件:
場景:使用者希望在某一段時間後執行某函式
用法:ioloop.call_at(when, callback, *args, **kwargs), ioloop.call_later(delay, callback, *args, **kwargs)
原理:定時器事件存放在ioloop._timeouts中,IOLoop每次迴圈開始都會找出所有已經超時的定時器,並呼叫對應的回撥函式
def start(self): while True: ncallbacks = len(self._callbacks) due_timeouts = [] #用於存放超時的事件 if self._timeouts: #self._timeouts用於存放所有定時器事件 now = self.time() while self._timeouts: if self._timeouts[0].callback is None: #如果定時器事件沒有回掉函式,則說明已經取消,直接丟棄 heapq.heappop(self._timeouts) #heapq是一個數據結構,它保證heapq[0]永遠是最小的一個元素 self._cancellations -= 1 elif self._timeouts[0].deadline <= now: #如果定時器已經超時,則取出並新增至due_timeouts中 due_timeouts.append(heapq.heappop(self._timeouts)) else: #因為heapq的特性,如果執行到這一步,說明剩下事件都沒有超時,退出迴圈 break for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) #迴圈呼叫所有已超時定時器事件的回撥函式 if self._callbacks: poll_timeout = 0.0 elif self._timeouts: #根據最小定時器事件的時間設定等待IO事件的時間 poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT event_pairs = self._impl.poll(poll_timeout) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events)
IO非同步事件:
場景:等待某個檔案描述符的某個事件,如TCPserver等待socket的READ事件
用法:ioloop.add_handler(fd, callback, events)
原理:所有的檔案描述符全部存放在ioloop._impl中,windows平臺下_impl是tornado.platform.select.SelectIOLoop物件
在linux平臺下_impl是tornado.platform.epoll.EPollIOLoop物件,作用都是同時監聽多個檔案描述符
def start(self): while True: ncallbacks = len(self._callbacks) due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) if self._callbacks: poll_timeout = 0.0 elif self._timeouts: poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT event_pairs = self._impl.poll(poll_timeout) #監聽所有檔案描述符 self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) #迴圈呼叫所有檔案描述符對應的回撥函式
Future非同步事件:
場景:等待某個非同步事件結束後執行回掉函式
用法:ioloop.add_future(future, callback), future.add_done_callback(callback)
原理:非同步事件結束後呼叫Future.set_result(),當執行set_result時將future所有的回掉函式新增為ioloop的立即事件
class Future(object): def set_result(self, result): self._result = result self._set_done() def _set_done(self): self._done = True if self._callbacks: from tornado.ioloop import IOLoop loop = IOLoop.current() for cb in self._callbacks: loop.add_callback(cb, self) #將所有的回掉函式設定為ioloop的立即事件 self._callbacks = None