1. 程式人生 > >tornado中epoll的非同步請求的實現過程

tornado中epoll的非同步請求的實現過程

最近在看tornado的原始碼,也看了不少tornado原始碼解析的文章,但是基本都沒有涉及到怎麼通過epoll實現非同步請求.

ioloop.py

def start(self):
    """Run the event loop until ``self._running`` becomes false.

    This is a condensed excerpt (timeout handling removed).  Every
    iteration does three things, in order:

    1. run the callbacks that were queued in ``self._callbacks``;
    2. wait in ``self._impl.poll()`` for I/O events;
    3. dispatch each ``(fd, events)`` pair to the handler registered
       for that fd in ``self._handlers``.

    Raises:
        RuntimeError: if the loop is already running.
    """
    if self._running:
        raise RuntimeError("IOLoop is already running")
    if self._stopped:
        # A pending stop flag is consumed and the loop does not run.
        self._stopped = False
        return
    old_current = getattr(IOLoop._current, "instance", None)
    IOLoop._current.instance = self
    self._running = True

    try:
        while True:
            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            with self._callback_lock:
                callbacks = self._callbacks
                self._callbacks = []

            for callback in callbacks:
                self._run_callback(callback)
            # Closures may be holding on to a lot of memory, so allow
            # them to be freed before we go into our poll wait.
            callbacks = callback = None

            if self._callbacks:
                # If any callbacks or timeouts called add_callback,
                # we don't want to wait in poll() before we run them.
                poll_timeout = 0.0
            else:
                # No timeouts and no callbacks, so use the default.
                poll_timeout = _POLL_TIMEOUT

            if not self._running:
                break
            try:
                # Wait for I/O events (the epoll wait the article is about).
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                # Depending on python version and IOLoop implementation,
                # different exception types may be thrown and there are
                # two ways EINTR might be signaled:
                # * e.errno == errno.EINTR
                # * e.args is like (errno.EINTR, 'Interrupted system call')
                if errno_from_exception(e) == errno.EINTR:
                    # EINTR is "Interrupted system call" (see above), not a
                    # poll timeout: skip dispatch and start a new iteration.
                    continue
                else:
                    raise

            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to
            # this IOLoop that modify self._events
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        self.handle_callback_exception(self._handlers.get(fd))
                except Exception:
                    self.handle_callback_exception(self._handlers.get(fd))
            fd_obj = handler_func = None
    finally:
        # The excerpt dropped this clause, leaving a bare ``try`` and an
        # unused ``old_current``.  Clear the stop flag and put back whichever
        # loop was current before this one started, even on an exception.
        self._stopped = False
        IOLoop._current.instance = old_current

這是精簡後的ioloop的start程式碼,去掉了timeout部分的處理. 從程式碼中可以看到,在每次epoll迴圈中都會呼叫self._run_callback(callback),執行callback函式. 下面會解析tornado中是如何知道請求執行結束然後新增callback的.

httpclient.py

def fetch(self, request, callback=None, raise_error=True, **kwargs):
    """Start an asynchronous request and return a future for its response.

    The request's headers are wrapped in ``httputil.HTTPHeaders`` and the
    request itself in ``_RequestProxy`` (with ``self.defaults``) before it
    is handed to ``self.fetch_impl``.

    When ``callback`` is given, it is scheduled on ``self.io_loop`` with
    the response once the future completes.  A failure is turned into a
    response first: the one attached to an ``HTTPError`` when present,
    otherwise a synthetic 599 response carrying the exception.

    When ``raise_error`` is true and the response has an ``error``, the
    future is completed with that error instead of the response.
    """
    request.headers = httputil.HTTPHeaders(request.headers)
    request = _RequestProxy(request, self.defaults)
    future = TracebackFuture()

    def handle_response(response):
        # Passed to fetch_impl: completes the future one way or the other.
        if raise_error and response.error:
            future.set_exception(response.error)
            return
        future.set_result(response)

    if callback is not None:
        callback = stack_context.wrap(callback)

        def handle_future(done_future):
            # Done-callback of the future: hand the response to the IOLoop.
            error = done_future.exception()
            if error is None:
                response = done_future.result()
            elif isinstance(error, HTTPError) and error.response is not None:
                response = error.response
            else:
                elapsed = time.time() - request.start_time
                response = HTTPResponse(request, 599, error=error,
                                        request_time=elapsed)
            self.io_loop.add_callback(callback, response)

        future.add_done_callback(handle_future)

    self.fetch_impl(request, handle_response)
    return future

