Memcached原始碼分析之基於Libevent的網路模型(1)
文章列表:
《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》關於Memcached:
memcached是一款非常普及的伺服器端快取軟體,memcached主要是基於Libevent庫進行開發的。
如果你還不瞭解libevent相關知識,請先看我的libevent這篇文章《Linux c 開發 - libevent 》
memcached也是使用autotools的進行程式碼編譯管理的,如果你還不瞭解autotools,你可以先看下文章:《Linux c 開發 - Autotools使用詳細解讀 》
memcached你可以去官網上獲取原始碼。
Memcached分析
1. 網路模型流程分析
Memcached主要是基於Libevent的事件庫來實現網路執行緒模型的。我們先需要下載memcached的原始碼包,上面我們已經給出了原始碼包下載地址。
Memcached的網路執行緒模型主要涉及兩個主要檔案:memcached.c 和thread.c檔案。
我們這邊主要分析tcp的模型。memcached也支援udp。
流程
1. memcached首先在主執行緒中會建立main_base,memcached的主執行緒的主要工作就是監聽和接收listen和accpet新進入的連線。
2. 當memcached啟動的時候會初始化N個worker thread工作執行緒,每個工作執行緒都會有自己的LIBEVENT_THREAD
3. 當用戶有連線進來的時候,main thread主執行緒會通過求餘的方式選擇一個worker thread工作執行緒。
4. main thread會將當前使用者的連線資訊放入一個CQ_ITEM,並且將CQ_ITEM放入這個執行緒的conn_queue處理佇列,然後主執行緒會通過寫入pipe的方式來通知worker thread工作執行緒。
5. 當工作執行緒得到主執行緒main thread的通知後,就會去自己的conn_queue佇列中取得一條連線資訊進行處理,建立libevent的socket讀寫事件。
6. 工作執行緒會監聽使用者的socket,如果使用者有訊息傳遞過來,則會進行訊息解析和處理,返回相應的結果。
流程圖
資料結構:
1. CQ_ITEM:主要用於儲存使用者socket連線的基本資訊。
主執行緒會將使用者的socket連線資訊封裝成CQ_ITEM,並放入工作執行緒的處理佇列中。工作執行緒得到主執行緒的pipe通知後,就會將佇列中的ITEM取出來,建立libevent的socket讀事件。
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd; //socket的fd
enum conn_states init_state; //事件型別
int event_flags; //libevent的flags
int read_buffer_size; //讀取的buffer的size
enum network_transport transport;
CQ_ITEM *next; //下一個item的地址
};
2. CQ:每個執行緒的處理佇列結構。
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
};
3. LIBEVENT_THREAD:每個工作執行緒的資料結構。
每一個工作執行緒都有有這麼一個自己的資料結構,主要儲存執行緒資訊、處理佇列、pipe資訊等。
typedef struct {
//執行緒ID
pthread_t thread_id; /* unique ID of this thread */
//執行緒的 event_base,每個執行緒都有自己的event_base
struct event_base *base; /* libevent handle this thread uses */
//非同步event事件
struct event notify_event; /* listen event for notify pipe */
//管道接收端
int notify_receive_fd; /* receiving end of notify pipe */
//管道傳送端
int notify_send_fd; /* sending end of notify pipe */
//執行緒狀態
struct thread_stats stats; /* Stats generated by this thread */
//新連線佇列結構
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
uint8_t item_lock_type; /* use fine-grained or global item lock */
} LIBEVENT_THREAD;
2. main啟動入口
我們需要找到memcached.c中的main()方法。下面的程式碼中只列出了我們需要的重要部分。
int main (int argc, char **argv) {
//...省去一部分程式碼
/* initialize main thread libevent instance */
//初始化一個event_base
main_base = event_init();
/* initialize other stuff */
stats_init();
assoc_init(settings.hashpower_init);
conn_init();
slabs_init(settings.maxbytes, settings.factor, preallocate);
/*
* ignore SIGPIPE signals; we can use errno == EPIPE if we
* need that information
*/
if (sigignore(SIGPIPE) == -1) {
perror("failed to ignore SIGPIPE; sigaction");
exit(EX_OSERR);
}
/* start up worker threads if MT mode */
//這邊非常重要,這個方法主要用來建立工作執行緒,預設會建立4個工作執行緒
thread_init(settings.num_threads, main_base);
if (start_assoc_maintenance_thread() == -1) {
exit(EXIT_FAILURE);
}
if (settings.slab_reassign &&
start_slab_maintenance_thread() == -1) {
exit(EXIT_FAILURE);
}
/* Run regardless of initializing it later */
init_lru_crawler();
/* initialise clock event */
clock_handler(0, 0, 0);
/* create unix mode sockets after dropping privileges */
if (settings.socketpath != NULL) {
errno = 0;
if (server_socket_unix(settings.socketpath,settings.access)) {
vperror("failed to listen on UNIX socket: %s", settings.socketpath);
exit(EX_OSERR);
}
}
/* create the listening socket, bind it, and init */
if (settings.socketpath == NULL) {
const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
char temp_portnumber_filename[PATH_MAX];
FILE *portnumber_file = NULL;
if (portnumber_filename != NULL) {
snprintf(temp_portnumber_filename,
sizeof(temp_portnumber_filename),
"%s.lck", portnumber_filename);
portnumber_file = fopen(temp_portnumber_filename, "a");
if (portnumber_file == NULL) {
fprintf(stderr, "Failed to open \"%s\": %s\n",
temp_portnumber_filename, strerror(errno));
}
}
errno = 0;
//這邊的server_sockets方法主要是socket的bind、listen、accept等操作
//主執行緒主要用於接收客戶端的socket連線,並且將連線交給工作執行緒接管。
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
}
/* enter the event loop */
//這邊開始進行主執行緒的事件迴圈
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
}
//...省去一部分程式碼
}
主執行緒中主要是通過thread_init方法去建立N個工作執行緒:
thread_init(settings.num_threads, main_base);
通過server_sockets方法去建立socket server:
errno = 0;
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
3. worker thread工作執行緒原始碼分析
我們在thread.c檔案中找到thread_init這個方法:
void thread_init(int nthreads, struct event_base *main_base) {
//...省了一部分程式碼
//這邊通過迴圈的方式建立nthreads個執行緒
//nthreads應該是可以設定的
for (i = 0; i < nthreads; i++) {
int fds[2];
//這邊會建立pipe,主要用於主執行緒和工作執行緒之間的通訊
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}
//threads是工作執行緒的基本結構:LIBEVENT_THREAD
//將pipe接收端和寫入端都放到工作執行緒的結構體中
threads[i].notify_receive_fd = fds[0]; //接收端
threads[i].notify_send_fd = fds[1]; //寫入端
//這個方法非常重要,主要是建立每個執行緒自己的libevent的event_base
setup_thread(&threads[i]);
/* Reserve three fds for the libevent base, and two for the pipe */
stats.reserved_fds += 5;
}
/* Create threads after we've done all the libevent setup. */
//這裡是迴圈建立執行緒
//執行緒建立的回撥函式是worker_libevent
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}
setup_thread方法:
/*
* Set up a thread's information.
*/
static void setup_thread(LIBEVENT_THREAD *me) {
//建立一個event_base
//根據libevent的使用文件,我們可以知道一般情況下每個獨立的執行緒都應該有自己獨立的event_base
me->base = event_init();
if (! me->base) {
fprintf(stderr, "Can't allocate event base\n");
exit(1);
}
/* Listen for notifications from other threads */
//這邊非常重要,這邊主要建立pipe的讀事件EV_READ的監聽
//當pipe中有寫入事件的時候,libevent就會回撥thread_libevent_process方法
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
//新增事件操作
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
//初始化一個工作佇列
me->new_conn_queue = malloc(sizeof(struct conn_queue));
if (me->new_conn_queue == NULL) {
perror("Failed to allocate memory for connection queue");
exit(EXIT_FAILURE);
}
cq_init(me->new_conn_queue);
//初始化執行緒鎖
if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
perror("Failed to initialize mutex");
exit(EXIT_FAILURE);
}
me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
NULL, NULL);
if (me->suffix_cache == NULL) {
fprintf(stderr, "Failed to create suffix cache\n");
exit(EXIT_FAILURE);
}
}
create_worker方法:
/*
* Creates a worker thread.
*/
//這個方法是真正的建立工作執行緒
static void create_worker(void *(*func)(void *), void *arg) {
pthread_t thread;
pthread_attr_t attr;
int ret;
pthread_attr_init(&attr);
//這邊真正的建立執行緒
if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
fprintf(stderr, "Can't create thread: %s\n",
strerror(ret));
exit(1);
}
}
worker_libevent方法:
/*
* Worker thread: main event loop
*/
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
/* Any per-thread setup can happen here; thread_init() will block until
* all threads have finished initializing.
*/
/* set an indexable thread-specific memory item for the lock type.
* this could be unnecessary if we pass the conn *c struct through
* all item_lock calls...
*/
me->item_lock_type = ITEM_LOCK_GRANULAR;
pthread_setspecific(item_lock_type_key, &me->item_lock_type);
register_thread_initialized();
//這個方法主要是開啟事件的迴圈
//每個執行緒中都會有自己獨立的event_base和事件的迴圈機制
//memcache的每個工作執行緒都會獨立處理自己接管的連線
event_base_loop(me->base, 0);
return NULL;
}
thread_libevent_process方法:
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
//回撥函式中回去讀取pipe中的資訊
//主執行緒中如果有新的連線,會向其中一個執行緒的pipe中寫入1
//這邊讀取pipe中的資料,如果為1,則說明從pipe中獲取的資料是正確的
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
switch (buf[0]) {
case 'c':
//從工作執行緒的佇列中獲取一個CQ_ITEM連線資訊
item = cq_pop(me->new_conn_queue);
//如果item不為空,則需要進行連線的接管
if (NULL != item) {
//conn_new這個方法非常重要,主要是建立socket的讀寫等監聽事件。
//init_state 為初始化的型別,主要在drive_machine中通過這個狀態類判斷處理型別
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);
if (c == NULL) {
if (IS_UDP(item->transport)) {
fprintf(stderr, "Can't listen for events on UDP socket\n");
exit(1);
} else {
if (settings.verbose > 0) {
fprintf(stderr, "Can't listen for events on fd %d\n",
item->sfd);
}
close(item->sfd);
}
} else {
c->thread = me;
}
cqi_free(item);
}
break;
/* we were told to flip the lock type and report in */
case 'l':
me->item_lock_type = ITEM_LOCK_GRANULAR;
register_thread_initialized();
break;
case 'g':
me->item_lock_type = ITEM_LOCK_GLOBAL;
register_thread_initialized();
break;
}
}
conn_new方法(主要看兩行):
//我們發現這個方法中又在建立event了,這邊實際上是監聽socket的讀寫等事件
//主執行緒主要是監聽使用者的socket連線事件;工作執行緒主要監聽socket的讀寫事件
//當用戶socket的連線有資料傳遞過來的時候,就會呼叫event_handler這個回撥函式
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
//將事件新增到libevent的loop迴圈中
if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
event_handler方法:
void event_handler(const int fd, const short which, void *arg) {
conn *c;
//組裝conn結構
c = (conn *)arg;
assert(c != NULL);
c->which = which;
/* sanity */
if (fd != c->sfd) {
if (settings.verbose > 0)
fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
//最終轉交給了drive_machine這個方法
//memcache的大部分的網路事件都是由drive_machine這個方法來處理的
//drive_machine這個方法主要通過c->state這個事件的型別來處理不同型別的事件
drive_machine(c);
/* wait for next event */
return;
}
然後繼續看最重要的,也是核心的處理事件的方法drive_machine(狀態機),監聽socket連線、監聽socket的讀寫、斷開連線等操作都是在drive_machine這個方法中實現的。而這些操作都是通過c->state這個狀態來判斷不同的操作型別。
static void drive_machine(conn *c) {
//....................
assert(c != NULL);
while (!stop) {
//這邊通過state來處理不同型別的事件
switch(c->state) {
//這邊主要處理tcp連線,只有在主執行緒的下,才會執行listening監聽操作。
case conn_listening:
addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
if (use_accept4) {
sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
} else {
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
}
#else
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
//......................
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
str = "ERROR Too many open connections\r\n";
res = write(sfd, str, strlen(str));
close(sfd);
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
} else {
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
//連線等待
case conn_waiting:
//.........
break;
//讀取事件
//例如有使用者提交資料過來的時候,工作執行緒監聽到事件後,最終會呼叫這塊程式碼
case conn_read:
res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
switch (res) {
case READ_NO_DATA_RECEIVED:
conn_set_state(c, conn_waiting);
break;
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
case READ_ERROR:
conn_set_state(c, conn_closing);
break;
case READ_MEMORY_ERROR: /* Failed to allocate more memory */
/* State already set by try_read_network */
break;
}
break;
case conn_parse_cmd :
if (try_read_command(c) == 0) {
/* wee need more data! */
conn_set_state(c, conn_waiting);
}
break;
case conn_new_cmd:
/* Only process nreqs at a time to avoid starving other
connections */
--nreqs;
if (nreqs >= 0) {
reset_cmd_handler(c);
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.conn_yields++;
pthread_mutex_unlock(&c->thread->stats.mutex);
if (c->rbytes > 0) {
/* We have already read in data into the input buffer,
so libevent will most likely not signal read events
on the socket (unless more data is available. As a
hack we should just put in a request to write data,
because that should be possible ;-)
*/
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
conn_set_state(c, conn_closing);
break;
}
}
stop = true;
}
break;
case conn_nread:
//....................
break;
case conn_swallow:
//....................
break;
case conn_write:
//.................
break;
//連線關閉
case conn_closing:
if (IS_UDP(c->transport))
conn_cleanup(c);
else
conn_close(c);
stop = true;
break;
case conn_closed:
/* This only happens if dormando is an idiot. */
abort();
break;
case conn_max_state:
assert(false);
break;
}
}
return;
}
4. main thread主執行緒原始碼分析
主執行緒的socket server主要通過server_sockets這個方法建立。而server_sockets中主要呼叫了server_socket這個方法,我們可以看下server_socket這個方法:
/**
* Create a socket and bind it to a specific port number
* @param interface the interface to bind to
* @param port the port number to bind to
* @param transport the transport protocol (TCP / UDP)
* @param portnumber_file A filepointer to write the port numbers to
* when they are successfully added to the list of ports we
* listen on.
*/
static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file) {
//建立一個新的事件
//我們發現上面的工作執行緒也是呼叫這個方法,但是區別是這個方法指定了state的型別為:conn_listening
//注意這邊有一個conn_listening,這個引數主要是指定呼叫drive_machine這個方法中的conn_listen程式碼塊。
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
listen_conn_add->next = listen_conn;
listen_conn = listen_conn_add;
}
conn_new方法:
//我們發現這個方法中又在建立event了,這邊實際上是監聽socket的讀寫等事件
//主執行緒主要是監聽使用者的socket連線事件;工作執行緒主要監聽socket的讀寫事件
//當用戶socket的連線有資料傳遞過來的時候,就會呼叫event_handler這個回撥函式
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
//將事件新增到libevent的loop迴圈中
if (event_add(&c->event, 0) == -1) {
perror("event_add");
return NULL;
}
然後我們跟蹤進入event_handler這個方法,並且進入drive_machine這個方法,我們上面說過server_socket這個方法中傳遞的state引數為預設寫死的conn_listening這個狀態,所以我們詳細看drive_machine中關於conn_listening這塊邏輯的程式碼。
case conn_listening:
addrlen = sizeof(addr);
#ifdef HAVE_ACCEPT4
//我們可以看到下面的程式碼是accept,接受客戶端的socket連線的程式碼
if (use_accept4) {
sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
} else {
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
}
#else
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
#endif
if (sfd == -1) {
if (use_accept4 && errno == ENOSYS) {
use_accept4 = 0;
continue;
}
perror(use_accept4 ? "accept4()" : "accept()");
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* these are transient, so don't log anything */
stop = true;
} else if (errno == EMFILE) {
if (settings.verbose > 0)
fprintf(stderr, "Too many open connections\n");
accept_new_conns(false);
stop = true;
} else {
perror("accept()");
stop = true;
}
break;
}
if (!use_accept4) {
if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK");
close(sfd);
break;
}
}
if (settings.maxconns_fast &&
stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
str = "ERROR Too many open connections\r\n";
res = write(sfd, str, strlen(str));
close(sfd);
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
} else {
//如果客戶端用socket連線上來,則會呼叫這個分發邏輯的函式
//這個函式會將連線資訊分發到某一個工作執行緒中,然後工作執行緒接管具體的讀寫操作
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
dispatch_conn_new方法:
/*
* Dispatches a new connection to another thread. This is only ever called
* from the main thread, either during initialization (for UDP) or because
* of an incoming connection.
*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
//每個連線連上來的時候,都會申請一塊CQ_ITEM的記憶體塊,用於儲存連線的基本資訊
CQ_ITEM *item = cqi_new();
char buf[1];
//如果item建立失敗,則關閉連線
if (item == NULL) {
close(sfd);
/* given that malloc failed this may also fail, but let's try */
fprintf(stderr, "Failed to allocate memory for connection object\n");
return ;
}
//這個方法非常重要。主要是通過求餘數的方法來得到當前的連線需要哪個執行緒來接管
//而且last_thread會記錄每次最後一次使用的工作執行緒,每次記錄之後就可以讓工作執行緒進入一個輪詢,保證了每個工作執行緒處理的連線數的平衡
int tid = (last_thread + 1) % settings.num_threads;
//獲取執行緒的基本結構
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
//向工作執行緒的佇列中放入CQ_ITEM
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
//向工作執行緒的pipe中寫入1
//工作執行緒監聽到pipe中有寫入資料,工作執行緒接收到通知後,就會向thread->new_conn_queue佇列中pop出一個item,然後進行連線的接管操作
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
相關推薦
Memcached原始碼分析之基於Libevent的網路模型(1)
文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 關於Memcached: memcached是一款非常普及的伺服器端快取軟體,memcached主要是基於Libevent庫進行開發的。 如果你還不瞭解libev
Memcached原始碼分析之增刪改查操作(5)
文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 前言 在看Memcached的增刪改查操作前,我們先來看一下process_command方法。Memcached解析命令之後,就通過process_comman
Memcached原始碼分析-網路模型(1)
1 網路模型 Memcached採用了,單程序多執行緒的工作方式,同時採用了libevent事件驅動進行網路請求的處理。 2 工作原理 2.1 libevent介紹 2.2 網路請求流程 2.2.1 流程圖 2.2.2 主執行緒工作流程分析 主執行緒工作流
muduo原始碼分析之實現TCP網路庫(連線的接收和關閉)
在EventLoop、Channel、Poller三個類中完成對一般描述符、事件迴圈(poll)的封裝。實現了Reactor的基本功能,接下來則需要將網路套接字描述符、I/O函式、等進行封裝。 1.傳統的TcpServer 在進行封裝之前需要明確我們需要
Spark core原始碼分析之spark叢集的啟動(二)
2.2 Worker的啟動 org.apache.spark.deploy.worker 1 從Worker的伴生物件的main方法進入 在main方法中首先是得到一個SparkConf例項conf,然後將conf和啟動Worker傳入的引數封裝得到Wor
spark mllib原始碼分析之隨機森林(Random Forest)(二)
4. 特徵處理 這部分主要在DecisionTree.scala的findSplitsBins函式,將所有特徵封裝成Split,然後裝箱Bin。首先對split和bin的結構進行說明 4.1. 資料結構 4.1.1. Split cl
spark mllib原始碼分析之隨機森林(Random Forest)(三)
6. 隨機森林訓練 6.1. 資料結構 6.1.1. Node 樹中的每個節點是一個Node結構 class Node @Since("1.2.0") ( @Since("1.0.0") val id: Int, @S
android原始碼分析之View的事件分發(上)
1、View的繼承關係圖 View的繼承關係圖如下: 其中最重要的子類為ViewGroup,View是所有UI元件的基類,而ViewGroup是容納這些元件的容器,同時它也是繼承於View類。而UI元件的繼承關係如上圖,比較常用的元件類用紅色字型標出
原始碼分析之基於LinkedList手寫HahMap(二)
package com.mayikt.extLinkedListHashMap; import java.util.LinkedList; import java.util.concurrent.ConcurrentHashMap; /** * 基於linkedList實現hashMap *
原始碼分析之基於ArrayList手寫HahMap(一)
import java.util.ArrayList; import java.util.List; /** * 基於arraylist實現hashmap集合(簡版:效率低) * @author zjmiec * */ public class ExtArrayListHashMap&
jvm原始碼分析之oop-klass物件模型
概述 HotSpot是基於c++實現,而c++是一門面向物件的語言,本身具備面向物件基本特徵,所以Java中的物件表示,最簡單的做法是為每個Java類生成一個c++類與之對應。 但HotSpot JVM並沒有這麼做,而是設計了一個OOP-Klass Model。這裡的 O
Netty原始碼分析之Reactor執行緒模型
一、背景 最近在研究netty的原始碼,今天發表一篇關於netty的執行緒框架--Reactor執行緒模型,作為最近研究成果。如果有還不瞭解Reactor模型請自行百度,網上有很多關於Reactor模式。 研究netty的時候,先看了下《netty權威指南》,裡面講解不
Memcached原始碼分析之Hash表操作
Memcached的Hash表用來提高資料訪問效能,通過連結法來解決Hash衝突,當Hash表中資料多餘Hash表容量的1.5倍時,Hash表就會擴容,Memcached的Hash表操作沒什麼特別的,我們這裡簡單介紹下Memcached裡面的Hash表操作。 //hash表
Memcached原始碼分析之記憶體管理篇之item結構圖及slab結構圖
.Memcached原始碼分析之記憶體管理篇 部落格分類: linuxc . 使用命令 set(key, value) 向 memcached 插入一條資料, memcached 內部是如何組織資料呢 一 把資料組裝成 item memcached 接受到客戶端的資料後
memcached原始碼分析之四
還是從Memcached.c檔案的main函式開始,逐步分析Memcached的實現 if (!sanitycheck()) { return EX_OSERR; }static bool sanitycheck(void) { /*
Memcached原始碼分析之訊息迴應(3)
文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 前言 上一章《Memcached原始碼分析 - Memcached原始碼分析之命令解析(2)》,我們花了很大的力氣去講解Memcached如何從客戶端讀取命令,並且
Memcached原始碼分析之儲存機制Slabs(7)
文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》前言 這一章我們重點講解Memcached的儲存機制Slabs。Memcached儲存Item的程式碼都是在slabs.c中來實現的。 在解讀這一章前,我們必須先了
從壹開始微服務 [ DDD ] 之十一 ║ 基於原始碼分析,命令分發的過程(二)
緣起 哈嘍小夥伴週三好,老張又來啦,DDD領域驅動設計的第二個D也快說完了,下一個系列我也在考慮之中,是 Id4 還是 Dockers 還沒有想好,甚至昨天我還想,下一步是不是可以寫一個簡單的Angular 入門教程,本來是想來個前後端分離的教學視訊的,簡單試了試,發現自己的聲音不好聽,真心不好聽那種,就作
netty原始碼分析之-EventLoop與執行緒模型(1)
執行緒模型確定來程式碼的執行方式,我們總是必須規避併發執行可能會帶來的副作用,所以理解netty所採用的併發模型的影響很重要。netty使用了被稱為事件迴圈的EventLoop來執行任務來處理在連線的生命週期內發生的事件 執行緒模型 對於Even
Mybatis深入原始碼分析之基於裝飾模式純手寫一級,二級,三級快取
寫在前面:設計模式源於生活,而又高於生活! 什麼是裝飾者模式 在不改變原有物件的基礎上附加功能,相比生成子類更靈活。