1. 程式人生 > >Linux 執行緒同步的三種方法

Linux 執行緒同步的三種方法


執行緒的最大特點是資源的共享性,但資源共享中的同步問題是多執行緒程式設計的難點。linux下提供了多種方式來處理執行緒同步,最常用的是互斥鎖、條件變數和訊號量。

一、互斥鎖(mutex)

通過鎖機制實現執行緒間的同步。

初始化鎖。在Linux下,執行緒的互斥量資料型別是pthread_mutex_t。在使用前,要對它進行初始化。
靜態分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
動態分配:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex_attr_t *mutexattr);
加鎖。對共享資源的訪問,要對互斥量進行加鎖,如果互斥量已經上了鎖,呼叫執行緒會阻塞,直到互斥量被解鎖。
int pthread_mutex_lock(pthread_mutex *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
解鎖。在完成了對共享資源的訪問後,要對互斥量進行解鎖。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
銷燬鎖。鎖在是使用完成後,需要進行銷燬以釋放資源。
int pthread_mutex_destroy(pthread_mutex *mutex);

    #include <cstdio>
    #include <cstdlib>
    #include <unistd.h>
    #include <pthread.h>
    #include "iostream"
    using namespace std;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    int tmp;
    void* thread(void *arg)
    {
        cout << "thread id is " << pthread_self() << endl;
        pthread_mutex_lock(&mutex);
        tmp = 12;
        cout << "Now a is " << tmp << endl;
        pthread_mutex_unlock(&mutex);
        return NULL;
    }
    int main()
    {
        pthread_t id;
        cout << "main thread id is " << pthread_self() << endl;
        tmp = 3;
        cout << "In main func tmp = " << tmp << endl;
        if (!pthread_create(&id, NULL, thread, NULL))
        {
            cout << "Create thread success!" << endl;
        }
        else
        {
            cout << "Create thread failed!" << endl;
        }
        pthread_join(id, NULL);
        pthread_mutex_destroy(&mutex);
        return 0;
    }
    //編譯:g++ -o thread testthread.cpp -lpthread


二、條件變數(cond)

互斥鎖不同,條件變數是用來等待而不是用來上鎖的。條件變數用來自動阻塞一個執行緒,直到某特殊情況發生為止。通常條件變數和互斥鎖同時使用。條件變數分為兩部分: 條件和變數。條件本身是由互斥量保護的。執行緒在改變條件狀態前先要鎖住互斥量。條件變數使我們可以睡眠等待某種條件出現。條件變數是利用執行緒間共享的全域性變數進行同步的一種機制,主要包括兩個動作:一個執行緒等待"條件變數的條件成立"而掛起;另一個執行緒使"條件成立"(給出條件成立訊號)。條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個執行緒自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個執行緒改變了條件,它發訊號給關聯的條件變數,喚醒一個或多個等待它的執行緒,重新獲得互斥鎖,重新評價條件。如果兩程序共享可讀寫的記憶體,條件變數可以被用來實現這兩程序間的執行緒同步。

初始化條件變數。
靜態態初始化,pthread_cond_t cond = PTHREAD_COND_INITIALIER;
動態初始化,int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
等待條件成立。釋放鎖,同時阻塞等待條件變數為真才行。timewait()設定等待時間,仍未signal,返回ETIMEOUT(加鎖保證只有一個執行緒wait)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
啟用條件變數。pthread_cond_signal,pthread_cond_broadcast(啟用所有等待執行緒)
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); //解除所有執行緒的阻塞
清除條件變數。無執行緒等待,否則返回EBUSY
int pthread_cond_destroy(pthread_cond_t *cond);


    #include <stdio.h>
    #include <pthread.h>
    #include "stdlib.h"
    #include "unistd.h"
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    void hander(void *arg)
    {
        free(arg);
        (void)pthread_mutex_unlock(&mutex);
    }
    void *thread1(void *arg)
    {
        pthread_cleanup_push(hander, &mutex);
        while(1)
        {
            printf("thread1 is running\n");
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cond, &mutex);
            printf("thread1 applied the condition\n");
            pthread_mutex_unlock(&mutex);
            sleep(4);
        }
        pthread_cleanup_pop(0);
    }
    void *thread2(void *arg)
    {
        while(1)
        {
            printf("thread2 is running\n");
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cond, &mutex);
            printf("thread2 applied the condition\n");
            pthread_mutex_unlock(&mutex);
            sleep(1);
        }
    }
    int main()
    {
        pthread_t thid1,thid2;
        printf("condition variable study!\n");
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&cond, NULL);
        pthread_create(&thid1, NULL, thread1, NULL);
        pthread_create(&thid2, NULL, thread2, NULL);
        sleep(1);
        do
        {
            pthread_cond_signal(&cond);
        }while(1);
        sleep(20);
        pthread_exit(0);
        return 0;
    }



    #include <pthread.h>
    #include <unistd.h>
    #include "stdio.h"
    #include "stdlib.h"
    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    struct node
    {
        int n_number;
        struct node *n_next;
    }*head = NULL;

    static void cleanup_handler(void *arg)
    {
        printf("Cleanup handler of second thread./n");
        free(arg);
        (void)pthread_mutex_unlock(&mtx);
    }
    static void *thread_func(void *arg)
    {
        struct node *p = NULL;
        pthread_cleanup_push(cleanup_handler, p);
        while (1)
        {
            //這個mutex主要是用來保證pthread_cond_wait的併發性
            pthread_mutex_lock(&mtx);
            while (head == NULL)
            {
                //這個while要特別說明一下,單個pthread_cond_wait功能很完善,為何
                //這裡要有一個while (head == NULL)呢?因為pthread_cond_wait裡的線
                //程可能會被意外喚醒,如果這個時候head != NULL,則不是我們想要的情況。
                //這個時候,應該讓執行緒繼續進入pthread_cond_wait
                // pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mtx,
                //然後阻塞在等待對列裡休眠,直到再次被喚醒(大多數情況下是等待的條件成立
                //而被喚醒,喚醒後,該程序會先鎖定先pthread_mutex_lock(&mtx);,再讀取資源
                //用這個流程是比較清楚的
                pthread_cond_wait(&cond, &mtx);
                p = head;
                head = head->n_next;
                printf("Got %d from front of queue/n", p->n_number);
                free(p);
            }
            pthread_mutex_unlock(&mtx); //臨界區資料操作完畢,釋放互斥鎖
        }
        pthread_cleanup_pop(0);
        return 0;
    }
    int main(void)
    {
        pthread_t tid;
        int i;
        struct node *p;
        //子執行緒會一直等待資源,類似生產者和消費者,但是這裡的消費者可以是多個消費者,而
        //不僅僅支援普通的單個消費者,這個模型雖然簡單,但是很強大
        pthread_create(&tid, NULL, thread_func, NULL);
        sleep(1);
        for (i = 0; i < 10; i++)
        {
            p = (struct node*)malloc(sizeof(struct node));
            p->n_number = i;
            pthread_mutex_lock(&mtx); //需要操作head這個臨界資源,先加鎖,
            p->n_next = head;
            head = p;
            pthread_cond_signal(&cond);
            pthread_mutex_unlock(&mtx); //解鎖
            sleep(1);
        }
        printf("thread 1 wanna end the line.So cancel thread 2./n");
        //關於pthread_cancel,有一點額外的說明,它是從外部終止子執行緒,子執行緒會在最近的取消點,退出
        //執行緒,而在我們的程式碼裡,最近的取消點肯定就是pthread_cond_wait()了。
        pthread_cancel(tid);
        pthread_join(tid, NULL);
        printf("All done -- exiting/n");
        return 0;
    }


 三、訊號量(sem)

