1. 程式人生 > >[gevent原始碼分析] c-ares非同步DNS請求

[gevent原始碼分析] c-ares非同步DNS請求

c-ares是非同步DNS請求庫,libcurl,libevent,wireshark都使用了c-ares,gevent1.0版本前使用的是libevent,

所以它的DNS請求也是使用c-ares,1.0版本後使用cython封裝了c-ares。

gevent中DNS預設使用的是執行緒池版本的,可通過設定GEVENT_RESOLVER=ares環境變數使用c-ares非同步庫。



如何證明的確是非同步呢,試著跑一遍你就知道了?

#coding=utf8
import socket
import gevent
from gevent import get_hub
from gevent.resolver_ares import Resolver
r = get_hub().resolver = Resolver(servers=['8.8.8.8'])

def f(w):
    print w,r.gethostbyname(w)
for w in ['www.google.com','www.baidu.com','www.apple.com']:
    gevent.spawn(f,w)

gevent.sleep(6)
在開始之前先講幾個c-ares庫函式,熟悉之後再看ares.pyx會有種和親切的感覺。
cares.ares_library_init(cares.ARES_LIB_INIT_ALL)
初始化ares庫,其實只對windows平臺做了處理,主要是為了載入iphlpapi.dll,在非windows平臺可不呼叫。
如果呼叫一定要在c-ares任何函式之前呼叫。

cares.ares_library_cleanup()
相對於cares.ares_library_init,在windows平臺將釋放iphlpapi.dll,非windows平臺可不呼叫。
gevent中並沒有呼叫該函式,作者在__dealloc__中也用?號表明了這一點,我不太理解,可能有更好的理由吧。

cares.ares_destroy(self.channel)
銷燬channel,釋放記憶體,關閉開啟的socket,這是在__dealloc__中呼叫

cares.ares_init_options(&channel, &options, optmask)
這是ares中最核心的函式,用於初始化channel,options,optmask主要是通過channel的__init__構造

cdef public class channel [object PyGeventAresChannelObject, type PyGeventAresChannel_Type]:
    def __init__(self, object loop, flags=None, timeout=None, tries=None, ndots=None,
                 udp_port=None, tcp_port=None, servers=None):

引數說明:

flags用於控制一查詢行為,如ARES_FLAG_USEVC,將只發送TCP請求(我們知道DNS既有TCP也有UDP)
ARES_FLAG_PRIMARY :只向第一個伺服器傳送請求,還有其它選項參考ares_init_options函式文件
timeout:指明第一次請求的超時時間,單位為秒,c-ares單位為毫秒,gevent會轉換,第一次之後的超時c-area有它自己的演算法
tries:請求嘗試次數,預設4次
ndots:最少'.'的數量,預設是1,如果大於1,就直接查詢域名,不然會和本地域名合併(init_by_environment設定本地域名)
udp_port,tcp_port:使用的udp,tcp埠號,預設53
servers:傳送dns請求的伺服器,見下面ares_set_servers
ndots多說一句,比如ping bifeng(這是我一同事的主機),檢測發現沒有'.'(也就是小於ndots),所以會把本地域給加上去該操作在ares_search.c中ares_search函式中


cares.ares_set_servers(self.channel, cares.ares_addr_node* c_servers)
設定dns請求伺服器,設定完成需要free掉c_servers的記憶體空間,因為ares_set_servers中重新malloc記憶體空間了。
在set_servers中,通過finally free記憶體空間

            c_servers = <cares.ares_addr_node*>malloc(sizeof(cares.ares_addr_node) * length)
            if not c_servers:
                raise MemoryError
            try:
                index = 0
                for server in servers:
                    ...
                c_servers[length - 1].next = NULL
                index = cares.ares_set_servers(self.channel, c_servers)
                if index:
                    raise ValueError(strerror(index))
            finally:
                free(c_servers)

你可能很好奇,c-ares是如何和gevent(libev)的socket關聯起來的,因為DNS的本質也是
socket請求,所以底層也是需要使用作業系統提供的epoll等機制,而c-ares提供了socket狀態變化的介面,
這就可以讓c-ares執行在libev上面,所有的魔法其實都是ares_options.sock_state_cb向外提供的。

#ares.h
struct ares_options {
  int flags;
  int timeout; /* in seconds or milliseconds, depending on options */
  int tries;
  ....
  ares_sock_state_cb sock_state_cb;
  void *sock_state_cb_data;
};
ARES_OPT_SOCK_STATE_CB void (*sock_state_cb)(void *data, int s, int read, int write)
當dns socket狀態改變時將回調sock_state_cb,而在channel的__init__中將sock_state_cb設定為gevent_sock_state_callback
def __init__(...)
    options.sock_state_cb = <void*>gevent_sock_state_callback
    options.sock_state_cb_data = <void*>self

cdef void gevent_sock_state_callback(void *data, int s, int read, int write):
    if not data:
        return
    cdef channel ch = <channel>data
    ch._sock_state_callback(s, read, write)
