1. 程式人生 > 實用技巧 >用C語言擼了個DBProxy

用C語言擼了個DBProxy

用C語言擼了個DBProxy

前言

筆者在閱讀了一大堆原始碼後,就會情不自禁產生造輪子的想法。於是花了數個週末的時間用C語言擼了一個DBProxy(MySQL協議)。在筆者的github中給這個DBProxy起名為Hero。

為什麼採用C語言

筆者一直有C情節,求學時候一直玩C。工作之後,一直使用Java,就把C漸漸放下了。在筆者最近一年閱讀了一堆關於linux Kernel(C)和MySQL(C++)的原始碼後,就萌生了重拾C的想法。同時用純C的話,勢必要從基礎開始造一大堆輪子,這也符合筆者當時造輪子的心境。

造了哪些輪子

習慣了Java的各種好用的類庫框架之後,用純C無疑就是找虐。不過,既然做了這個決定,跪著也得搞完。在寫Hero的過程中。很大一部分時間就是在搭建基礎工具,例如:

Reactor模型
記憶體池
packet_buffer
協議分包處理
連線池
......

下面在這篇部落格裡面一一道來

DBProxy的整體原理

Hero(DBProxy)其實就是自己偽裝成MySQL,接收到應用發過來的SQL命令後,再轉發到後端。如下圖所示:
由於Hero在解析SQL的時候,可以獲取各種資訊,例如事務這個資訊就可以通過set auto_commit和begin等命令存在連線狀態裡面,再根據解析出來的SQL判斷其是否需要走主庫。這樣就可以對應用透明的進行主從分離以至於分庫分表等操作。
當然了,筆者現在的Hero剛把基礎的功能搭建好(協議、連線池等),對連線狀態還沒有做進一步的處理。

Reactor模式

Hero的網路模型採用了Reactor模式,而且是多執行緒模型,同時採用epoll的水平觸發。

採用多執行緒模型

為什麼採用多執行緒,純粹是為了編寫程式碼簡單。多程序的話,還得考慮worker程序間負載均衡問題,例如nginx就在某個worker程序達到7/8最大連線數的時候拒絕獲取連線從而轉給其它worker。多執行緒的話,在accept執行緒裡面通過取模選擇一個worker執行緒就可以輕鬆的達到簡單的負載均衡結果。

採用epoll水平觸發

為什麼採用epoll的水平觸發,純粹也是為了編寫程式碼簡單。如果採用邊緣觸發的話,需要迴圈讀取直到read返回位元組數為0為止。然而如果某個連線特別活躍,socket的資料一直讀不完,會造成其它連線飢餓,所以必須還得自己寫個均衡演算法,在讀到一定程度後,去選擇其它連線。

Reactor

整體Reactor模型如下圖所示:

其實程式碼是很簡單的,如下面程式碼所示的就是reactor中的accept處理:

// 中間省略了大量的錯誤處理
int init_reactor(int listen_fd,int worker_count){
    // 注意,這邊需要是unsigned 防止出現負數
    unsigned int current_worker = 0;
    for(;;){
        int numevents = 0;
        int retval = epoll_wait(reactor->master_fd,reactor->events,EPOLL_MAX_EVENTS,500);
        ......
        for(j=0; j < numevents; j++){        
			client_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_len))
			poll_add_event(reactor->worker_fd_arrays[current_worker++%reactor->worker_count],conn->sockfd,EPOLLOUT,conn)
			......     
		 }   
}		 

上面程式碼中,每來一個新的連線current_worker都自增,這樣取模後便可在連線層面上對worker執行緒進行負載均衡。 而worker執行緒則通過pthread去實現,如下面程式碼所示:

// 這裡的worker_count根據呼叫get_nprocs得到的對應機器的CPU數量
// 注意,由於docker返回的是宿主機CPU數量,所以需要自行調整
    for(int i=0;i<worker_count;i++){
        reactor->worker_fd_arrays[i] = epoll_create(EPOLL_MAX_EVENTS);
        if(reactor->worker_fd_arrays[i] == -1){
           goto error_process;
        }
        // 通過pthread去建立worker執行緒
        if(FALSE == create_and_start_rw_thread(reactor->worker_fd_arrays[i],pool)){
            goto error_process;
        }
    }

而worker執行緒的處理也是按照標準的epoll水平觸發去處理的:

static void* rw_thread_func(void* arg){
    ......
    for(;;){
        int numevents = 0;
        int retval = epoll_wait(epfd,events,EPOLL_MAX_EVENTS,500);
        ......
        for(j=0; j < numevents; j++){
        	if(event & EPOLLHUP){
        		// 處理斷開事件
                ......
        	}else if(event & EPOLLERR){
        		// 處理錯誤事件
                ......
        	}
        	
        }else {
            if(event & EPOLLIN){
                handle_ready_read_connection(conn);
                continue;
            }
            if(event & EPOLLOUT){
                hanlde_ready_write_connection(conn);
                continue;
            }
        }
    }
    ......
}

