How Tornado implements asynchronous requests with epoll
I have recently been reading the Tornado source code, along with quite a few articles that analyze it, but almost none of them explain how asynchronous requests are actually implemented on top of epoll.
ioloop.py
def start(self):
    if self._running:
        raise RuntimeError("IOLoop is already running")
    if self._stopped:
        self._stopped = False
        return
    old_current = getattr(IOLoop._current, "instance", None)
    IOLoop._current.instance = self
    self._running = True
    try:
        while True:
            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            with self._callback_lock:
                callbacks = self._callbacks
                self._callbacks = []
            for callback in callbacks:
                self._run_callback(callback)
            # Closures may be holding on to a lot of memory, so allow
            # them to be freed before we go into our poll wait.
            callbacks = callback = None

            if self._callbacks:
                # If any callbacks or timeouts called add_callback,
                # we don't want to wait in poll() before we run them.
                poll_timeout = 0.0
            else:
                # No timeouts and no callbacks, so use the default.
                poll_timeout = _POLL_TIMEOUT

            if not self._running:
                break

            try:
                # wait on epoll for I/O events, for at most poll_timeout seconds
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                # Depending on python version and IOLoop implementation,
                # different exception types may be thrown and there are
                # two ways EINTR might be signaled:
                # * e.errno == errno.EINTR
                # * e.args is like (errno.EINTR, 'Interrupted system call')
                if errno_from_exception(e) == errno.EINTR:
                    # the poll() wait was interrupted; just go around the loop again
                    continue
                else:
                    raise

            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to
            # this IOLoop that modify self._events
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        self.handle_callback_exception(self._handlers.get(fd))
                except Exception:
                    self.handle_callback_exception(self._handlers.get(fd))
            fd_obj = handler_func = None
    finally:
        # (timer/signal cleanup trimmed) restore the previously current IOLoop
        IOLoop._current.instance = old_current
This is a simplified version of the IOLoop's start() code, with the timeout handling removed. As the code shows, every iteration of the epoll loop calls self._run_callback(callback) to run the queued callback functions. The rest of this article traces how Tornado knows that a request has finished and then adds the corresponding callback.
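To make that callback path concrete, here is a minimal self-contained sketch (assuming the Tornado 4.x API discussed in this article; the greet function is made up for illustration): add_callback puts a function into self._callbacks, and the next pass of the while loop in start() runs it via _run_callback.
from tornado.ioloop import IOLoop

def greet(name):
    # runs inside the event loop, on the next iteration of start()
    print("hello,", name)
    IOLoop.current().stop()          # stop the loop so this example exits

loop = IOLoop.current()
loop.add_callback(greet, "tornado")  # queued into self._callbacks
loop.start()                         # start() pops the queue and calls _run_callback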
httpclient.py
def fetch(self, request, callback=None, raise_error=True, **kwargs):
    request.headers = httputil.HTTPHeaders(request.headers)
    request = _RequestProxy(request, self.defaults)
    future = TracebackFuture()
    if callback is not None:
        callback = stack_context.wrap(callback)

        def handle_future(future):
            exc = future.exception()
            if isinstance(exc, HTTPError) and exc.response is not None:
                response = exc.response
            elif exc is not None:
                response = HTTPResponse(
                    request, 599, error=exc,
                    request_time=time.time() - request.start_time)
            else:
                response = future.result()
            self.io_loop.add_callback(callback, response)
        future.add_done_callback(handle_future)

    def handle_response(response):
        if raise_error and response.error:
            future.set_exception(response.error)
        else:
            future.set_result(response)
    self.fetch_impl(request, handle_response)
    return future
fetch() is the function we usually call in Tornado to make an asynchronous request. First comes future.add_done_callback(handle_future): handle_future passes the callback and the response as arguments to self.io_loop.add_callback, i.e. it puts the callback and the response into the IOLoop; registering it with future.add_done_callback(handle_future) makes it run once the future is done. Then self.fetch_impl(request, handle_response) is called; handle_response sets the result on the future, which in turn fires the future's done callbacks. fetch_impl is defined in simple_httpclient.py.
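As a usage-level illustration, here is a minimal sketch (Tornado 4.x; the example.com URL and handler name are placeholders) of driving fetch() through the callback argument, which is exactly the path handle_future covers:
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

client = AsyncHTTPClient()

def on_response(response):
    # fired by handle_future via self.io_loop.add_callback(callback, response)
    print("got", response.code, "for", response.effective_url)
    IOLoop.current().stop()

# callback style: fetch() wraps on_response and registers handle_future on the Future
client.fetch("http://example.com/", callback=on_response)

# The Future style would instead use the return value, e.g.:
#   future = client.fetch("http://example.com/")
#   future.add_done_callback(lambda f: print(f.result().code))

IOLoop.current().start()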
simple_httpclient.py
def fetch_impl(self, request, callback):
    key = object()
    self.queue.append((key, request, callback))
    if not len(self.active) < self.max_clients:
        timeout_handle = self.io_loop.add_timeout(
            self.io_loop.time() + min(request.connect_timeout,
                                      request.request_timeout),
            functools.partial(self._on_timeout, key, "in request queue"))
    else:
        timeout_handle = None
    self.waiting[key] = (request, callback, timeout_handle)
    self._process_queue()
    if self.queue:
        gen_log.debug("max_clients limit reached, request queued. "
                      "%d active, %d queued requests." % (
                          len(self.active), len(self.queue)))

def _process_queue(self):
    with stack_context.NullContext():
        while self.queue and len(self.active) < self.max_clients:
            key, request, callback = self.queue.popleft()
            if key not in self.waiting:
                continue
            self._remove_timeout(key)
            self.active[key] = (request, callback)
            release_callback = functools.partial(self._release_fetch, key)
            self._handle_request(request, release_callback, callback)

def _connection_class(self):
    return _HTTPConnection

def _handle_request(self, request, release_callback, final_callback):
    self._connection_class()(
        self.io_loop, self, request, release_callback,
        final_callback, self.max_buffer_size, self.tcp_client,
        self.max_header_size, self.max_body_size)
From this code the call chain is easy to trace: fetch_impl() queues the request, _process_queue() dequeues it as long as the number of active requests is below max_clients, and _handle_request() creates a _HTTPConnection to actually perform it.
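As a side note on the queueing above: requests beyond max_clients simply wait in self.queue. If the "max_clients limit reached" debug message shows up, the limit can be raised when the client implementation is configured. A minimal sketch (the value 50 is arbitrary):
from tornado.httpclient import AsyncHTTPClient

# the default SimpleAsyncHTTPClient accepts max_clients via configure()
AsyncHTTPClient.configure(None, max_clients=50)
client = AsyncHTTPClient()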
class _HTTPConnection(httputil.HTTPMessageDelegate):
    _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size, max_body_size):
        # in tcpclient.py this connect is decorated with @gen.coroutine; the function
        # itself has no callback parameter, but the coroutine wrapper accepts and uses one
        self.tcp_client.connect(host, port, af=af,
                                ssl_options=ssl_options,
                                max_buffer_size=self.max_buffer_size,
                                callback=self._on_connect)
tcpclient.py
@gen.coroutine
def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
            max_buffer_size=None):
    """Connect to the given host and port.

    Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
    ``ssl_options`` is not None).
    """
    addrinfo = yield self.resolver.resolve(host, port, af)
    connector = _Connector(
        addrinfo, self.io_loop,
        functools.partial(self._create_stream, max_buffer_size))
    af, addr, stream = yield connector.start()
    # TODO: For better performance we could cache the (af, addr)
    # information here and re-use it on subsequent connections to
    # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
    if ssl_options is not None:
        stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                        server_hostname=host)
    raise gen.Return(stream)
Above is the simplified _HTTPConnection code; the callback chain is set up by calling TCPClient's connect. Notice, however, that connect does not declare a callback parameter. That is because the function is decorated with @gen.coroutine: the function itself takes no callback, but the coroutine wrapper accepts one and makes use of it. A later article will look at gen and coroutine in detail, so I won't explain them further here. Once the connection completes, the callback self._on_connect is invoked; its code in simple_httpclient.py follows the short sketch below.
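Here is a small standalone sketch of that behaviour (Tornado 4.x semantics as I understand them; the add() coroutine is made up for illustration): the decorated function declares no callback parameter, yet the @gen.coroutine wrapper accepts one and invokes it with the coroutine's result.
from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def add(a, b):
    yield gen.moment          # yield once so this really runs as a coroutine
    raise gen.Return(a + b)

def done(result):
    # the wrapper calls this with the coroutine's result once its Future resolves
    print("callback got:", result)
    IOLoop.current().stop()

add(1, 2, callback=done)      # 'callback' is consumed by @gen.coroutine, not by add()
IOLoop.current().start()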
simple_httpclient.py
def _on_connect(self, stream):
    if self.final_callback is None:
        # final_callback is cleared if we've hit our timeout.
        stream.close()
        return
    ...
    if self.request.expect_100_continue:
        self._read_response()
    else:
        self._write_body(True)

def _write_body(self, start_read):
    ...
    self.connection.finish()
    if start_read:
        self._read_response()

def finish(self):
    ...
    response = HTTPResponse(original_request,
                            self.code, reason=getattr(self, 'reason', None),
                            headers=self.headers,
                            request_time=self.io_loop.time() - self.start_time,
                            buffer=buffer,
                            effective_url=self.request.url)
    self._run_callback(response)
    self._on_end_request()

def _run_callback(self, response):
    self._release()
    if self.final_callback is not None:
        final_callback = self.final_callback
        self.final_callback = None
        self.io_loop.add_callback(final_callback, response)
After this chain of calls we finally arrive at self.io_loop.add_callback(final_callback, response).
So, after the whole sequence triggered by fetch(), self.io_loop.add_callback is called and the callback function is placed into the IOLoop, where the loop in start() will execute it. Next, let's look at when the fd is added to epoll, so that data arriving on the fd can trigger the callback in the first place.
TCPClient.connect in tcpclient.py (shown above) calls af, addr, stream = yield connector.start().
tcpclient.py
def start(self, timeout=_INITIAL_CONNECT_TIMEOUT):
    self.try_connect(iter(self.primary_addrs))
    self.set_timout(timeout)
    return self.future

def try_connect(self, addrs):
    try:
        af, addr = next(addrs)
    except StopIteration:
        # We've reached the end of our queue, but the other queue
        # might still be working. Send a final error on the future
        # only when both queues are finished.
        if self.remaining == 0 and not self.future.done():
            self.future.set_exception(self.last_error or
                                      IOError("connection failed"))
        return
    future = self.connect(af, addr)  # this ends up calling IOStream's connect
    future.add_done_callback(functools.partial(self.on_connect_done,
                                               addrs, af, addr))
The connect called inside try_connect is ultimately IOStream's connect.
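For reference, the same IOStream.connect call can be made by hand. A minimal sketch (Tornado 4.x; example.com and port 80 are placeholders) that is enough to see the fd get registered with the IOLoop:
import socket
from tornado.iostream import IOStream
from tornado.ioloop import IOLoop

def on_connected():
    # called with no arguments once the TCP handshake completes
    print("connected; the socket's fd is now registered with the IOLoop/epoll")
    IOLoop.current().stop()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
stream = IOStream(sock)
stream.connect(("example.com", 80), callback=on_connected)  # hostname resolved synchronously here
IOLoop.current().start()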
iostream.py
def connect(self, address, callback=None, server_hostname=None):
    """Connects the socket to a remote address without blocking.

    May only be called if the socket passed to the constructor was
    not previously connected. The address parameter is in the
    same format as for `socket.connect <socket.socket.connect>` for
    the type of socket passed to the IOStream constructor,
    e.g. an ``(ip, port)`` tuple. Hostnames are accepted here,
    but will be resolved synchronously and block the IOLoop.
    If you have a hostname instead of an IP address, the `.TCPClient`
    class is recommended instead of calling this method directly.
    `.TCPClient` will do asynchronous DNS resolution and handle
    both IPv4 and IPv6.

    If ``callback`` is specified, it will be called with no
    arguments when the connection is completed; if not this method
    returns a `.Future` (whose result after a successful
    connection will be the stream itself).

    In SSL mode, the ``server_hostname`` parameter will be used
    for certificate validation (unless disabled in the
    ``ssl_options``) and SNI (if supported; requires Python
    2.7.9+).

    Note that it is safe to call `IOStream.write
    <BaseIOStream.write>` while the connection is pending, in
    which case the data will be written as soon as the connection
    is ready. Calling `IOStream` read methods before the socket is
    connected works on some platforms but is non-portable.

    .. versionchanged:: 4.0
       If no callback is given, returns a `.Future`.

    .. versionchanged:: 4.2
       SSL certificates are validated by default; pass
       ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
       suitably-configured `ssl.SSLContext` to the
       `SSLIOStream` constructor to disable.
    """
    self._connecting = True
    if callback is not None:
        self._connect_callback = stack_context.wrap(callback)
        future = None
    else:
        future = self._connect_future = TracebackFuture()
    try:
        self.socket.connect(address)
    except socket.error as e:
        # In non-blocking mode we expect connect() to raise an
        # exception with EINPROGRESS or EWOULDBLOCK.
        #
        # On freebsd, other errors such as ECONNREFUSED may be
        # returned immediately when attempting to connect to
        # localhost, so handle them the same way as an error
        # reported later in _handle_connect.
        if (errno_from_exception(e) not in _ERRNO_INPROGRESS and
                errno_from_exception(e) not in _ERRNO_WOULDBLOCK):
            if future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), e)
            self.close(exc_info=True)
            return future
    self._add_io_state(self.io_loop.WRITE)
    return future

def _add_io_state(self, state):
    """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

    Implementation notes: Reads and writes have a fast path and a
    slow path. The fast path reads synchronously from socket
    buffers, while the slow path uses `_add_io_state` to schedule
    an IOLoop callback. Note that in both cases, the callback is
    run asynchronously with `_run_callback`.

    To detect closed connections, we must have called
    `_add_io_state` at some point, but we want to delay this as
    much as possible so we don't have to set an `IOLoop.ERROR`
    listener that will be overwritten by the next slow-path
    operation. As long as there are callbacks scheduled for
    fast-path ops, those callbacks may do more reads.
    If a sequence of fast-path ops do not end in a slow-path op,
    (e.g. for an @asynchronous long-poll request), we must add
    the error handler. This is done in `_run_callback` and `write`
    (since the write callback is optional so we can have a
    fast-path write with no `_run_callback`)
    """
    if self.closed():
        # connection has been closed, so there can be no future events
        return
    if self._state is None:
        self._state = ioloop.IOLoop.ERROR | state
        with stack_context.NullContext():
            self.io_loop.add_handler(
                self.fileno(), self._handle_events, self._state)
    elif not self._state & state:
        self._state = self._state | state
        self.io_loop.update_handler(self.fileno(), self._state)
_add_io_state calls self.io_loop.add_handler, which effectively binds the fd to a handler. When the epoll inside the IOLoop sees events on that fd (for example, data arriving), it triggers the handler; through a chain of calls the handler eventually adds the callback function to the IOLoop, and the callback is then executed.
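To see this binding in isolation, here is a minimal hand-rolled sketch (assuming the Tornado 4.x IOLoop API; port 8888 is arbitrary): we register a listening socket with add_handler, and the handler only fires when epoll reports the fd as readable.
import socket
from tornado.ioloop import IOLoop

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server.bind(("127.0.0.1", 8888))
server.listen(128)

def on_readable(fd, events):
    # epoll reported READ on the listening socket, so accept() will not block
    conn, addr = server.accept()
    conn.sendall(b"hello from the IOLoop\n")
    conn.close()

io_loop = IOLoop.current()
io_loop.add_handler(server.fileno(), on_readable, IOLoop.READ)  # fd <-> handler binding
io_loop.start()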
That is the complete path from issuing a request, to receiving data, to finally running the callback.