fetch()就是我們在tornado中常用的非同步請求的函式. 首先是 future.add_done_callback(handle_future): handle_future會把callback和response作為self.io_loop.add_callback的引數,也就是把callback和response放到io_loop中; 而handle_future本身則作為future完成(done)之後的callback函式,也就是future.add_done_callback(handle_future). 然後是self.fetch_impl(request, handle_response): handle_response負責給future設定result(或exception),之後就會呼叫future的done callback了. fetch_impl函式在simple_httpclient.py中.

simple_httpclient.py

  def fetch_impl(self, request, callback):
    """Queue ``request`` and try to start it right away.

    The request is appended to ``self.queue`` under a fresh key and
    recorded in ``self.waiting`` together with its callback and an
    optional timeout handle.  The timeout is only armed when every
    client slot is already taken (``len(self.active) >= max_clients``).
    """
    key = object()
    self.queue.append((key, request, callback))

    timeout_handle = None
    if len(self.active) >= self.max_clients:
        # No free slot: bound the time the request may spend in the queue.
        deadline = self.io_loop.time() + min(request.connect_timeout,
                                             request.request_timeout)
        on_timeout = functools.partial(self._on_timeout, key,
                                       "in request queue")
        timeout_handle = self.io_loop.add_timeout(deadline, on_timeout)

    self.waiting[key] = (request, callback, timeout_handle)
    self._process_queue()
    if self.queue:
        # Something is still queued after processing: log the backlog.
        gen_log.debug("max_clients limit reached, request queued. "
                      "%d active, %d queued requests." % (
                          len(self.active), len(self.queue)))

 def _process_queue(self):
    """Start queued requests while client slots are available.

    Entries whose key is no longer in ``self.waiting`` are skipped.
    For the others the queue timeout is removed, the request is moved
    to ``self.active`` and ``_handle_request`` is called with a release
    callback bound to the request's key.
    """
    with stack_context.NullContext():
        while self.queue and len(self.active) < self.max_clients:
            key, request, callback = self.queue.popleft()
            if key in self.waiting:
                self._remove_timeout(key)
                self.active[key] = (request, callback)
                on_release = functools.partial(self._release_fetch, key)
                self._handle_request(request, on_release, callback)
def _connection_class(self):
    """Return the class that ``_handle_request`` instantiates per request."""
    connection_cls = _HTTPConnection
    return connection_cls

def _handle_request(self, request, release_callback, final_callback):
    self._connection_class()(
        self.io_loop, self, request, release_callback,
        final_callback, self.max_buffer_size, self.tcp_client,
        self.max_header_size, self.max_body_size)

通過程式碼可以找到函式的呼叫關係.

class _HTTPConnection(httputil.HTTPMessageDelegate):
    """Connection object created for one request (condensed excerpt).

    Only the part of ``__init__`` that starts the TCP connect is shown.
    """

    # HTTP methods listed as supported.
    _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size, max_body_size):
        # NOTE(review): host, port, af, ssl_options and the self.* attributes
        # used below are set up by code omitted from this excerpt.
        #
        # TCPClient.connect is decorated with @gen.coroutine: its own
        # signature has no ``callback`` parameter, but the coroutine wrapper
        # makes use of the ``callback`` keyword passed here.
        self.tcp_client.connect(host, port, af=af,
                                ssl_options=ssl_options,
                                max_buffer_size=self.max_buffer_size,
                                callback=self._on_connect)

tcpclient.py

@gen.coroutine
def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
            max_buffer_size=None):
    """Open a connection to ``host``:``port``.

    The coroutine resolves the address, lets a ``_Connector`` establish
    the stream and, when ``ssl_options`` is given, upgrades the stream
    with ``start_tls``.  Its asynchronous result is an `.IOStream`
    (an `.SSLIOStream` when ``ssl_options`` is not None).
    """
    resolved = yield self.resolver.resolve(host, port, af)
    stream_factory = functools.partial(self._create_stream, max_buffer_size)
    connector = _Connector(resolved, self.io_loop, stream_factory)
    _family, _address, stream = yield connector.start()
    # TODO: For better performance we could cache the (af, addr)
    # information here and re-use it on subsequent connections to
    # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
    if ssl_options is None:
        raise gen.Return(stream)
    secure_stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                           server_hostname=host)
    raise gen.Return(secure_stream)

