[gevent原始碼分析] c-ares非同步DNS請求
c-ares是非同步DNS請求庫,libcurl,libevent,wireshark都使用了c-ares,gevent1.0版本前使用的是libevent,
所以它的DNS請求也是使用c-ares,1.0版本後使用cython封裝了c-ares。
gevent中DNS預設使用的是執行緒池版本的,可通過設定GEVENT_RESOLVER=ares環境變數使用c-ares非同步庫。
如何證明的確是非同步呢,試著跑一遍你就知道了?
在開始之前先講幾個c-ares庫函式,熟悉之後再看ares.pyx會有種和親切的感覺。#coding=utf8 import socket import gevent from gevent import get_hub from gevent.resolver_ares import Resolver r = get_hub().resolver = Resolver(servers=['8.8.8.8']) def f(w): print w,r.gethostbyname(w) for w in ['www.google.com','www.baidu.com','www.apple.com']: gevent.spawn(f,w) gevent.sleep(6)
cares.ares_library_init(cares.ARES_LIB_INIT_ALL)
初始化ares庫,其實只對windows平臺做了處理,主要是為了載入iphlpapi.dll,在非windows平臺可不呼叫。
如果呼叫一定要在c-ares任何函式之前呼叫。
cares.ares_library_cleanup()
相對於cares.ares_library_init,在windows平臺將釋放iphlpapi.dll,非windows平臺可不呼叫。
gevent中並沒有呼叫該函式,作者在__dealloc__中也用?號表明了這一點,我不太理解,可能有更好的理由吧。
cares.ares_destroy(self.channel)
銷燬channel,釋放記憶體,關閉開啟的socket,這是在__dealloc__中呼叫
cares.ares_init_options(&channel, &options, optmask)
這是ares中最核心的函式,用於初始化channel,options,optmask主要是通過channel的__init__構造
cdef public class channel [object PyGeventAresChannelObject, type PyGeventAresChannel_Type]: def __init__(self, object loop, flags=None, timeout=None, tries=None, ndots=None, udp_port=None, tcp_port=None, servers=None):
引數說明:
flags用於控制一查詢行為,如ARES_FLAG_USEVC,將只發送TCP請求(我們知道DNS既有TCP也有UDP)
ARES_FLAG_PRIMARY :只向第一個伺服器傳送請求,還有其它選項參考ares_init_options函式文件
timeout:指明第一次請求的超時時間,單位為秒,c-ares單位為毫秒,gevent會轉換,第一次之後的超時c-area有它自己的演算法
tries:請求嘗試次數,預設4次
ndots:最少'.'的數量,預設是1,如果大於1,就直接查詢域名,不然會和本地域名合併(init_by_environment設定本地域名)
udp_port,tcp_port:使用的udp,tcp埠號,預設53
servers:傳送dns請求的伺服器,見下面ares_set_servers
ndots多說一句,比如ping bifeng(這是我一同事的主機),檢測發現沒有'.'(也就是小於ndots),所以會把本地域給加上去該操作在ares_search.c中ares_search函式中
cares.ares_set_servers(self.channel, cares.ares_addr_node* c_servers)
設定dns請求伺服器,設定完成需要free掉c_servers的記憶體空間,因為ares_set_servers中重新malloc記憶體空間了。
在set_servers中,通過finally free記憶體空間
c_servers = <cares.ares_addr_node*>malloc(sizeof(cares.ares_addr_node) * length)
if not c_servers:
raise MemoryError
try:
index = 0
for server in servers:
...
c_servers[length - 1].next = NULL
index = cares.ares_set_servers(self.channel, c_servers)
if index:
raise ValueError(strerror(index))
finally:
free(c_servers)
你可能很好奇,c-ares是如何和gevent(libev)的socket關聯起來的,因為DNS的本質也是
socket請求,所以底層也是需要使用作業系統提供的epoll等機制,而c-ares提供了socket狀態變化的介面,
這就可以讓c-ares執行在libev上面,所有的魔法其實都是ares_options.sock_state_cb向外提供的。
#ares.h
struct ares_options {
int flags;
int timeout; /* in seconds or milliseconds, depending on options */
int tries;
....
ares_sock_state_cb sock_state_cb;
void *sock_state_cb_data;
};
ARES_OPT_SOCK_STATE_CB void (*sock_state_cb)(void *data, int s, int read, int write)
當dns socket狀態改變時將回調sock_state_cb,而在channel的__init__中將sock_state_cb設定為gevent_sock_state_callbackdef __init__(...)
options.sock_state_cb = <void*>gevent_sock_state_callback
options.sock_state_cb_data = <void*>self
cdef void gevent_sock_state_callback(void *data, int s, int read, int write):
if not data:
return
cdef channel ch = <channel>data
ch._sock_state_callback(s, read, write)
gevent_sock_state_callback只做了一件事就是呼叫channel的_sock_state_callback,並設定是讀是寫 cdef _sock_state_callback(self, int socket, int read, int write):
if not self.channel:
return
cdef object watcher = self._watchers.get(socket)
cdef int events = 0
if read:
events |= EV_READ
if write:
events |= EV_WRITE
if watcher is None:
if not events:
return
watcher = self.loop.io(socket, events) #socket第一次,啟動io watcher
self._watchers[socket] = watcher
elif events: #已有watcher,判斷事件是否變化了
if watcher.events == events:
return
watcher.stop()
watcher.events = events #設定新狀態
else:
watcher.stop()
self._watchers.pop(socket, None)
if not self._watchers:
self._timer.stop()
return #沒有事件了,也就是都處理完了,將回調我們的最終回撥函式(如呼叫gethostbyname時設定的回撥)
watcher.start(self._process_fd, watcher, pass_events=True) #watcher設定回撥
self._timer.again(self._on_timer) #讓c-ares每秒處理一下超時和broken_connections
前面io wather的回撥self._process_fd主要就是呼叫cares.ares_process_fd對指定的檔案描述符繼續處理,cares.ARES_SOCKET_BAD代表該事件不做處理,其實也就是該事件已經處理完了。
def _process_fd(self, int events, object watcher):
if not self.channel:
return
cdef int read_fd = watcher.fd #只處理的檔案描述符
cdef int write_fd = read_fd
if not (events & EV_READ): #沒有可讀事件,將讀fd設為"不處理"
read_fd = cares.ARES_SOCKET_BAD
if not (events & EV_WRITE): #沒有可寫事件,將寫fd設為"不處理"
write_fd = cares.ARES_SOCKET_BAD
cares.ares_process_fd(self.channel, read_fd, write_fd)
其實到上面c-ares流程已經差不多了,最後會回撥設定的最終回撥,我們來看一下gethostbyname的操作
定義於resolver_ares.py的gethostbyname函式,呼叫的是gethostbyname_ex
def gethostbyname_ex(self, hostname, family=AF_INET):
while True:
ares = self.ares
try:
waiter = Waiter(self.hub) #使用Waiter
ares.gethostbyname(waiter, hostname, family) #呼叫ares.gethostbyname,設定回撥為waiter
result = waiter.get() #我們知道,waiter沒有結果時會切換到hub,完美的和gevent結合起來
if not result[-1]:
raise gaierror(-5, 'No address associated with hostname')
return result
except gaierror:
if ares is self.ares:
raise
Waiter定義了__call__方法,所以可以直接作為回撥函式
ares.gethostbyname主要就是呼叫了cares.ares_gethostbyname(self.channel, name, family, <void*>gevent_ares_host_callback, <void*>arg)
當DNS請求成功或失敗都會回撥gevent_ares_host_callback
而gevent_ares_host_callback會回撥上面的waiter,並把結果傳給waiter,這邊可以自己看下程式碼,比較簡單。
waiter.__call__會switch到之前切換的greenlet,即前面的waiter.get()處,此時將返回result,gethostbyname成功執行。
這裡還有一個問題,c-ares什麼時候認為socket的狀態改變了?
#define SOCK_STATE_CALLBACK(c, s, r, w) \
do { \
if ((c)->sock_state_cb) \
(c)->sock_state_cb((c)->sock_state_cb_data, (s), (r), (w)); \
} WHILE_FALSE
在c-ares中狀態改變回調是通過SOCK_STATE_CALLBACK巨集實現的,我們可以搜尋一下這個巨集你就明白了。我們可以看一下open_tcp_socket,這是在剛開始傳送tcp時呼叫的。
static int open_tcp_socket(ares_channel channel, struct server_state *server)
{
......
/* Acquire a socket. */
s = socket(server->addr.family, SOCK_STREAM, 0); //建立socket
/* Configure it. */
configure_socket(s, server->addr.family, channel); //配置
#ifdef TCP_NODELAY
/*
* Disable the Nagle algorithm (only relevant for TCP sockets, and thus not
* in configure_socket). In general, in DNS lookups we're pretty much
* interested in firing off a single request and then waiting for a reply,
* so batching isn't very interesting.
*/
opt = 1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
(void *)&opt, sizeof(opt)) == -1) //判斷是否使用TCP_NODELAY
{
sclose(s);
return -1;
}
#endif
/* Connect to the server. */
if (connect(s, sa, salen) == -1) // 連線DNS伺服器
{
int err = SOCKERRNO;
if (err != EINPROGRESS && err != EWOULDBLOCK)
{
sclose(s);
return -1;
}
}
SOCK_STATE_CALLBACK(channel, s, 1, 0); // 連線後,狀態肯定改變,肯定有讀事件,所有read_fd設為1,自然地呼叫了狀態改變函式
server->tcp_buffer_pos = 0;
server->tcp_socket = s;
server->tcp_connection_generation = ++channel->tcp_connection_generation;
return 0;
}
也就是說c-ares值關注剛開始的狀態變化,也就是連線後“讀”事件,中間的狀態改變就全部交給gevent了。當然當查詢結束,或area channel被destory或cancel時,你還需要告訴gevent已經沒有關注事件了,這個是在
ares__close_sockets函式中實現的。
c-ares真的很美,僅僅通過提供幾個介面,就可以讓自己和其它的框架完美結合,very nice!!!
我之前就是很好奇c-ares的執行方式,內部DNS細節可能並不關注,關注的就是結合問題,花了不少時間研究,
主要是我在網上找不到c-ares的example,這讓我鬱悶了半天,這麼使用廣泛的庫怎麼沒有人研究呢?