1. 程式人生 > >Solaris10 下的多執行緒和Mysql多執行緒連線

Solaris10 下的多執行緒和Mysql多執行緒連線

1. Solairs下的多執行緒

執行緒分成兩種,一種是POSIX格式的(使用 pthread.h),一種是Solairs格式的(thread.h),建議使用 POSIX格式。

    int   pthread_create(pthread_t *restrict thread, 
                         const pthread_attr_t *restrict attr, 
                         void *(*start_routine)(void*), 
                         void *restrict arg);

用於建立執行緒

int pthread_detach(pthread_t thread);

用於設定執行緒線上程主程式終止以後,自動停止並釋放相關資源。

2. 執行緒執行引數的範圍問題

int thread_arg;
int main( int argc, char **argv){
pthread_t thread;
    thread_arg = 1;
    result = pthread_create (&thread, 
                             (const pthread_attr_t *) NULL, 
                             sock_thread_fun,          /** 執行緒主程式 */
                             (void *) &thread_arg
); /** 傳送進入的主程式的引數,需要傳送全域性變數的指標 */ if (result != 0){ fprintf (stderr, "Pthread_create failed (no = %d).", result); else pthread_detach (thread); } static void * sock_thread_fun (void * arg) { int conn_fd; conn_fd = * ((int *) arg); DO_SOMTHING...... return (void *) NULL; }


 上面是一個簡單的執行緒建立示例。

這裡需要注意:傳送給執行緒的引數,他不像呼叫函式傳送引數,可以使用上一級函式內宣告引數。執行緒建立傳送的執行引數,一定要使用全域性引數。

 3. 多執行緒執行引數傳送

上面的程式在傳送引數的時候,若只建立一個執行緒沒有問題,若迴圈建立多個執行緒,就會出現問題。

執行緒的建立過程實際上是非同步的過程,若多個執行緒要讀取同一個需要變化的全域性變數,就可能出現執行緒讀取的資料是已經被變更的資料。

int thread_arg;
int main( int argc, char **argv){
pthread_t thread;
int i;
    for (i = 0; i < 10; i++) {
        thread_arg = i;
        result = pthread_create (&thread, 
                                 (const pthread_attr_t *) NULL, 
                                 sock_thread_fun,          /** 執行緒主程式 */
                                 void *) &thread_arg);    /** 傳送進入的主程式的引數,需要傳送全域性變數的指標 */
        if (result != 0){
            fprintf (stderr, "Pthread_create failed (no = %d).", result);
        else
            pthread_detach (thread);
    }
}

static void * sock_thread_fun (void * arg) {
int conn_fd;
    conn_fd = * ((int *) arg);

    DO_SOMTHING......

    return (void *) NULL;
}


按照上面的例子,就可能出現,conn_fd 被賦值錯誤的問題。

例如:第三次呼叫執行緒,可能 conn_fd 獲得的數值是 1,而不是應該的 2。

利用POSIX執行緒中的 KEY 資料可以讓每個執行緒獲取獨立的資料區域,但是操作起來過於繁瑣,可以簡單的利用雙向連結串列和執行緒互斥鎖的功能完成這個工作。

typedef struct np_thread_arg {
    struct np_thread_arg * prev;
    struct np_thread_arg * next;
    int conn_fd;
} NP_THREAD_ARG;

NP_THREAD_ARG first_thread_arg;
pthread_mutex_t mutex_arg;

int main( int argc, char **argv){
pthread_t thread;
int i;
    pthread_mutex_init (&mutex_arg, NULL);
    for (i = 0; i < 10; i++) {
        add_thread_arg (i);
        result = pthread_create (&thread, 
                                 (const pthread_attr_t *) NULL, 
                                 sock_thread_fun,          /** 執行緒主程式 */
                                 void *) &thread_arg);    /** 傳送進入的主程式的引數,需要傳送全域性變數的指標 */
        if (result != 0){
            fprintf (stderr, "Pthread_create failed (no = %d).", result);
        else
            pthread_detach (thread);
    }
}

static void * sock_thread_fun (void * arg) {
int conn_fd;
    conn_fd = * ((int *) arg);

    DO_SOMTHING......

    remove_thread_arg ((NP_THREAD_ARG *) arg);
    return (void *) NULL;
}

/** 新增一個執行緒的資料集 */
void add_thread_arg (int conn_fd) {
NP_THREAD_ARG * tmp_node;
    tmp_node = (NP_THREAD_ARG *) malloc (sizeof (NP_THREAD_ARG));
    tmp_node->conn_fd = conn_fd;

    pthread_mutex_lock (&mutex_arg);    /** 新增互斥鎖 */
    if (!first_thread_arg) {
        first_thread_arg = tmp_node;
        first_thread_arg->next = tmp_node;
        first_thread_arg->prev = tmp_node;
    }
    else {
        tmp_node->next = first_thread_arg;
        tmp_node->prev = first_thread_arg->prev;
        tmp_node->next->prev = tmp_node;
        tmp_node->prev->next = tmp_node;
    }
    pthread_mutex_unlock (&mutex_arg);  /** 解除互斥鎖 */
    return;
}

/** 移除一個執行緒的資料集 */
void remove_thread_arg (NP_THREAD_ARG * del_node) {
NP_THREAD_ARG * curNode;
    pthread_mutex_lock (&mutex_arg);    /** 新增互斥鎖 */
    curNode = del_node->next;
    curNode->prev = del_node->prev;
    curNode->prev->next = curNode;
    if (curNode == del_node) {          /** 說明連結串列內只有一個節點 */
        free (del_node);
        curNode = NULL;
    }
    else {
        free (del_node);
    }
    first_thread_arg = curNode;
    pthread_mutex_unlock (&mutex_arg);  /** 解除互斥鎖 */
    return;
}


按照上面的範例,我們通過一個簡單的結構,可以完成在不同執行緒之間獨立的傳送資料進入。

若希望執行緒執行中有一些資料返回,可以使用 pthread_join () 把當前執行緒阻塞,然後將資料交給其他執行緒處理。

4.  Mysql多執行緒通訊問題

我們的專案需要每個執行緒操作資料庫。

有兩個選擇,程序採用一個獨立的MYSQL連結,處理所有相關操作;每個執行緒採用單獨的MYSQL連結。

考慮到資料庫的壓力和監控的方便性,我們選擇了所有執行緒使用一個程序生成的MYSQL連結的方式。

這樣,就在MYSQL中引起了執行緒安全的問題,若沒有任何處理,就可能在你的程式執行過程中,資料庫連線被中斷掉。

Mysql error: Lost connection to MySQL server during query

這樣的錯誤資訊引起,是因為多個執行緒對於同一個MYSQL連線進行操作而造成的。

這裡需要注意的2點內容:

1. 需要重新編譯Mysql的客戶端庫,並且替換,不需要處理伺服器端的庫;

2. 編譯程式需要使用 “-lmysqlclient_r” 的連線命令,替換常用的 “-lmysqlclient” 引數

5. 如何檢視當前系統是否支援多執行緒安全?

一般在編譯MYSQL的時候,我們都不會新增多執行緒安全選項。你的系統是否有,可以通過如下兩點來判斷。

1. 到原始碼編譯的位置,開啟頭10行的 “config.status”檔案,就可以看到自己的 configure 命令引數當初設定的內容,若存在 ”--enable-thread-safe-client“引數,說明有這個功能;

2. 到LIB庫的位置檢視相關連結庫的內容;

libmysqlclient_r.so -> libmysqlclient_r.so.10.0.0
libmysqlclient_r.so.10.0.0
libmysqlclient.so -> libmysqlclient.so.10.0.0
libmysqlclient.so.10 -> libmysqlclient.so.10.0.0
libmysqlclient.so.10.0.0

若看到上面兩行紅色的,就說明是支援多執行緒客戶端安全的,這裡需要注意,若看到如下內容,說明是不支援的。

libmysqlclient_r.so -> libmysqlclient.so.10.0.0
libmysqlclient.so -> libmysqlclient.so.10.0.0
libmysqlclient.so.10 -> libmysqlclient.so.10.0.0
libmysqlclient.so.10.0.0

上面個兩個部分的區別,請自己仔細比較

6. 使用MYSQL多執行緒安全

使用多執行緒安全中,主要是建立資料庫連線,和斷開資料庫連線的位置需要改動。程式碼新增兩個簡單的小函式解決這個問題。

typedef struct np_thread_arg {
    struct np_thread_arg * prev;
    struct np_thread_arg * next;
    int conn_fd;
} NP_THREAD_ARG;

NP_THREAD_ARG first_thread_arg;
pthread_mutex_t mutex_arg;
MYSQL * db_handle;

int main( int argc, char **argv){
pthread_t thread;
int i;
    pthread_mutex_init (&mutex_arg, NULL);
    db_handle = db_init (&init_info);
    for (i = 0; i < 10; i++) {
        add_thread_arg (i);
        result = pthread_create (&thread, 
                                 (const pthread_attr_t *) NULL, 
                                 sock_thread_fun,          /** 執行緒主程式 */
                                 void *) &thread_arg);    /** 傳送進入的主程式的引數,需要傳送全域性變數的指標 */
        if (result != 0){
            fprintf (stderr, "Pthread_create failed (no = %d).", result);
        else
            pthread_detach (thread);
    }
    db_close (db_handle);
}

static void * sock_thread_fun (void * arg) {
int conn_fd;
    conn_fd = * ((int *) arg);

    DO_SOMTHING......

    remove_thread_arg ((NP_THREAD_ARG *) arg);
    return (void *) NULL;
}

/** 新增一個執行緒的資料集 */
void add_thread_arg (int conn_fd) {
NP_THREAD_ARG * tmp_node;
    tmp_node = (NP_THREAD_ARG *) malloc (sizeof (NP_THREAD_ARG));
    tmp_node->conn_fd = conn_fd;

    pthread_mutex_lock (&mutex_arg);    /** 新增互斥鎖 */
    if (!first_thread_arg) {
        first_thread_arg = tmp_node;
        first_thread_arg->next = tmp_node;
        first_thread_arg->prev = tmp_node;
    }
    else {
        tmp_node->next = first_thread_arg;
        tmp_node->prev = first_thread_arg->prev;
        tmp_node->next->prev = tmp_node;
        tmp_node->prev->next = tmp_node;
    }
    pthread_mutex_unlock (&mutex_arg);  /** 解除互斥鎖 */
    return;
}

/** 移除一個執行緒的資料集 */
void remove_thread_arg (NP_THREAD_ARG * del_node) {
NP_THREAD_ARG * curNode;
    pthread_mutex_lock (&mutex_arg);    /** 新增互斥鎖 */
    curNode = del_node->next;
    curNode->prev = del_node->prev;
    curNode->prev->next = curNode;
    if (curNode == del_node) {          /** 說明連結串列內只有一個節點 */
        free (del_node);
        curNode = NULL;
    }
    else {
        free (del_node);
    }
    first_thread_arg = curNode;
    pthread_mutex_unlock (&mutex_arg);  /** 解除互斥鎖 */
    return;
}

/** 建立資料庫連線 */
MYSQL * db_init (NP_DB_INITINFO * init_info) {
MYSQL * db_handle;
    my_init ();         /** 為執行緒安全,在連結以前呼叫一次 */
    mysql_init (NULL);
    db_handle = mysql_init (NULL);
    mysql_thread_init ();       /** 執行緒安全初始化 */
    if (mysql_real_connect (db_handle, init_info->hostname, init_info->username, init_info->userpass, init_info->dbname, init_info->port, NULL, 0) == NULL) {
        db_close (db_handle);
        return NULL;
    }
    fprintf (stdout, "DB connectiong created (id = %u).", db_handle->thread_id);
    return db_handle;
}

/** 關閉資料庫連線 */
void db_close (MYSQL * db_handle) {
    if(db_handle != NULL) {
    unsigned long db_thread_id = db_handle->thread_id;
        mysql_close (db_handle);
        mysql_thread_end();
        fprintf (stdout, "DB connection closed (id = %u).", db_thread_id);
    }
    return;
}

和一般的MYSQL呼叫之間,區別就在於紅色的4行內容。有他們就可以正式使用客戶端多執行緒安全庫了。


 7. 資料庫操作需要注意的內容

在MYSQL官方文件中,特別提到了如下內容:

”如果使用了mysql_use_result,務必確保無其他執行緒正在使用相同的連線,直至關閉了結果集為止。然而,對於執行緒式客戶端,最好是共享相同的連線以使用mysql_store_result()。“

這個內容非常重要,查詢您自己的程式碼,替換 mysql_use_result 函式的使用。

”如果打算在相同的連線上使用多個執行緒,必須在mysql_query()mysql_store_result()呼叫組合上擁有互斥鎖。一旦mysql_store_result()準備就緒,可釋放鎖定,其他執行緒可在相同的連線上執行查詢。

一般其他操作不會去查詢結果,所以主要針對 SELECT 語句,可以封裝一個自己的小函式解決這個問題。

MYSQL_RES * np_select (MYSQL * db_handle, const char * sql, pthread_mutex_t * mutex) {
MYSQL_RES * res_set;

    if (mutex) {        /** 建立了互斥鎖 */
        pthread_mutex_lock(mutex);
    }
    if (mysql_query(db_handle, sql)) {
        return NULL;
    }
    res_set = mysql_store_result (db_handle);
    if (mutex) {        /** 解除互斥鎖 */
        pthread_mutex_unlock(mutex);
    }
    return res_set;
}


 參考資料: