UNIX環境高階程式設計(11-執行緒)
本章主要介紹執行緒的概念,建立和終止執行緒以及執行緒同步問題。
使用到的函式預設需要包含pthread.h標頭檔案,且在使用gcc編譯時,需要連結pthread庫。
程式碼地址:https://gitee.com/maxiaowei/Linux/tree/master/apue
執行緒的建立與終止
建立執行緒
// Returns: 0 if OK, error number on failure int pthread_create(pthread_t *restrict tidp, const pthread_attr_t *restrict attr, void *(*start_rtn)(void *), void *restrict arg);
新建立的執行緒的執行緒ID被設定成tidp
指向的記憶體單元;attr
引數定製執行緒的不同屬性;start_rtn
函式是執行緒開始時執行的函式,其引數可以通過arg
進行傳遞。
注意:
新執行緒最好不要通過tidp
指向的記憶體空間獲取自己的執行緒ID,因為如果新執行緒在主執行緒呼叫pthread_create
返回前就運行了,那麼它看到的就是未經初始化的內容,很可能並不是正確的執行緒ID。可以使用pthread_self
函式獲取自己的執行緒ID。
pthread_t pthread_self(void);
終止執行緒
任意執行緒呼叫exit
、_Exit
或_exit
會導致整個程序終止,可以通過以下3種方式,在不終止程序的前提下終止單個執行緒:
- 直接從啟動例項中返回
- 被同一程序的其他執行緒取消
- 呼叫
pthread_exit
void pthread_exit(void *rval_ptr);
// Returns: 0 if OK, error number on failure
int pthread_join(pthread_t thread, void **rval_ptr);
呼叫pthread_join
的執行緒會一直阻塞,直到指定的執行緒終止。如果指定的執行緒直接返回或者是呼叫pthread_exit
終止,則可以通過rval_ptr
檢視其返回值;如果執行緒是被取消的,則rval_ptr
被設定為PTHRERAD_CANCELED
取消執行緒
// Returns: 0 if OK, error number on failure
int pthread_cancel(pthread_t tid);
用來請求取消同一程序中的其他執行緒。被取消的執行緒的行為表現為如同呼叫了引數為PTHRERAD_CANCELED
的pthread_exit
函式。但是,執行緒可以選擇忽略或者控制如何被取消。
執行緒清理處理程式
void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);
清理函式rtn
只有在以下情況會執行:
- 呼叫
pthread_exit
- 響應取消請求
- 用非零
execute
引數呼叫pthread_cleanup_pop(為0時,清理函式不會被呼叫)
這兩個函式需要成對使用。
分離執行緒
// Returns: 0 if OK, error number on failure
int pthread_detach(pthread_t tid);
預設情況下,執行緒的終止狀態會保留,直到呼叫pthread_join
。如果執行緒被分離,則資源會線上程終止後被立即收回。
執行緒同步
互斥量mutex
// All return: 0 if OK, error number on failure
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
互斥變數為pthread_mutex_t
型別,如果使用靜態分配方式,可以直接使用PTHREAD_MUTEX_INITIALIZER
進行初始化。對於動態分配的互斥量,在釋放記憶體前需要呼叫pthread_mutex_destroy
。
帶有超時的互斥鎖
如果不希望執行緒在訪問加鎖的互斥量時無限等待,可以通過pthread_mutex_timedlock
指定等待的絕對時間。
#include <time.h>
// Returns: 0 if OK, error number on failure
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr);
示例
#include <pthread.h>
#include <time.h>
#include "apue.h"
int main()
{
int err;
struct timespec tout;
struct tm *tmp;
char buf[64];
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 加鎖
pthread_mutex_lock(&lock);
printf("mutex is locked.\n");
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("current time is %s\n", buf);
// 設定超時
tout.tv_sec += 10;
err = pthread_mutex_timedlock(&lock, &tout);
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("the time is now %s\n", buf);
if(err == 0) {
printf("mutex locked.\n");
} else {
printf("can't lock mutex:%s\n",strerror(err));
}
return 0;
}
讀寫鎖rwlock
讀寫鎖有3中狀態:不加鎖、讀模式加鎖和寫模式加鎖。一次只有一個執行緒可以佔有寫模式的讀寫鎖,但是多個執行緒可以同時佔有讀模式的讀寫鎖。
讀寫鎖適合對資料結構讀的次數遠大於寫的情況。
// Both return: 0 if OK, error number on failure
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
讀寫鎖在使用前必須初始化,在釋放它們底層的記憶體前必須銷燬。
// All return: 0 if OK, error number on failure
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 讀模式鎖定
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 寫模式鎖定
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
帶有超時的讀寫鎖
// Both return: 0 if OK, error number on failure
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock,
const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock,
const struct timespec *restrict tsptr);
與互斥量類似。
條件變數cond
當執行緒等待的條件變數被滿足後,該執行緒就會被喚醒。條件變數需要和互斥量配合使用,條件本身是由互斥量保護的。
在使用條件變數之前,必須對其進行初始化(有靜態和動態2種方式)。
// All return: 0 if OK, error number on failure
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr);
int pthread_cond_signal(pthread_cond_t *cond); // 至少喚醒一個
int pthread_cond_broadcast(pthread_cond_t *cond); // 全部喚醒
pthread_cond_wait
操作主要執行如下操作步驟:
1.解鎖互斥量mutex
2.阻塞呼叫執行緒,直至另一執行緒就條件變數cond發出訊號
3.重新鎖定mutex
因此,在使用pthread_cond_wait
函式之前,應該已經取得mutex鎖。
另外,對pthread_cond_wait
的呼叫應該放在while迴圈中,因為從wait
函式返回時,並不能確定條件已經得到滿足(其他執行緒先醒來、虛假喚醒等),需要重新對條件進行判斷。
示例
僅摘錄主要程式碼,完整程式碼見ch11/pthread_cond.c
// 消費者程序
void *process_msg(void *arg)
{
for (;;) {
pthread_mutex_lock(&qlock);
while (count <= 0) {
printf("%s wait msg\n", tag);
pthread_cond_wait(&qready, &qlock);
}
count--;
pthread_mutex_unlock(&qlock);
/* 處理訊息 */
// 放棄cpu,讓另一個處理進場有機會得到資料
sleep(1);
}
return NULL;
}
// 生產者程序
int main(void)
{
for (;;) {
pthread_mutex_lock(&qlock);
count += 4;
pthread_mutex_unlock(&qlock);
// 測試兩種喚醒方式
#if 1
pthread_cond_broadcast(&qready);
#else
pthread_cond_signal(&qready);
#endif
// 保證兩個消費者程序都可以有時間處理資料
sleep(3);
}
return 0;
}
自旋鎖spin
自旋鎖與互斥量大體類似,主要的不同之處在於自旋鎖在獲取鎖之前會一直忙等。因此,使用自旋鎖應該保證持有鎖的時間很短。
自旋鎖和互斥量的介面類似:
// All return: 0 if OK, error number on failure
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
pshared
表示程序共享(process-shared)屬性,表明自旋鎖的獲取方式。它僅在支援執行緒程序共享同步(Thread Process-Shared Synchronization)的平臺上有效,當設定為PTHREAD_PROCESS_SHARED
,則只要執行緒可以訪問鎖底層記憶體,即使是不同程序的執行緒都可以獲得鎖;而設定為PTHREAD_PROCESS_PRIVATE
後,只有初始化該鎖的程序內部的執行緒可以訪問它。
屏障barrier
屏障允許多個執行緒等待,直到所有合作執行緒滿足某個點後,從該點繼續執行。主執行緒可以將某個任務分解多個小任務交給不同的執行緒,等到所有執行緒工作完成後,主執行緒在此基礎上繼續執行。
如書中的例子,使用8個執行緒分解800萬個數的排序工作,每個執行緒對其中的100萬個數排序,最後由主執行緒將這些結果進行合併。
// Both return: 0 if OK, error number on failure
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr,
unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
初始化函式中的count
引數用於指定所有執行緒繼續執行前,必須到達屏障的執行緒數。
// Returns: 0 or PTHREAD_BARRIER_SERIAL_THREAD if OK, error number on failure
int pthread_barrier_wait(pthread_barrier_t *barrier);
wait函式表明當前執行緒已完成工作,準備等待其他執行緒。當執行緒呼叫該函式後滿足屏障計數,那麼函式的返回值為PTHREAD_BARRIER_SERIAL_THREAD
,其餘執行緒該函式返回值為0。這一特點使得可以很容易的將一個執行緒作為主執行緒,它可以工作在其他所有執行緒已完成的工作結果上。
示例
見ch11/pthread_barrier.c
#include <pthread.h>
#include "apue.h"
pthread_barrier_t pb;
pthread_t t1, t2;
void *th1(void *a)
{
printf("start t1\n");
sleep(1);
// 最後一個完成的執行緒,返回值應該為-1
int r = pthread_barrier_wait(&pb);
printf("th1 r:%d\n", r);
return NULL;
}
void *th2(void *a)
{
printf("start t2\n");
int r = pthread_barrier_wait(&pb);
printf("th2 r:%d\n", r);
return NULL;
}
int main()
{
int r;
pthread_barrier_init(&pb, NULL, 3);
pthread_create(&t1, NULL, th1, NULL);
pthread_create(&t2, NULL, th2, NULL);
r = pthread_barrier_wait(&pb);
printf("main r:%d\n", r);
// 等待子程序結束
pthread_join(t1, NULL);
pthread_join(t2, NULL);
return 0;
}