上面是精簡之後的_HTTPConnection的程式碼,通過呼叫tcpclient的connect實現了callback的回撥. 但是我們發現connect函式沒有callback的引數,其實這個函式被@coroutine裝飾,函式本身沒有callback,但是coroutine會利用callback,以後的文章會解析gen和coroutine,這裡就不做進一步的解釋了. 連線建立之後,self._on_connect這個callback函式就會被呼叫:

simple_httpclient.py

 def _on_connect(self, stream):
    # Callback handed to tcp_client.connect (see ``callback=self._on_connect``);
    # receives the connected stream.
    if self.final_callback is None:
        # final_callback is cleared if we've hit our timeout.
        stream.close()
        return
    # (The article omits the code between here and the final dispatch.)
      ........
    # With expect_100_continue, start reading the response right away;
    # otherwise write the body and let _write_body start the read
    # (start_read=True).
    if self.request.expect_100_continue:
        self._read_response()
    else:
        self._write_body(True)

 def _write_body(self, start_read):
    # Finish the outgoing side on self.connection, then optionally begin
    # reading the response.  (The body-writing code is omitted by the article.)
     .......
    self.connection.finish()
    if start_read:
        # The caller asked for the response read to start from here.
        self._read_response()

def finish(self):
    # Build the HTTPResponse for this request and hand it to _run_callback.
    # NOTE(review): original_request and buffer come from code the article
    # omitted at the elision marker below.
   ......
    response = HTTPResponse(original_request,
                            self.code, reason=getattr(self, 'reason', None),
                            headers=self.headers,
                            # elapsed time measured on the IOLoop clock
                            request_time=self.io_loop.time() - self.start_time,
                            buffer=buffer,
                            effective_url=self.request.url)
    # _run_callback schedules final_callback(response) on the IOLoop.
    self._run_callback(response)
    self._on_end_request()

def _run_callback(self, response):
    self._release()
    if self.final_callback is not None:
        final_callback = self.final_callback
        self.final_callback = None
        self.io_loop.add_callback(final_callback, response)

通過一系列函式呼叫之後我們終於發現self.io_loop.add_callback(final_callback, response).
以上就是fetch()函式通過一系列執行之後,呼叫了self.io_loop.add_callback,將callback函式放到了ioloop中,在ioloop迴圈中就會執行callback了. 下面解釋一下,fd是什麼時候被加到epoll中的: 當fd接收到資料時就可以觸發callback了.
tcpclient.py中的TCPClient的connect(上面有程式碼)呼叫了af, addr, stream = yield connector.start(),

tcpclient.py

    def start(self, timeout=_INITIAL_CONNECT_TIMEOUT):
        """Begin connecting and return the future for the result.

        Starts a connection attempt over ``self.primary_addrs``, arms the
        timeout with ``timeout`` and returns ``self.future``.
        """
        self.try_connect(iter(self.primary_addrs))
        # NOTE(review): the spelling ``set_timout`` is kept verbatim; the
        # method's definition is not visible here, so confirm it before
        # renaming.
        self.set_timout(timeout)
        return self.future

def try_connect(self, addrs):
    """Try the next address from ``addrs``.

    When the iterator is exhausted, the shared future is failed (with
    ``self.last_error`` or a generic ``IOError``), but only if no other
    attempt remains and the future is not done yet.  Otherwise a connect
    is started and ``on_connect_done`` is registered on its future.
    """
    try:
        family, address = next(addrs)
    except StopIteration:
        # We've reached the end of our queue, but the other queue
        # might still be working.  Send a final error on the future
        # only when both queues are finished.
        nothing_left = self.remaining == 0 and not self.future.done()
        if nothing_left:
            failure = self.last_error or IOError("connection failed")
            self.future.set_exception(failure)
    else:
        # Per the article, this call ends up in IOStream.connect.
        attempt = self.connect(family, address)
        attempt.add_done_callback(
            functools.partial(self.on_connect_done, addrs, family, address))

