1. 程式人生 > >Memcache原始碼閱讀(8)---多執行緒

Memcache原始碼閱讀(8)---多執行緒

我看的原始碼版本是1.2.4

前面已經將memcached的主要內容討論完了。這裡來說說memcached怎麼使用多執行緒的吧。memcached的執行緒分為兩種,一個主執行緒,另外的是工作執行緒。一個主執行緒負責接收使用者的請求,接收到請求後將這個請求遞交給多個工作執行緒的其中一個。每個工作執行緒都有一個連線佇列管理著主執行緒遞交的連線。

執行緒初始化

還是從執行緒的初始化開始。初始化一些鎖,然後初始化工作執行緒,在主執行緒和工作執行緒之間建立管道進行通訊。使用條件變數等待多個執行緒初始完成後函式返回。

void thread_init(int nthreads, struct event_base *main_base) {
    int
i; pthread_mutex_init(&cache_lock, NULL); pthread_mutex_init(&conn_lock, NULL); pthread_mutex_init(&slabs_lock, NULL); pthread_mutex_init(&stats_lock, NULL); pthread_mutex_init(&init_lock, NULL); pthread_cond_init(&init_cond, NULL); pthread_mutex_init(&cqi_freelist_lock, NULL); cqi_freelist = NULL; threads = malloc
(sizeof(LIBEVENT_THREAD) * nthreads); if (! threads) { perror("Can't allocate thread descriptors"); exit(1); } threads[0].base = main_base; threads[0].thread_id = pthread_self(); for (i = 0; i < nthreads; i++) { int fds[2]; if (pipe(fds)) { perror("Can't create notify pipe"
); exit(1); } threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; //設定執行緒的管道事件及其處理方式 setup_thread(&threads[i]); } /* Create threads after we've done all the libevent setup. */ for (i = 1; i < nthreads; i++) { //初始化工作執行緒,讓工作執行緒處於livevent監聽事件發生的狀態 create_worker(worker_libevent, &threads[i]); } /* Wait for all the threads to set themselves up before returning. */ pthread_mutex_lock(&init_lock); init_count++; /* main thread */ while (init_count < nthreads) { pthread_cond_wait(&init_cond, &init_lock); } pthread_mutex_unlock(&init_lock); } //為工作執行緒設定事件處理函式 static void setup_thread(LIBEVENT_THREAD *me) { if (! me->base) { me->base = event_init(); if (! me->base) { fprintf(stderr, "Can't allocate event base\n"); exit(1); } } /* Listen for notifications from other threads */ //主執行緒接收到連線之後,會給該執行緒的pipe寫一個字,該工作執行緒就會觸發事件, //呼叫thread_libevent_process從工作佇列中取連線處理。 event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event); if (event_add(&me->notify_event, 0) == -1) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } cq_init(&me->new_conn_queue); } static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; if (read(fd, buf, 1) != 1) if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); item = cq_peek(&me->new_conn_queue); if (NULL != item) { //對這個連線建立事件處理函式,這個函式第三篇文章有討論過。 conn *c = conn_new(item->sfd, item->init_state, item->event_flags, item->read_buffer_size, item->is_udp, me->base); if (c == NULL) { if (item->is_udp) { fprintf(stderr, "Can't listen for events on UDP socket\n"); exit(1); } else { if (settings.verbose > 0) { fprintf(stderr, "Can't listen for events on fd %d\n", item->sfd); } close(item->sfd); } } cqi_free(item); } } static void create_worker(void *(*func)(void *), void *arg) { pthread_t thread; pthread_attr_t attr; int ret; pthread_attr_init(&attr); if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) { fprintf(stderr, "Can't create thread: %s\n", strerror(ret)); exit(1); } } //讓工作執行緒處於livevent監聽事件發生的狀態 static void *worker_libevent(void *arg) { LIBEVENT_THREAD *me = arg; /* Any per-thread setup can happen here; thread_init() will block until * all threads have finished initializing. */ pthread_mutex_lock(&init_lock); init_count++; pthread_cond_signal(&init_cond); pthread_mutex_unlock(&init_lock); return (void*) event_base_loop(me->base, 0); }

多執行緒操作

這個版本的鎖粒度做得比較粗,很多訪問資料的操作都是使用cache_lock這個鎖,多執行緒操作知識在那個函式前後新增加鎖解鎖的操作。比如說更新item的操作。

//多執行緒版本
# define item_update(x)              mt_item_update(x)
void mt_item_update(item *item) {
    pthread_mutex_lock(&cache_lock);
    do_item_update(item);
    pthread_mutex_unlock(&cache_lock);
}

//單執行緒版本
# define item_update(x)              do_item_update(x)

到這裡memcached的原始碼就討論完了。

相關推薦

Memcache原始碼閱讀8---執行

我看的原始碼版本是1.2.4 前面已經將memcached的主要內容討論完了。這裡來說說memcached怎麼使用多執行緒的吧。memcached的執行緒分為兩種,一個主執行緒,另外的是工作執行緒。一個主執行緒負責接收使用者的請求,接收到請求後將這個請求遞交給

Memcache原始碼閱讀4---記憶體管理

我看的原始碼版本是1.2.4 memcached的儲存 memcached擁有一個記憶體池,記憶體池中的記憶體分成多種大小的chunk,chunk的大小有一個基礎大小(最小的chunk大小,base chunk size),然後後面大小的是以settings

Memcache原始碼閱讀1---看原始碼的心得

心得 我這是第一次看原始碼。說不上什麼心得,不過也總結一下~ 我覺得閱讀一個專案的原始碼,應該是先知道這個專案具體怎麼用之後,先估計一下作者的實現,然後再看原始碼來驗證自己的想法。 我這次閱讀原始碼是在沒有用過這個專案的前提下閱讀的,一開始的時候不知道從何

JAVA基礎複習執行和網路

1、建立執行緒和任務,如: //任務類必須實現Runnable介面 public class TaskClass implements Runnable{ ... public TaskClass(...){ ... } //想要在該執行緒執行的

Boost——執行

結合Boost官網 多執行緒的難點在於同步執行,需要“鎖”控制所有權。 鎖有分:互斥鎖,條件變數... 互斥鎖:boost::mutex 獲取和釋放成對存在,也可以用boost::lock_guard<boost::mutex> lock(mutex); boost::l

JAVA進階06執行

一、三個概念 1、程式 程式(Program)是一個靜態的概念,一般對應於作業系統中的一個可執行檔案 2、程序 (1)執行中的程式叫做程序(Process),是一個動態的概念 (2)特點: 程序是程式的一次動態執行過程, 佔用特定的地址空間 每個程序由3

Python高階程式設計執行

Python 多執行緒 多執行緒類似於同時執行多個不同程式,多執行緒執行有如下優點: 使用執行緒可以把佔據長時間的程式中的任務放到後臺去處理。 使用者介面可以更加吸引人,這樣比如使用者點選了一個按鈕去觸發某些事件的處理,可以彈出一個進度條來顯示處理的進度 程式的執

執行基礎-執行併發安全問題

多執行緒基礎(三)-多執行緒併發安全問題 當多個執行緒併發操作同一資源時,由於執行緒切換實際不可控會導致操作邏輯執行順序出現混亂,嚴重時會導致系統癱瘓。例如下面的程式碼 public class SyncDemo { public static void main(Strin

Java併發程式設計執行四種實現方式

Java實現多執行緒的方式 Java實現多執行緒的方式有4種: 繼承Thread方法、實現Runnable介面、實現Callable介面並通過FutureTask建立執行緒、使用ExecutorService。 其中,前兩種執行緒執行結果沒有返回值,後兩種是有返回值的。 1、繼承Th

執行說學逗唱:關於執行那不得不說的二三事

(二)多執行緒說學逗唱:新手村偶遇Thread類 為什麼一上來就要寫這個 這個是啥,那個那個是啥,直接進去主題不好嗎?以前我也是這麼想的,可是後來呀…總之,一個不刨根問底的程式設計師不是好程式設計師,要深究一個知識點還就得知道他是從哪裡來,到哪裡去,既然來到這個事件

執行說學逗唱:村口的老R頭是個掃地僧Runnable

(一)多執行緒說學逗唱:關於執行緒那不得不說的二三事 (二)多執行緒說學逗唱:新手村偶遇Thread類 上一篇我們講到Thread這個類以及簡單地說了下執行緒執行的隨機性,相信大家對執行緒的使用有了不小的瞭解… 繼承Thread介面是實現多執行緒

執行說學逗唱:執行險惡,變數和執行安全不得不防

(一)多執行緒說學逗唱:關於執行緒那不得不說的二三事 (二)多執行緒說學逗唱:新手村偶遇Thread類 (三)多執行緒說學逗唱:村口的老R頭是個掃地僧(Runnable) 出了新手村,以後的路可就不那麼好走了,到底現在也是個江湖人,都必須經歷點困難挫折,要不以後拿什

Java Socket應用執行實現客戶端的通訊

伺服器執行緒處理類ServerThread.java : package com.yijia; import java.io.*; import java.net.Socket; /** * 建立時間:2018/10/4 14:59 * 作者: * 郵箱:[ema

PyQt5進階——執行:QTimer

應用程式開發中多執行緒的必要性: 一般情況下,應用程式都是單執行緒執行的,但是對GUI程式來說,單執行緒有時候滿足不了要求,但是對於一些特殊情況:比如一個耗時較長的操作,執行過程會有卡頓讓使用者以為程式出錯而把程式關閉或是系統本身認為程式執行出錯而自動關閉程式。這個時候就

OKHttp 3.10原始碼解析執行池和任務佇列

OKhttp是Android端最火熱的網路請求框架之一,它以高效的優點贏得了廣大開發者的喜愛,下面是OKhttp的主要特點: 1.支援HTTPS/HTTP2/WebSocket 2.內部維護執行緒池佇列,提高併發訪問的效率 3.內部維護連線池,支援多路複用,減少連線建立開銷 4.

PyQt5進階——執行:QThread & 事件處理

接上篇… 2. QThread 要使用QThread開始一個執行緒,可以建立它的一個子類,然後覆蓋其QThread.run()函式 class Thread(QThread): def __init__(self): super().__init__()

MFC筆記——執行程式設計1:模組、程序、執行間的基本概念

一、模組、程序、執行緒 1.1 模組         一段可執行的程式(包括EXE和DLL),其程式程式碼、資料、資源被載入到記憶體中,由系統建立一個數據結構來管理它。這段程式就是一個模組。這裡所說

python高階——執行2執行UDP聊天器

import socket import threading def recv_msg(udp_socket): # 接收資料 while True: recv_data = udp_socket.recvfrom(1024) print(recv

執行3-執行訪問共享物件和資料的方式

在多執行緒(2)-ThreadLocal,我們討論了執行緒範圍內的資料共享,本篇文章我們討論執行緒之間即多執行緒訪問共享物件和資料的方式 一:Java5之前給共享資料加上鎖synchronized,上程式碼 public class MultiThreadShareDat

非同步程式設計學習之路-執行之間的協作與通訊

本文是非同步程式設計學習之路(三)-多執行緒之間的協作與通訊,若要關注前文,請點選傳送門: 非同步程式設計學習之路(二)-通過Synchronize實現執行緒安全的多執行緒 通過前文,我們學習到如何實現同步的多執行緒,但是在很多情況下,僅僅同步是不夠的,還需要執行緒與執行緒協作(通訊),生產