Redis的事件模型實現基於linux的epoll,sun的export,FreeBSD和Mac osx的queue,還有select;我們簡單分析下Redis基於epoll實現的事件模型。main函式呼叫initServer實現服務初始化:
void initServer(void) { int j; //SIG_DFL:預設訊號處理程式 //SIG_IGN:忽略訊號的處理程式 signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); //設定訊號處理函式 setupSignalHandlers(); if (server.syslog_enabled) { openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, server.syslog_facility); } = getpid(); server.current_client = NULL; //客戶端連結串列 server.clients = listCreate(); //非同步關閉的客戶端 server.clients_to_close = listCreate(); //從機 server.slaves = listCreate(); server.monitors = listCreate(); server.clients_pending_write = listCreate(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.clients_paused = 0; // server.system_memory_size = zmalloc_get_memory_size(); createSharedObjects(); // adjustOpenFilesLimit(); //構建aeEventLoop物件 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); // server.db = zmalloc(sizeof(redisDb)*server.dbnum); /* Open the TCP listening socket for the user commands. */ if (server.port != 0 && //啟動TCP監聽服務 --> 並設定為非阻塞 listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); /* Open the listening Unix domain socket. */ //啟動本地監聽服務 if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket, server.unixsocketperm, server.tcp_backlog); if (server.sofd == ANET_ERR) { serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); exit(1); } anetNonBlock(NULL,server.sofd); } /* Abort if there are no listening sockets at all. */ //如果沒有監聽套接字,則退出 --->初始化預設為0 -- > 在listenToPort每成功建立一個監聽套接字,自加1 if (server.ipfd_count == 0 && server.sofd < 0) { serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } /* Create the Redis databases, and initialize other internal state. */ for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&setDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].eviction_pool = evictionPoolAlloc(); server.db[j].id = j; server.db[j].avg_ttl = 0; } server.pubsub_channels = dictCreate(&keylistDictType,NULL); server.pubsub_patterns = listCreate(); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; server.rdb_child_type = RDB_CHILD_TYPE_NONE; server.rdb_bgsave_scheduled = 0; aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); /* At startup we consider the DB saved. */ server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ server.rdb_save_time_last = -1; server.rdb_save_time_start = -1; server.dirty = 0; resetServerStats(); /* A few stats we don't want to reset: server startup time, and peak mem. */ server.stat_starttime = time(NULL); server.stat_peak_memory = 0; server.resident_set_size = 0; server.lastbgsave_status = C_OK; server.aof_last_write_status = C_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; updateCachedTime(); /* Create the serverCron() time event, that's our main way to process * background operations. */ if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create the serverCron time event."); exit(1); } /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ //建立epoll事件監聽 --> 一個tcp套接字一個監聽檔案 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } //建立epoll事件監聽 --> 一個本地套接字一個監聽檔案 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); /* Open the AOF file if needed. */ if (server.aof_state == AOF_ON) { server.aof_fd = open(server.aof_filename, O_WRONLY|O_APPEND|O_CREAT,0644); if (server.aof_fd == -1) { serverLog(LL_WARNING, "Can't open the append-only file: %s", strerror(errno)); exit(1); } } /* 32 bit instances are limited to 4GB of address space, so if there is * no explicit limit in the user provided configuration we set a limit * at 3 GB using maxmemory with 'noeviction' policy'. This avoids * useless crashes of the Redis instance for out of memory. */ if (server.arch_bits == 32 && server.maxmemory == 0) { serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ server.maxmemory_policy = MAXMEMORY_NO_EVICTION; } if (server.cluster_enabled) clusterInit(); replicationScriptCacheInit(); scriptingInit(1); slowlogInit(); latencyMonitorInit(); bioInit(); }
1 構建aeEventLoop物件
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
2 建立套接字監聽,分為TCP監聽和本地套接字監聽
/* Open the TCP listening socket for the user commands. */ if (server.port != 0 && //啟動TCP監聽服務 --> 並設定為非阻塞 listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); /* Open the listening Unix domain socket. */ //啟動本地監聽服務 if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket, server.unixsocketperm, server.tcp_backlog); if (server.sofd == ANET_ERR) { serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); exit(1); } anetNonBlock(NULL,server.sofd); }
3 將建立監聽的套接字加入EventLoop事件監聽
//建立epoll事件監聽 --> 一個tcp套接字一個監聽檔案 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } //建立epoll事件監聽 --> 一個本地套接字一個監聽檔案 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; //監聽的最大檔案號
int setsize; //可以註冊的事件的上限,預設為1024*10
long long timeEventNextId; //定時器事件的ID編號管理(分配ID號所用)
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; //註冊的檔案事件,這些是需要程序關注的檔案
aeFiredEvent *fired; //poll結果,待處理的檔案事件的檔案號和事件型別
aeTimeEvent *timeEventHead; //定時器時間連結串列
int stop; //時間輪詢是否結束?
void *apidata; //檔案事件的輪詢資料和結果資料:poll; 三種輪詢方式:epoll(linux),select(windows),kqueue
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
//底層epoll多路複用初始化,然後存放在aeEventLoop中 void * 型別的apidata,隱藏了底層的實現。
* 函 數 名 : aeCreateEventLoop
* 函式功能 : 構建aeEventLoop物件eventLoop
* 輸入引數 : int setsize 訊息佇列大小
* 輸出引數 : 無
* 返 回 值 : aeEventLoop
* 呼叫關係 :
* 記 錄
* 1.日 期: 2017年10月21日
* 作 者: zyz
* 修改內容: 新生成函式
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
//eventLoop->events和event_loop->fired都是一個set size大小的
//陣列,表明了event loop是用陣列下標來當fd。
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0; //指示事件迴圈是否繼續進行。
eventLoop->maxfd = -1; //事件連結串列中目前最大的fd,用於減小檢索範圍
eventLoop->beforesleep = NULL; //每輪迴圈前的回撥函式
// aeApiCreate封裝了不同多路複用的實現,如linux的epoll,sun的export,FreeBSD和Mac osx的queue,還有select。
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;// AE_NONE來指示該fd沒有放入事件迴圈庫
return eventLoop;
if (eventLoop) {
return NULL;
很簡單,分配一個aeEventLoop 物件,然後一折物件為引數,呼叫底層實現函式aeApiCreate,我們看下epoll型別的實現函式:
* 函 數 名 : aeApiCreate
* 函式功能 : 封裝epoll事件驅動構建函式
* 輸入引數 : aeEventLoop *eventLoop 事件迴圈描述結構物件
* 輸出引數 : 無
* 返 回 值 : static
* 呼叫關係 :
* 記 錄
* 1.日 期: 2017年10月23日
* 作 者: zyz
* 修改內容: 新生成函式
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
return -1;
//該函式生成一個epoll專用的檔案描述符。它其實是在核心申請一空間,用來存放你想關注的socket fd上是否發生以及發生了什麼事件
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
return -1;
eventLoop->apidata = state;
return 0;
非常簡單粗暴,構建一個aeApiState物件,這個物件包含一個檔案描述符和一個epoll_event 物件指標。
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
* 函 數 名 : aeCreateFileEvent
* 函式功能 : 建立事件監聽物件
* 輸入引數 : aeEventLoop *eventLoop 事件監聽服務描述符物件指標
int fd 被監聽檔案描述符
int mask 監聽事件掩碼
aeFileProc *proc 事件響應函式
void *clientData 引數
* 輸出引數 : 無
* 返 回 值 :
* 呼叫關係 :
* 記 錄
* 1.日 期: 2018年03月06日
* 作 者: zyz
* 修改內容: 新生成函式
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
/* File event structure */
/* 檔案事件結構體 */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
在呼叫aeCreateEventLoop函式時,分配了指定大小的記憶體,用於在抽象層代表可以監聽指定數量的檔案描述符。在函式中就根據指定檔案描述符取出指定的aeFileEvent 物件,然後以該物件為引數,呼叫底層實現函式aeApiAddEvent:
//EPOLL_CTL_ADD: 註冊新的fd到epfd中;
//EPOLL_CTL_MOD: 修改已經註冊的fd的監聽事件;
//EPOLL_CTL_DEL: 從epfd中刪除一個fd;
* 函 數 名 : aeApiAddEvent
* 函式功能 : epoll事件註冊函式封裝
* 輸入引數 : aeEventLoop *eventLoop 事件驅動模型結構物件
int fd 被監聽事件的檔案描述符
int mask 事件掩碼
* 輸出引數 : 無
* 返 回 值 : static
* 呼叫關係 :
* 記 錄
* 1.日 期: 2017年10月23日
* 作 者: zyz
* 修改內容: 新生成函式
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
//EPOLLIN: 觸發該事件,表示對應的檔案描述符上有可讀資料。(包括對端SOCKET正常關閉);
//EPOLLOUT: 觸發該事件,表示對應的檔案描述符上可以寫資料;
//EPOLLPRI: 表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來);
//EPOLLERR: 表示對應的檔案描述符發生錯誤;
//EPOLLHUP: 表示對應的檔案描述符被結束通話;
//EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的。
//EPOLLONESHOT: 只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列裡。 = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) |= EPOLLIN;
if (mask & AE_WRITABLE) |= EPOLLOUT; = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
* 函 數 名 : aeMain
* 函式功能 : 事件迴圈處理
* 輸入引數 : aeEventLoop *eventLoop 事件監聽服務描述符物件指標
* 輸出引數 : 無
* 返 回 值 :
* 呼叫關係 :
* 記 錄
* 1.日 期: 2018年03月06日
* 作 者:
* 修改內容: 新生成函式
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) { //以死迴圈的方式進入輪詢,直到stop被置為非0
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop); //前期處理,回撥函式
aeProcessEvents(eventLoop, AE_ALL_EVENTS); //事件處理(見下面)
Redis的事件模型實現基於linux的epoll,sun的export,FreeBSD和Mac osx的queue,還有select;我們簡單分析下Redis基於epoll實現的事件模型。main函式呼叫initServer實現服務初始化:void initServer(v
Redis epoll事件模型
Redis的事件模型在這裡我們用ae_epoll.c,epoll詳細工作原理 一、redis對原始的epoll資料結構進行了封裝 二、
Redis原始碼閱讀(六)叢集-故障遷移(下) 最近私人的事情比較多,沒有抽出時間來整理部落格。書接上文,上一篇裡總結了Redis故障遷移的幾個關鍵點,以及Redis中故障檢測的實現。本篇主要介紹叢集檢測到某主節點下線後,是如何選舉新的主節點的。注意到Redis叢集是無中心的,那麼使用分散式一
叢集搭建好之後,使用者傳送的命令請求可以被分配到不同的節點去處理。那Redis對命令請求分配的依據是什麼?如果節點數量有變動,命令又是如何重新分配的,重分配的過程是否會阻塞對外提供的服務?接下來會從這兩個問題入手,分析Redis3.0的原始碼實現。 1. 分配依據—
一. 資料庫 Redis的資料庫使用字典作為底層實現,資料庫的增、刪、查、改都是構建在字典的操作之上的。 redis伺服器將所有資料庫都儲存在伺服器狀態結構redisServer(redis.h/redisServer)的db陣列(應該是一個連結串列)裡:
Redis原始碼閱讀之: 環境搭建及準備
1.下載原始碼 2.IDE配置(Clion on windows) ps:Clion特別適合看C程式碼, 而且跨平臺 直接進入clion開啟redis原始碼的資料夾 沒mingw則安裝下m
Redis系統當中,針對字串進行的更加完善的封裝,建立了一個動態字串,並構建了大量的實用api。相關的實現程式碼為sds.h及sds.c,以下為我的原始碼閱讀筆記。內容較多,逐步更新typedefchar *sds; struct __attribute__ ((__pac
dict.h 在redis中,dict.h主要是hash的底層實現方式。 在dict.h中主要是一些資料結構的定義,以及一些巨集函式的定義相關的內容。 雜湊節點定義 原始碼中的dictEntry就是雜湊節點的相關定義 typedef struct dictEnt
adlist.c adlist.c檔案中主要是一些雙向連結串列的操作的實現。 listCreate函式 //listCreate函式類似於一個list的建構函式 //直接通過zmalloc函式申請list大小的空間,並且將pre,next指標都置為NULL //
前言 本小節主要是整理一下前幾節分析的nginx的核心模組的ngx_events_module以及事件模組,關於事件模組什麼時候初始化以及事件的處理等,因此不會涉及到太多具體的程式碼,主要是把握事件模組的整體。 配置項結構體的建立及賦值 在使
受限 urn let event flag 選擇 idt block 1-1 一、EPOLL的優點 在Linux中,select/poll/epoll是I/O多路復用的三種方式,epoll是Linux系統上獨有的高效率I/O多路復用方式,區別於select/poll。先
redis 文件事件模型
desc edi org sockaddr sel 事件處理 CI sizeof logs 參考文獻: 深入剖析 redis 事件驅動 Redis 中的事件循環 深入了解epoll (轉) Redis自己的事件模型 ae EPOLL(7) Linux IO模式及
導致 分組 數據 png 規則 blog bloom 負責 提高 概要 硬件實現 基於sketch 功能:采集包數、流長數據,恢復五元組 重點:高速條件下性能較好,節省硬件資源 摘要: 提出一種基於sketch 數據結構的軟件定義測量數據平面硬件模型,並在以現場可編程邏輯
【原始碼】基於IEEE 14匯流排標準的複合微電網SIMULINK模型
本程式設計了一種基於IEEE 14匯流排標準的複合微電網模型,該微電網模型包括柴油發電機、PV模型、電池儲能系統、電弧爐等非線性負載。微電網採用併網執行方式。 本模型的參考文獻: A new approach for soft synchronization of microgri
setTypeCreate:返回一個集合物件 robj *setTypeCreate(robj *value) { if (isObjectRepresentableAsLongLong(value,NULL) == REDIS_OK) return creat
listTypeTryConversion:嘗試將列表物件轉換成linkedlist編碼 void listTypeTryConversion(robj *subject, robj *value) { // 確保 subject 為 ZIPLIST 編碼 if (su
hashTypeTryConversion:對傳入的引數進行檢查是否需要從ziplist轉換成hashtable void hashTypeTryConversion(robj *o, robj **argv, int start, int end) { int i;
lookupKey:查詢指定的鍵,如果存在返回對應的值 robj *lookupKey(redisDb *db, robj *key) { // 查詢鍵空間 dictEntry *de = dictFind(db->dict,key->ptr);
基於電路的光伏電池模擬模型,用於估計光伏面板的IV特性曲線相對於環境引數(溫度和輻照)和電池引數(寄生電阻和理想因子)的變化。 A circuit based simulation model for a PV cell for estimating the IV characteri
Hadoop原始碼是這麼做,其他原始碼環境也類似。 1、到官網下載Hadoop原始碼包,例如hadoop-2.6.5-src.tar.gz. 2、將下載的原始碼包解壓到某