try_connect函式中的self.connect最終呼叫的是IOStream中的connect.

iostream.py

   def connect(self, address, callback=None, server_hostname=None):
    """Start a non-blocking connect of the socket to ``address``.

    Only valid when the socket given to the constructor has not been
    connected before.  ``address`` uses the format expected by
    `socket.connect <socket.socket.connect>` for that socket type, for
    example an ``(ip, port)`` tuple.  A hostname is accepted too, but it
    is resolved synchronously and blocks the IOLoop; prefer `.TCPClient`
    in that case, which resolves asynchronously and handles both IPv4
    and IPv6.

    With a ``callback``, it is called without arguments once the
    connection is completed.  Without one, a `.Future` is returned whose
    result after a successful connection is the stream itself.

    In SSL mode ``server_hostname`` is used for certificate validation
    (unless disabled in the ``ssl_options``) and for SNI (if supported;
    requires Python 2.7.9+).

    `IOStream.write <BaseIOStream.write>` may safely be called while the
    connection is pending; the data is written as soon as the connection
    is ready.  Calling `IOStream` read methods before the socket is
    connected works on some platforms but is non-portable.

    .. versionchanged:: 4.0
        If no callback is given, returns a `.Future`.

    .. versionchanged:: 4.2
       SSL certificates are validated by default; pass
       ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
       suitably-configured `ssl.SSLContext` to the
       `SSLIOStream` constructor to disable.
    """
    self._connecting = True
    if callback is None:
        future = self._connect_future = TracebackFuture()
    else:
        self._connect_callback = stack_context.wrap(callback)
        future = None
    try:
        self.socket.connect(address)
    except socket.error as e:
        # In non-blocking mode we expect connect() to raise an
        # exception with EINPROGRESS or EWOULDBLOCK.
        #
        # On freebsd, other errors such as ECONNREFUSED may be
        # returned immediately when attempting to connect to
        # localhost, so handle them the same way as an error
        # reported later in _handle_connect.
        code = errno_from_exception(e)
        still_pending = (code in _ERRNO_INPROGRESS or
                         code in _ERRNO_WOULDBLOCK)
        if not still_pending:
            if future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), e)
            # Must stay inside the handler: close() picks up the active
            # exception through exc_info=True.
            self.close(exc_info=True)
            return future
    # Register interest in WRITE events for this stream with the IOLoop.
    self._add_io_state(self.io_loop.WRITE)
    return future

  def _add_io_state(self, state):
    """Add ``state`` (IOLoop.{READ,WRITE} flags) to our event handler.

    Implementation notes: reads and writes come in a fast path and a
    slow path.  The fast path works synchronously on the socket buffers;
    the slow path goes through `_add_io_state` to schedule an IOLoop
    callback.  Either way the callback itself is run asynchronously
    with `_run_callback`.

    Closed connections can only be detected once `_add_io_state` has
    been called, yet the call is postponed as long as possible so that
    no `IOLoop.ERROR` listener is installed only to be overwritten by
    the next slow-path operation.  While callbacks for fast-path
    operations are still scheduled, they may perform further reads.
    When a run of fast-path operations does not end in a slow-path one
    (e.g. an @asynchronous long-poll request), the error handler has to
    be added explicitly; `_run_callback` and `write` take care of that
    (the write callback is optional, so a fast-path write may happen
    without any `_run_callback`).
    """
    if self.closed():
        # connection has been closed, so there can be no future events
        return
    current = self._state
    if current is not None:
        if current & state:
            # The requested flags are already being listened for.
            return
        # Handler already registered: just widen its event mask.
        self._state = current | state
        self.io_loop.update_handler(self.fileno(), self._state)
        return
    # First registration: always listen for ERROR next to ``state``.
    self._state = ioloop.IOLoop.ERROR | state
    with stack_context.NullContext():
        self.io_loop.add_handler(
            self.fileno(), self._handle_events, self._state)

_add_io_state中呼叫了self.io_loop.add_handler,相當於將fd和handler做了繫結,那麼當ioloop中的epoll收到fd上的事件(例如有資料可讀)時,就會觸發handler,handler經過一系列的執行就會把callback函式加到ioloop中,然後就是callback的執行了.

以上就是從發起請求開始到接收到資料然後執行callback的全過程.