如同程序一樣,執行緒也可以通過訊號量來實現通訊,雖然是輕量級的。訊號量函式的名字都以"sem_"打頭。執行緒使用的基本訊號量函式有四個。

訊號量初始化。
int sem_init (sem_t *sem , int pshared, unsigned int value);
這是對由sem指定的訊號量進行初始化,設定好它的共享選項(linux 只支援為0,即表示它是當前程序的區域性訊號量),然後給它一個初始值VALUE。
等待訊號量。給訊號量減1,然後等待直到訊號量的值大於0。
int sem_wait(sem_t *sem);
釋放訊號量。訊號量值加1。並通知其他等待執行緒。
int sem_post(sem_t *sem);
銷燬訊號量。我們用完訊號量後都它進行清理。歸還佔有的一切資源。
int sem_destroy(sem_t *sem);


    #include <stdlib.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <pthread.h>
    #include <semaphore.h>
    #include <errno.h>
    #define return_if_fail(p) if((p) == 0){printf ("[%s]:func error!/n", __func__);return;}
    typedef struct _PrivInfo
    {
        sem_t s1;
        sem_t s2;
        time_t end_time;
    }PrivInfo;

    static void info_init (PrivInfo* thiz);
    static void info_destroy (PrivInfo* thiz);
    static void* pthread_func_1 (PrivInfo* thiz);
    static void* pthread_func_2 (PrivInfo* thiz);

    int main (int argc, char** argv)
    {
        pthread_t pt_1 = 0;
        pthread_t pt_2 = 0;
        int ret = 0;
        PrivInfo* thiz = NULL;
        thiz = (PrivInfo* )malloc (sizeof (PrivInfo));
        if (thiz == NULL)
        {
            printf ("[%s]: Failed to malloc priv./n");
            return -1;
        }
        info_init (thiz);
        ret = pthread_create (&pt_1, NULL, (void*)pthread_func_1, thiz);
        if (ret != 0)
        {
            perror ("pthread_1_create:");
        }
        ret = pthread_create (&pt_2, NULL, (void*)pthread_func_2, thiz);
        if (ret != 0)
        {
            perror ("pthread_2_create:");
        }
        pthread_join (pt_1, NULL);
        pthread_join (pt_2, NULL);
        info_destroy (thiz);
        return 0;
    }
    static void info_init (PrivInfo* thiz)
    {
        return_if_fail (thiz != NULL);
        thiz->end_time = time(NULL) + 10;
        sem_init (&thiz->s1, 0, 1);
        sem_init (&thiz->s2, 0, 0);
        return;
    }
    static void info_destroy (PrivInfo* thiz)
    {
        return_if_fail (thiz != NULL);
        sem_destroy (&thiz->s1);
        sem_destroy (&thiz->s2);
        free (thiz);
        thiz = NULL;
        return;
    }
    static void* pthread_func_1 (PrivInfo* thiz)
    {
        return_if_fail(thiz != NULL);
        while (time(NULL) < thiz->end_time)
        {
            sem_wait (&thiz->s2);
            printf ("pthread1: pthread1 get the lock./n");
            sem_post (&thiz->s1);
            printf ("pthread1: pthread1 unlock/n");
            sleep (1);
        }
        return;
    }
    static void* pthread_func_2 (PrivInfo* thiz)
    {
        return_if_fail (thiz != NULL);
        while (time (NULL) < thiz->end_time)
        {
            sem_wait (&thiz->s1);
            printf ("pthread2: pthread2 get the unlock./n");
            sem_post (&thiz->s2);
            printf ("pthread2: pthread2 unlock./n");
            sleep (1);
        }
        return;
    }