1. 程式人生 > 實用技巧 >UNIX環境高階程式設計(11-執行緒)

UNIX環境高階程式設計(11-執行緒)

本章主要介紹執行緒的概念,建立和終止執行緒以及執行緒同步問題。

使用到的函式預設需要包含pthread.h標頭檔案,且在使用gcc編譯時,需要連結pthread庫。

程式碼地址:https://gitee.com/maxiaowei/Linux/tree/master/apue

執行緒的建立與終止

建立執行緒

// Returns: 0 if OK, error number on failure
int pthread_create(pthread_t *restrict tidp,
                   const pthread_attr_t *restrict attr,
                   void *(*start_rtn)(void *), void *restrict arg);

新建立的執行緒的執行緒ID被設定成tidp指向的記憶體單元;attr引數定製執行緒的不同屬性;start_rtn函式是執行緒開始時執行的函式,其引數可以通過arg進行傳遞。

注意:

新執行緒最好不要通過tidp指向的記憶體空間獲取自己的執行緒ID,因為如果新執行緒在主執行緒呼叫pthread_create返回前就運行了,那麼它看到的就是未經初始化的內容,很可能並不是正確的執行緒ID。可以使用pthread_self函式獲取自己的執行緒ID。

pthread_t pthread_self(void);

終止執行緒

任意執行緒呼叫exit_Exit_exit會導致整個程序終止,可以通過以下3種方式,在不終止程序的前提下終止單個執行緒:

  1. 直接從啟動例項中返回
  2. 被同一程序的其他執行緒取消
  3. 呼叫pthread_exit
void pthread_exit(void *rval_ptr);

// Returns: 0 if OK, error number on failure
int pthread_join(pthread_t thread, void **rval_ptr);

呼叫pthread_join的執行緒會一直阻塞,直到指定的執行緒終止。如果指定的執行緒直接返回或者是呼叫pthread_exit終止,則可以通過rval_ptr檢視其返回值;如果執行緒是被取消的,則rval_ptr被設定為PTHRERAD_CANCELED

取消執行緒

// Returns: 0 if OK, error number on failure
int pthread_cancel(pthread_t tid);

用來請求取消同一程序中的其他執行緒。被取消的執行緒的行為表現為如同呼叫了引數為PTHRERAD_CANCELEDpthread_exit函式。但是,執行緒可以選擇忽略或者控制如何被取消。

執行緒清理處理程式

void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);

清理函式rtn只有在以下情況會執行:

  1. 呼叫pthread_exit
  2. 響應取消請求
  3. 非零execute引數呼叫pthread_cleanup_pop(為0時,清理函式不會被呼叫)

這兩個函式需要成對使用

分離執行緒

// Returns: 0 if OK, error number on failure
int pthread_detach(pthread_t tid);

預設情況下,執行緒的終止狀態會保留,直到呼叫pthread_join。如果執行緒被分離,則資源會線上程終止後被立即收回。

執行緒同步

互斥量mutex

// All return: 0 if OK, error number on failure
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
                     const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

互斥變數為pthread_mutex_t型別,如果使用靜態分配方式,可以直接使用PTHREAD_MUTEX_INITIALIZER進行初始化。對於動態分配的互斥量,在釋放記憶體前需要呼叫pthread_mutex_destroy

帶有超時的互斥鎖

如果不希望執行緒在訪問加鎖的互斥量時無限等待,可以通過pthread_mutex_timedlock指定等待的絕對時間。

#include <time.h>
// Returns: 0 if OK, error number on failure
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
                          const struct timespec *restrict tsptr);

示例

#include <pthread.h>
#include <time.h>

#include "apue.h"

int main()
{
  int err;
  struct timespec tout;
  struct tm *tmp;
  char buf[64];
  pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

// 加鎖
  pthread_mutex_lock(&lock);
  printf("mutex is locked.\n");
  clock_gettime(CLOCK_REALTIME, &tout);
  tmp = localtime(&tout.tv_sec);
  strftime(buf, sizeof(buf), "%r", tmp);
  printf("current time is %s\n", buf);

// 設定超時
  tout.tv_sec += 10;
  err = pthread_mutex_timedlock(&lock, &tout);
  clock_gettime(CLOCK_REALTIME, &tout);
  tmp = localtime(&tout.tv_sec);
  strftime(buf, sizeof(buf), "%r", tmp);
  printf("the time is now %s\n", buf);

  if(err == 0) {
    printf("mutex locked.\n");
  } else {
    printf("can't lock mutex:%s\n",strerror(err));
  }

  return 0;
}

讀寫鎖rwlock

讀寫鎖有3中狀態:不加鎖、讀模式加鎖和寫模式加鎖。一次只有一個執行緒可以佔有寫模式的讀寫鎖,但是多個執行緒可以同時佔有讀模式的讀寫鎖。

讀寫鎖適合對資料結構讀的次數遠大於寫的情況。

// Both return: 0 if OK, error number on failure
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
                      const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

讀寫鎖在使用前必須初始化,在釋放它們底層的記憶體前必須銷燬

// All return: 0 if OK, error number on failure
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 讀模式鎖定
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 寫模式鎖定
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

帶有超時的讀寫鎖

// Both return: 0 if OK, error number on failure
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock,
                             const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock,
                             const struct timespec *restrict tsptr);