記憶體池

為什麼需要記憶體池

事實上,筆者在最開始編寫的時候,是直接呼叫標準庫裡面的malloc的。但寫著寫著,就發現一些非常坑的問題。例如我要在一個請求裡面malloc數十個結構,同時每次malloc都有失敗的可能,那麼我的程式碼可能寫這樣。

void do_something(){
	void * a1 = (void*)malloc(sizeof(struct A));
	if(a1 == NULL){
		goto error_process;
	}
	......	
	void * a10 = (void*)malloc(sizeof(struct A));
	if(a10 == NULL{
		goto error_process;
	}
error_process:
	if(NULL != a1){
		free(a1);
	}	
	......
	if(NULL != a10){
		free(a10);
	}
}	

寫著寫著,筆者就感覺完全不可控制了。尤其是在各種條件分支加進去之後,可能本身aN這個變數都還沒有被分配,那麼就還不能用(NULL != aN),還得又一堆複雜的判斷。 有了記憶體池之後,雖然依舊需要仔細判斷記憶體池不夠的情況,但至少free的時候,只需要把記憶體池整體給free掉即可,如下面程式碼所示:

void do_something(mem_pool* pool){
	void * a1 = (void*)mem_pool_alloc(sizeof(struct A),pool);
	if(a1 == NULL){
		goto error_process;
	}
	......	
	void * a10 = (void*)mem_pool_alloc(sizeof(struct A),pool);
	if(a10 == NULL{
		goto error_process;
	}
error_process:
	// 直接一把全部釋放
	mem_pool_free(pool);		
}

記憶體池的設計

為了編寫程式碼簡單,筆者採用了比較簡單的設計,如下圖所示:

這種記憶體池的好處就在於分配很多小物件的時候,不必一一去清理,直接連整個記憶體池都重置即可。而且由於每次free的基本都是mem_block大小,所以產生的記憶體碎片也少。不足之處就在於,一些可以被立即銷燬的物件只能在最後重置記憶體池的時候才銷燬。但如果都是小物件的話,影響不大。

記憶體池的分配優化

考慮到記憶體對齊,每次申請記憶體的時候都按照sizeof(union hero_aligin)進行最小記憶體分配。這是那本採用<<c interface and implemention>>的推薦的對齊大小。

採用<<c interface and implemention>>的實現
union hero_align{
    int i;
    long l;
    long *lp;
    void *p;
    void (*fp)(void);
    float f;
    double d;
    long double ld;
};

真正的對齊則是參照nginx的寫法:

size = (size + sizeof(union hero_align) - 1) & (~(sizeof(union hero_align) - 1));

其實筆者一開始還打算做一下類似linux kernel SLAB快取中的隨機著色機制,這樣能夠緩解false sharing問題,想想為了程式碼簡單,還是算了。

為什麼需要packet_buffer

packet_buffer是用來儲存從socket fd中讀取或寫入的資料。 設計packet_buffer的初衷就是要重用記憶體,因為一個連線反覆的去獲取寫入資料總歸是需要記憶體的,只在連線初始化的時候去分配一次記憶體顯然是比反覆分配再銷燬效率高。

為什麼不直接用記憶體池

上文中說到,銷燬記憶體必須將池裡面的整個資料重置。如果packet_buffer和其它的資料結構在同一記憶體池中分配,要重用它,那麼在packet_buffer之前分配的資料就不能被清理。如下圖所示:

這樣顯然違背了筆者設計記憶體池是為了釋放記憶體方便的初衷。
另外,如果使用記憶體池,那麼從sockfd中讀取/寫入的資料就可能從連續的變成一個一個mem_block分離的資料,這種不連續性對資料包的處理會特別麻煩。如下圖所示:

當然了,這種非連續的分配方式,筆者曾經在閱讀lwip協議時見過(幫某實時作業系統處理一個詭異的bug),lwip在嵌入式這種記憶體稀缺的環境中使用這種方式從而儘量避免大記憶體的分配。 所以筆者採取了在連線建立時刻一次性分配一個比較大的記憶體來處理單個請求的資料,如果這個記憶體也滿足不了,則realloc(當然了realloc也有坑,需要仔細編寫)。

packet_buffer結構


packet_buffer這種動態改變大小而且地址上連續的結構為處理包結構提供了便利。
由於realloc的時候,packet_buffer->buffer本身指向的地址可能會變,所以儘量避免直接操作內部buffer,非得使用內部buffer的時候,不能將其賦予一個區域性變數,否則buffer變化,區域性變數可能指向了之前廢棄的buffer。

MySQL協議分包處理

MySQL協議基於tcp(當然也有unix域協議,這裡只考慮tcp)。同時Hero採用的是非阻塞IO模式,讀取包時,recv系統呼叫可能在包的任意位元位置上返回。這時候,就需要仔細的處理分包。

MySQL協議外層格式

MySQL協議是通過在幀頭部加上length field的設計來處理分包問題。如下圖所示:

Hero的處理

Hero對於length field採用狀態機進行處理,這也是通用的手法。首先讀取3byte+1byte的packet_length和sequenceId,然後再通過packet_length讀取剩下的body長度。如下圖所示:

連線池

Hero的連線池造的還比較粗糙,事實上就是一個數組,通過mutex鎖來控制併發的put/get
連線管理部分尚未開發。

MySQL協議格式處理

相較於前面的各種輪子,MySQL協議本身反倒顯得輕鬆許多,唯一複雜的地方在於握手階段的加解密過程,但是MySQL是開源的,筆者直接將MySQL本身對於握手加解密的程式碼copy過來就行了。以下程式碼copy自MySQL-5.1.59(密碼學太高深,這個輪子不造也罷):

// 摘自MySQL-5.1.59,用作password的加解密
void scramble(char *to, const char *message, const char *password) {

  SHA1_CONTEXT sha1_context;
  uint8 hash_stage1[SHA1_HASH_SIZE];
  uint8 hash_stage2[SHA1_HASH_SIZE];

  mysql_sha1_reset(&sha1_context);
  /* stage 1: hash password */
  mysql_sha1_input(&sha1_context, (uint8 *) password, (uint) strlen(password));
  mysql_sha1_result(&sha1_context, hash_stage1);
  /* stage 2: hash stage 1; note that hash_stage2 is stored in the database */
  mysql_sha1_reset(&sha1_context);
  mysql_sha1_input(&sha1_context, hash_stage1, SHA1_HASH_SIZE);
  mysql_sha1_result(&sha1_context, hash_stage2);
  /* create crypt string as sha1(message, hash_stage2) */;
  mysql_sha1_reset(&sha1_context);
  mysql_sha1_input(&sha1_context, (const uint8 *) message, SCRAMBLE_LENGTH);
  mysql_sha1_input(&sha1_context, hash_stage2, SHA1_HASH_SIZE);
  /* xor allows 'from' and 'to' overlap: lets take advantage of it */
  mysql_sha1_result(&sha1_context, (uint8 *) to);
  my_crypt(to, (const uchar *) to, hash_stage1, SCRAMBLE_LENGTH);
}

剩下的無非就是按格式解釋包中的各個欄位,然後再進行處理而已。典型程式碼段如下:

int handle_com_query(front_conn* front){
    char* sql = read_string(front->conn->read_buffer,front->conn->request_pool);
    int rs = server_parse_sql(sql);
    switch(rs & 0xff){
        case SHOW:
            return handle_show(front,sql,rs >> 8);
        case SELECT:
            return handle_select(front,sql,rs >> 8);    
        case KILL_QUERY:
				......
        default:
            return default_execute(front,sql,FALSE); 
    }
    return TRUE;
}


需要注意的是,hero在後端連線backend返回result_set結果集並拷貝到前端連線的write_buffer的時候,前端連線可能正在寫入,也會操縱write_buffer。所以在這種情況下要通過Mutex去保護write_buffer(packet_buffer)的內部資料結構。

效能對比

下面到了令人激動的效能對比環節,筆者在一個4核8G的機器上用hero和另一個用java nio寫的成熟DBProxy做對比。兩者都是用show databases,這條sql並不會路由到後端的資料庫,而是純記憶體返回。這樣筆者就能知道筆者自己造的reactor框架的效能如何,以下是對比情況:

用作對比的兩個server的程式碼IO模型
hero(c):epoll 水平觸發
proxy(java):java nio(內部也是epoll 水平觸發)

benchMark:
伺服器機器
4核8G
CPU主屏2399.996MHZ
cache size:4096KB

壓測機器:
16核64G,jmeter

同樣配置下,壓測同一個簡單的sql
hero(c):3.6Wtps/s
proxy(java):3.6Wtps/s
tps基本沒有差別,因為瓶頸是在網路上

CPU消耗:
hero(c):10% cpu
proxy(java):15% cpu

記憶體消耗:
hero(c):0.2% * 8G
proxy(java):48.3% * 8G 

結論:
對於IO瓶頸的情況,用java和C分別處理簡單的組幀/解幀邏輯,C語言帶來的微小收益並不能讓tps有顯著改善。

Hero雖然在CPU和記憶體消耗上有優勢,但是限於網路瓶頸,tps並沒有明顯提升-_-!
相比於造輪子時候付出的各種努力,投入產出比(至少在表面上)是遠遠不如用Netty這種非常成熟的框架的。如果是工作,那我會毫不猶豫的用後者。

推薦:中華文學網