Understanding Event-Driven Programming in Python (the Event Loop)
Two years ago, when I was learning Python, the only concurrency models I knew were multiprocessing and multithreading. Both are scheduled by the operating system rather than by the programmer, and their weaknesses are obvious: context switching and creation overhead are both costly. Later I heard about Python coroutines, user-level threads that can be scheduled by the programmer. They are lightweight, but fundamentally all of these models rely on multiple workers to avoid a single worker blocking everything. Then I came across Tornado and learned about asynchronous programming in Python: a self-described single-threaded, asynchronous, high-performance web server. At the time I kept wondering: if it is single-threaded, how can it perform so well? Was it doing some optimization at the TCP layer, or exploiting some multiplexing trick in TCP? Later still I learned that Tornado has a concept called the event loop, and with that I finally understood event-driven programming, the true face of Python's so-called asynchronous programming.
To understand event-driven programming in Python, you first need to understand Python coroutines, yield, and IO multiplexing (the select/poll/epoll trio; worth looking up if the terms are new to you). It also helps to look at two scenarios separately: the server side and the client side. On the server side, the heart of the event loop is IO multiplexing: every request coming from a client is handled through the IO multiplexing call. In my view this is exactly why Python's asynchronous programming is not truly asynchronous. After select returns the set of ready events, they are still processed one by one in a loop, so if any single event's handler blocks, it blocks the handling of all the other events too.
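The server-side shape of an event loop can be sketched with Python's standard selectors module (all names here are illustrative, not Tornado's): the loop blocks in select() until some sockets are ready, then walks the ready events one handler at a time, which is exactly where one slow handler stalls all the others.

```python
import selectors
import socket

sel = selectors.DefaultSelector()  # epoll/kqueue/select, whatever the OS offers

def accept(server_sock):
    # The listening socket became readable: a client is connecting.
    conn, _addr = server_sock.accept()
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, handle)

def handle(conn):
    # A client socket became readable: echo its bytes back.
    data = conn.recv(4096)
    if data:
        conn.sendall(data)
    else:
        sel.unregister(conn)
        conn.close()

def loop_once(timeout=None):
    # One turn of the event loop: select() hands back the ready events,
    # which are then handled *sequentially* -- a handler that blocks here
    # would stall every other ready connection.
    for key, _events in sel.select(timeout):
        key.data(key.fileobj)

server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen()
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, accept)

# A real server would just call loop_once() inside `while True:`.
```

The callback stored via register() plays the same role as the handlers Tornado keeps in self._handlers.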
Here is the relevant Tornado source (the body of the IOLoop's start() loop). Don't worry about the details for now, they are too complex to matter here; focus on the two short comments marking where events are polled and where they are processed.
while True:
    # Prevent IO event starvation by delaying new callbacks
    # to the next iteration of the event loop.
    with self._callback_lock:
        callbacks = self._callbacks
        self._callbacks = []

    # Add any timeouts that have come due to the callback list.
    # Do not run anything until we have determined which ones
    # are ready, so timeouts that call add_timeout cannot
    # schedule anything in this iteration.
    due_timeouts = []
    if self._timeouts:
        now = self.time()
        while self._timeouts:
            if self._timeouts[0].callback is None:
                # The timeout was cancelled.  Note that the
                # cancellation check is repeated below for timeouts
                # that are cancelled by another timeout or callback.
                heapq.heappop(self._timeouts)
                self._cancellations -= 1
            elif self._timeouts[0].deadline <= now:
                due_timeouts.append(heapq.heappop(self._timeouts))
            else:
                break
        if (self._cancellations > 512
                and self._cancellations > (len(self._timeouts) >> 1)):
            # Clean up the timeout queue when it gets large and it's
            # more than half cancellations.
            self._cancellations = 0
            self._timeouts = [x for x in self._timeouts
                              if x.callback is not None]
            heapq.heapify(self._timeouts)

    for callback in callbacks:
        self._run_callback(callback)
    for timeout in due_timeouts:
        if timeout.callback is not None:
            self._run_callback(timeout.callback)
    # Closures may be holding on to a lot of memory, so allow
    # them to be freed before we go into our poll wait.
    callbacks = callback = due_timeouts = timeout = None

    if self._callbacks:
        # If any callbacks or timeouts called add_callback,
        # we don't want to wait in poll() before we run them.
        poll_timeout = 0.0
    elif self._timeouts:
        # If there are any timeouts, schedule the first one.
        # Use self.time() instead of 'now' to account for time
        # spent running callbacks.
        poll_timeout = self._timeouts[0].deadline - self.time()
        poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
    else:
        # No timeouts and no callbacks, so use the default.
        poll_timeout = _POLL_TIMEOUT

    if not self._running:
        break

    if self._blocking_signal_threshold is not None:
        # clear alarm so it doesn't fire while poll is waiting for
        # events.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    try:
        # Wait for IO events (the select/poll/epoll call)
        event_pairs = self._impl.poll(poll_timeout)
    except Exception as e:
        # Depending on python version and IOLoop implementation,
        # different exception types may be thrown and there are
        # two ways EINTR might be signaled:
        # * e.errno == errno.EINTR
        # * e.args is like (errno.EINTR, 'Interrupted system call')
        if errno_from_exception(e) == errno.EINTR:
            continue
        else:
            raise

    if self._blocking_signal_threshold is not None:
        signal.setitimer(signal.ITIMER_REAL,
                         self._blocking_signal_threshold, 0)

    # Pop one fd at a time from the set of pending fds and run
    # its handler. Since that handler may perform actions on
    # other file descriptors, there may be reentrant calls to
    # this IOLoop that update self._events
    self._events.update(event_pairs)
    # Process the ready events one at a time, in a loop
    while self._events:
        fd, events = self._events.popitem()
        try:
            fd_obj, handler_func = self._handlers[fd]
            handler_func(fd_obj, events)
        except (OSError, IOError) as e:
            if errno_from_exception(e) == errno.EPIPE:
                # Happens when the client closes the connection
                pass
            else:
                self.handle_callback_exception(self._handlers.get(fd))
        except Exception:
            self.handle_callback_exception(self._handlers.get(fd))
        fd_obj = handler_func = None
On the client side, IO multiplexing is not actually involved. In Tornado, the client works by registering callbacks: at the start of every iteration, the ioloop walks the callbacks list and runs each of them. run_sync, for instance, simply takes the user-defined function, registers it as a callback through an internal run function, and executes it when the loop comes around. So if the user-defined function is a coroutine, it must be wrapped with gen.coroutine to actually drive the coroutine forward; driving coroutines is what gen.coroutine essentially does.
The code above already shows where callbacks are executed:
for callback in callbacks:
    self._run_callback(callback)
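What gen.coroutine does for a generator-based coroutine can be sketched as a tiny driver. This is a simplified, hypothetical model (run_sync_sketch is not Tornado's implementation): it primes the coroutine with next(), then feeds each step's result back in with send() until the coroutine returns. In real Tornado each yielded value is a Future that resolves later; here every step resolves immediately.

```python
def run_sync_sketch(gen_func):
    # Drive a generator-based "coroutine": prime it with next(), then
    # push it forward by send()-ing each step's result back in. This is
    # the handshake that gen.coroutine automates.
    gen = gen_func()
    try:
        value = next(gen)          # start the coroutine
        while True:
            # In Tornado this is where a yielded Future would be awaited;
            # in this sketch every step is resolved on the spot.
            value = gen.send(value)
    except StopIteration as stop:
        # A coroutine's return value travels inside StopIteration.
        return getattr(stop, "value", None)

def fetch():
    # Stand-in for an asynchronous step that "produces" a result.
    body = yield "fake response body"
    return body.upper()

result = run_sync_sketch(fetch)  # "FAKE RESPONSE BODY"
```

This also shows why a bare generator does nothing by itself: without a driver calling send(), it never advances past its first yield.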
To sum up: on the server side, the heart of the event loop is IO multiplexing. On the client side, the heart of the event loop is deferring execution with Future objects and driving coroutines with send; IO multiplexing is not used, although the IO itself is still optimized: Tornado's TCPClient class, for example, sets its socket to non-blocking. (aiohttp, by contrast, uses IO multiplexing for client IO as well, which gives it better performance.) So whether acting as client or server, the event loop is not fundamentally asynchronous, and blocking problems can always arise. Of the frameworks I know that build on IO multiplexing, most additionally hand time-consuming operations to a thread pool, which raises concurrency dramatically and quietly achieves the effect of asynchrony.
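The "hand blocking work to a thread pool" pattern from the summary can be sketched with the standard concurrent.futures module (a generic illustration, not any particular framework's API): the loop thread submits the blocking call and keeps running, and the slow calls overlap in worker threads instead of stalling the loop.

```python
import concurrent.futures
import time

def blocking_io(n):
    # Stand-in for a slow blocking call (disk, DNS, a legacy client library...).
    time.sleep(0.1)
    return n * 2

pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# The event-loop thread would submit the work and move on; here we just
# show that four 0.1s blocking calls overlap in the pool instead of
# taking 0.4s serially -- which is what makes the loop *look* asynchronous.
start = time.monotonic()
futures = [pool.submit(blocking_io, n) for n in range(4)]
results = [f.result() for f in futures]
elapsed = time.monotonic() - start

pool.shutdown(wait=True)
```

In a real event loop you would attach a completion callback to each Future rather than call result(), so the loop thread itself never waits.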