與互斥量類似。

條件變數cond

當執行緒等待的條件變數被滿足後,該執行緒就會被喚醒。條件變數需要和互斥量配合使用,條件本身是由互斥量保護的。

在使用條件變數之前,必須對其進行初始化(有靜態和動態2種方式)。

// All return: 0 if OK, error number on failure
int pthread_cond_init(pthread_cond_t *restrict cond,
                    const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *restrict cond,
                    pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
                         pthread_mutex_t *restrict mutex,
                         const struct timespec *restrict tsptr);

int pthread_cond_signal(pthread_cond_t *cond);    // 至少喚醒一個
int pthread_cond_broadcast(pthread_cond_t *cond); // 全部喚醒

pthread_cond_wait操作主要執行如下操作步驟:

1.解鎖互斥量mutex
2.阻塞呼叫執行緒,直至另一執行緒就條件變數cond發出訊號
3.重新鎖定mutex

因此,在使用pthread_cond_wait函式之前,應該已經取得mutex鎖。

另外,對pthread_cond_wait的呼叫應該放在while迴圈中,因為從wait

函式返回時,並不能確定條件已經得到滿足(其他執行緒先醒來、虛假喚醒等),需要重新對條件進行判斷。

示例

僅摘錄主要程式碼,完整程式碼見ch11/pthread_cond.c

// 消費者程序
void *process_msg(void *arg)
{
  for (;;) {
    pthread_mutex_lock(&qlock);
    while (count <= 0) {
      printf("%s wait msg\n", tag);
      pthread_cond_wait(&qready, &qlock);
    }
    count--;
    pthread_mutex_unlock(&qlock);
    /* 處理訊息 */
    // 放棄cpu,讓另一個處理進場有機會得到資料
    sleep(1);
  }
  return NULL;
}
// 生產者程序
int main(void)
{
  for (;;) {
    pthread_mutex_lock(&qlock);
    count += 4;
    pthread_mutex_unlock(&qlock);
    // 測試兩種喚醒方式
#if 1
    pthread_cond_broadcast(&qready);
#else
    pthread_cond_signal(&qready);
#endif
    // 保證兩個消費者程序都可以有時間處理資料
    sleep(3);
  }
  return 0;
}

自旋鎖spin

自旋鎖與互斥量大體類似,主要的不同之處在於自旋鎖在獲取鎖之前會一直忙等。因此,使用自旋鎖應該保證持有鎖的時間很短。

自旋鎖和互斥量的介面類似:

// All return: 0 if OK, error number on failure
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);

int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);

pshared表示程序共享(process-shared)屬性,表明自旋鎖的獲取方式。它僅在支援執行緒程序共享同步(Thread Process-Shared Synchronization)的平臺上有效,當設定為PTHREAD_PROCESS_SHARED,則只要執行緒可以訪問鎖底層記憶體,即使是不同程序的執行緒都可以獲得鎖;而設定為PTHREAD_PROCESS_PRIVATE後,只有初始化該鎖的程序內部的執行緒可以訪問它。

屏障barrier

屏障允許多個執行緒等待,直到所有合作執行緒滿足某個點後,從該點繼續執行。主執行緒可以將某個任務分解多個小任務交給不同的執行緒,等到所有執行緒工作完成後,主執行緒在此基礎上繼續執行。

如書中的例子,使用8個執行緒分解800萬個數的排序工作,每個執行緒對其中的100萬個數排序,最後由主執行緒將這些結果進行合併。

// Both return: 0 if OK, error number on failure
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
                       const pthread_barrierattr_t *restrict attr,
                       unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);

初始化函式中的count引數用於指定所有執行緒繼續執行前,必須到達屏障的執行緒數。

// Returns: 0 or PTHREAD_BARRIER_SERIAL_THREAD if OK, error number on failure
int pthread_barrier_wait(pthread_barrier_t *barrier);

wait函式表明當前執行緒已完成工作,準備等待其他執行緒。當執行緒呼叫該函式後滿足屏障計數,那麼函式的返回值為PTHREAD_BARRIER_SERIAL_THREAD,其餘執行緒該函式返回值為0。這一特點使得可以很容易的將一個執行緒作為主執行緒,它可以工作在其他所有執行緒已完成的工作結果上。

示例

見ch11/pthread_barrier.c

#include <pthread.h>

#include "apue.h"

pthread_barrier_t pb;
pthread_t t1, t2;

void *th1(void *a)
{
  printf("start t1\n");
  sleep(1);
  // 最後一個完成的執行緒,返回值應該為-1
  int r = pthread_barrier_wait(&pb);
  printf("th1  r:%d\n", r);
  return NULL;
}

void *th2(void *a)
{
  printf("start t2\n");
  int r = pthread_barrier_wait(&pb);
  printf("th2  r:%d\n", r);
  return NULL;
}

int main()
{
  int r;
  pthread_barrier_init(&pb, NULL, 3);

  pthread_create(&t1, NULL, th1, NULL);
  pthread_create(&t2, NULL, th2, NULL);

  r = pthread_barrier_wait(&pb);
  printf("main r:%d\n", r);

  // 等待子程序結束
  pthread_join(t1, NULL);
  pthread_join(t2, NULL);
  return 0;
}