gevent_sock_state_callback只做了一件事就是呼叫channel的_sock_state_callback,並設定是讀是寫
    cdef _sock_state_callback(self, int socket, int read, int write):
        if not self.channel:
            return
        cdef object watcher = self._watchers.get(socket)
        cdef int events = 0
        if read:
            events |= EV_READ
        if write:
            events |= EV_WRITE
        if watcher is None:
            if not events:
                return
            watcher = self.loop.io(socket, events) #socket第一次,啟動io watcher
            self._watchers[socket] = watcher
        elif events: #已有watcher,判斷事件是否變化了
            if watcher.events == events:
                return
            watcher.stop()
            watcher.events = events #設定新狀態
        else:
            watcher.stop()
            self._watchers.pop(socket, None)
            if not self._watchers:
                self._timer.stop()
            return #沒有事件了,也就是都處理完了,將回調我們的最終回撥函式(如呼叫gethostbyname時設定的回撥)
        watcher.start(self._process_fd, watcher, pass_events=True) #watcher設定回撥
        self._timer.again(self._on_timer) #讓c-ares每秒處理一下超時和broken_connections
前面io wather的回撥self._process_fd主要就是呼叫cares.ares_process_fd對指定的檔案描述符繼續處理,
cares.ARES_SOCKET_BAD代表該事件不做處理,其實也就是該事件已經處理完了。
    def _process_fd(self, int events, object watcher):
        if not self.channel:
            return
        cdef int read_fd = watcher.fd #只處理的檔案描述符
        cdef int write_fd = read_fd
        if not (events & EV_READ): #沒有可讀事件,將讀fd設為"不處理"
            read_fd = cares.ARES_SOCKET_BAD
        if not (events & EV_WRITE): #沒有可寫事件,將寫fd設為"不處理"
            write_fd = cares.ARES_SOCKET_BAD
        cares.ares_process_fd(self.channel, read_fd, write_fd)

其實到上面c-ares流程已經差不多了,最後會回撥設定的最終回撥,我們來看一下gethostbyname的操作
定義於resolver_ares.py的gethostbyname函式,呼叫的是gethostbyname_ex

   def gethostbyname_ex(self, hostname, family=AF_INET):
        while True:
            ares = self.ares
            try:
                waiter = Waiter(self.hub) #使用Waiter
                ares.gethostbyname(waiter, hostname, family) #呼叫ares.gethostbyname,設定回撥為waiter
                result = waiter.get() #我們知道,waiter沒有結果時會切換到hub,完美的和gevent結合起來
                if not result[-1]:
                    raise gaierror(-5, 'No address associated with hostname')
                return result
            except gaierror:
                if ares is self.ares:
                    raise

Waiter定義了__call__方法,所以可以直接作為回撥函式
ares.gethostbyname主要就是呼叫了cares.ares_gethostbyname(self.channel, name, family, <void*>gevent_ares_host_callback, <void*>arg)
當DNS請求成功或失敗都會回撥gevent_ares_host_callback
而gevent_ares_host_callback會回撥上面的waiter,並把結果傳給waiter,這邊可以自己看下程式碼,比較簡單。

waiter.__call__會switch到之前切換的greenlet,即前面的waiter.get()處,此時將返回result,gethostbyname成功執行。

這裡還有一個問題,c-ares什麼時候認為socket的狀態改變了?

#define SOCK_STATE_CALLBACK(c, s, r, w)                                 \
  do {                                                                  \
    if ((c)->sock_state_cb)                                             \
      (c)->sock_state_cb((c)->sock_state_cb_data, (s), (r), (w));       \
  } WHILE_FALSE
在c-ares中狀態改變回調是通過SOCK_STATE_CALLBACK巨集實現的,我們可以搜尋一下這個巨集你就明白了。

我們可以看一下open_tcp_socket,這是在剛開始傳送tcp時呼叫的。

static int open_tcp_socket(ares_channel channel, struct server_state *server)
{
  ......
  /* Acquire a socket. */
  s = socket(server->addr.family, SOCK_STREAM, 0);  //建立socket
  
  /* Configure it. */
 configure_socket(s, server->addr.family, channel); //配置
 

#ifdef TCP_NODELAY
  /*
   * Disable the Nagle algorithm (only relevant for TCP sockets, and thus not
   * in configure_socket). In general, in DNS lookups we're pretty much
   * interested in firing off a single request and then waiting for a reply,
   * so batching isn't very interesting.
   */
  opt = 1;
  if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
                 (void *)&opt, sizeof(opt)) == -1) //判斷是否使用TCP_NODELAY
    {
       sclose(s);
       return -1;
    }
#endif

  /* Connect to the server. */
  if (connect(s, sa, salen) == -1)  // 連線DNS伺服器
    {
      int err = SOCKERRNO;

      if (err != EINPROGRESS && err != EWOULDBLOCK)
        {
          sclose(s);
          return -1;
        }
    }

  SOCK_STATE_CALLBACK(channel, s, 1, 0); // 連線後,狀態肯定改變,肯定有讀事件,所有read_fd設為1,自然地呼叫了狀態改變函式
  server->tcp_buffer_pos = 0;
  server->tcp_socket = s;
  server->tcp_connection_generation = ++channel->tcp_connection_generation;
  return 0;
}
也就是說c-ares值關注剛開始的狀態變化,也就是連線後“讀”事件,中間的狀態改變就全部交給gevent了。

當然當查詢結束,或area channel被destory或cancel時,你還需要告訴gevent已經沒有關注事件了,這個是在

ares__close_sockets函式中實現的。


c-ares真的很美,僅僅通過提供幾個介面,就可以讓自己和其它的框架完美結合,very nice!!!

我之前就是很好奇c-ares的執行方式,內部DNS細節可能並不關注,關注的就是結合問題,花了不少時間研究,

主要是我在網上找不到c-ares的example,這讓我鬱悶了半天,這麼使用廣泛的庫怎麼沒有人研究呢?