Linux系統編程——線程(2)
目錄
- Linux系統編程——線程(2)
- 同步概念
- 線程同步
- 數據混亂原因:
- 互斥量mutex
- pthread_mutex_init函數
- pthread_mutex_destroy函數
- pthread_mutex_lock函數
- pthread_mutex_unlock函數
- pthread_mutex_trylock函數
- 線程加鎖後被取消
- 多線程售票功能(互斥鎖)
- 條件變量
- pthread_cond_init函數
- pthread_cond_destroy函數
- pthread_cond_wait函數(※)
- pthread_cond_timedwait函數
- pthread_cond_signal函數
- pthread_cond_broadcast函數
- 條件變量的優點
- 生產者消費者模型
- 同步概念
Linux系統編程——線程(2)
前情提要: Linux系統編程——線程(1)
同步概念
? 所謂同步,即同時起步,協調一致。不同的對象,對“同步”的理解方式略有不同。如,設備同步,是指在兩個設備之間規定一個共同的時間參考;數據庫同步,是指讓兩個或多個數據庫內容保持一致,或者按需要部分保持一致;文件同步,是指讓兩個或多個文件夾裏的文件保持一致。
? 而編程中、通信中所說的同步與生活中大家印象中的同步概念略有差異。“同”字應是指協同、協助、互相配合。主旨在協同步調,按預定的先後次序運行。
線程同步
? 同步即協同步調,按預定的先後次序運行。
? 線程同步,指一個線程發出某一功能調用時,在沒有得到結果之前,該調用不返回。同時其它線程為保證數據一致性,不能調用該功能。
舉例:
#include <func.h>
//主線程與子線程各加1000萬
#define N 10000000
void* threadFunc(void *p)
{
int *num=(int*)p;
int i;
for(i=0;i<N;i++)
{
*num+=1;
}
printf("I am child thread\n");
return NULL;
}
int main()
{
pthread_t pthID;//線程ID
int ret;
int num=0;
ret=pthread_create(&pthID,NULL,threadFunc,&num);
if(ret!=0)
{
printf("pthread_create:%s\n",strerror(ret));
return -1;
}
int i;
for(i=0;i<N;i++)
{
num=num+1;
}
pthread_join(pthID,NULL);
printf("I am main thread,%d\n",num);
return 0;
}
? “同步”的目的,是為了避免數據混亂,解決與時間有關的錯誤。實際上,不僅線程間需要同步,進程間、信號間等等都需要同步機制。
? 因此,所有“多個控制流,共同操作一個共享資源”的情況,都需要同步。
數據混亂原因:
資源共享(獨享資源則不會)
調度隨機(意味著數據訪問會出現競爭)
線程間缺乏必要的同步機制。
? 以上3點中,前兩點不能改變,欲提高效率,傳遞數據,資源必須共享。只要共享資源,就一定會出現競爭。只要存在競爭關系,數據就很容易出現混亂。
? 所以只能從第三點著手解決。使多個線程在訪問共享資源的時候,出現互斥。
互斥量mutex
Linux中提供一把互斥鎖mutex(也稱之為互斥量)。
每個線程在對資源操作前都嘗試先加鎖,成功加鎖才能操作,操作結束解鎖。
資源還是共享的,線程間也還是競爭的。
pthread_mutex_init函數
pthread_mutex_destroy函數
pthread_mutex_lock函數
pthread_mutex_trylock函數
pthread_mutex_unlock函數
以上5個函數的返回值都是:成功返回0, 失敗返回錯誤號。
pthread_mutex_t 類型,其本質是一個結構體。為簡化理解,應用時可忽略其實現細節,簡單當成整數看待。
pthread_mutex_t mutex; 變量mutex只有兩種取值1、0。
pthread_mutex_init函數
初始化一個互斥鎖(互斥量) ---> 初值可看作1
? int pthread_mutex_init(pthread_mutex_t restrict mutex, const pthread_mutexattr_t restrict attr);
參1:傳出參數,調用時應傳 &mutex
restrict關鍵字:只用於限制指針,告訴編譯器,所有修改該指針指向內存中內容的操作,只能通過本指針完成。不能通過除本指針以外的其他變量或指針修改
參2:互斥量屬性。是一個傳入參數,通常傳NULL,選用默認屬性(線程間共享)。 參APUE.12.4同步屬性
靜態初始化:如果互斥鎖 mutex 是靜態分配的(定義在全局,或加了static關鍵字修飾),可以直接使用宏進行初始化。e.g. pthead_mutex_t muetx = PTHREAD_MUTEX_INITIALIZER;
動態初始化:局部變量應采用動態初始化。e.g. pthread_mutex_init(&mutex, NULL)
pthread_mutex_destroy函數
銷毀一個互斥鎖
? int pthread_mutex_destroy(pthread_mutex_t *mutex);
#include <func.h>
int main()
{
pthread_mutex_t mutex;
int ret;
ret=pthread_mutex_init(&mutex,NULL);
THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
ret=pthread_mutex_destroy(&mutex);
THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
return 0;
}
pthread_mutex_lock函數
加鎖。可理解為將mutex--(或-1)
? int pthread_mutex_lock(pthread_mutex_t *mutex);
#include <func.h>
int main()
{
pthread_mutex_t mutex;
int ret;
ret=pthread_mutex_init(&mutex,NULL);
THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
pthread_mutex_lock(&mutex);
pthread_mutex_lock(&mutex);//會阻塞
printf("you can't see me\n");
ret=pthread_mutex_destroy(&mutex);
THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
return 0;
}
pthread_mutex_unlock函數
解鎖。可理解為將mutex ++(或+1)
? int pthread_mutex_unlock(pthread_mutex_t *mutex);
主線程與子線程實現對同一變量的加法運算:
#include <func.h>
//主線程與子線程各加1000萬
#define N 10000000
typedef struct{
int num;
pthread_mutex_t mutex;
}Data;
void* threadFunc(void *p)
{
Data *pThreadInfo=(Data *)p;
int i;
for(i=0;i<N;i++)
{
pthread_mutex_lock(&pThreadInfo->mutex);
pThreadInfo->num+=1;
pthread_mutex_unlock(&pThreadInfo->mutex);
}
printf("I am child thread\n");
return NULL;
}
int main()
{
pthread_t pthID;//線程ID
int ret;
Data threadInfo;
threadInfo.num=0;
ret=pthread_mutex_init(&threadInfo.mutex,NULL);
THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
struct timeval start,end;
gettimeofday(&start,NULL);
ret=pthread_create(&pthID,NULL,threadFunc,&threadInfo);
if(ret!=0)
{
printf("pthread_create:%s\n",strerror(ret));
return -1;
}
int i;
for(i=0;i<N;i++)
{
pthread_mutex_lock(&threadInfo.mutex);
threadInfo.num+=1;
pthread_mutex_unlock(&threadInfo.mutex);
}
pthread_join(pthID,NULL);
gettimeofday(&end,NULL);
ret=pthread_mutex_destroy(&threadInfo.mutex);
THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
printf("I am main thread,%d,use time=%ld\n",threadInfo.num,(end.tv_sec-start.tv_sec)*1000000+end.tv_usec-start.tv_usec);
return 0;
}
pthread_mutex_trylock函數
嘗試加鎖
? int pthread_mutex_trylock(pthread_mutex_t *mutex);
#include <func.h>
//trylock嘗試加鎖
int main()
{
pthread_mutex_t mutex;
int ret;
ret=pthread_mutex_init(&mutex,NULL);
THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
pthread_mutex_lock(&mutex);
ret=pthread_mutex_trylock(&mutex);
THREAD_ERROR_CHECK(ret,"pthread_mutex_trylock");
ret=pthread_mutex_destroy(&mutex);
THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
return 0;
}
線程加鎖後被取消
? 線程取消的方法是一個線程向目標線程發cancel 信號,但是如何處理cancel 信號則由目標線程自己決
定,目標線程或者忽略、或者立即終止、或者繼續運行至cancelation-point(取消點)後終止。
? 線程為了訪問臨界共享資源而為其加上鎖,但在訪問過程中該線程被外界取消,或者發生了中斷,則該
臨界資源將永遠處於鎖定狀態得不到釋放。外界取消操作是不可預見的,因此的確需要一個機制來簡化用於
資源釋放的編程。
? 在POSIX 線程API 中提供了一個pthread_cleanup_push()/pthread_cleanup_pop()函數對用於自動釋
放資源,從pthread_cleanup_push()的調用點到pthread_cleanup_pop()之間的程序段中的終止動作都將執行
pthread_cleanup_push()所指定的清理函數。
函數原型:
void pthread_cleanup_push(void (*routine) (void *), void *arg)
void pthread_cleanup_pop(int execute)
pthread_cleanup_push()/pthread_cleanup_pop()
采用先入後出的棧結構管理,並且必須成對出現!
void routine(void *arg)
函數在調用pthread_cleanup_push()
時壓入清理函數棧,多次對
pthread_cleanup_push()
的調用將在清理函數棧中形成一個函數鏈,在執行該函數鏈時按照壓棧的相反
順序彈出。execute 參數表示執行到pthread_cleanup_pop()
時是否在彈出清理函數的同時執行該函數,
為0 表示不執行,非0 為執行;這個參數並不影響異常終止時清理函數的執行。
#include <func.h>
void cleanup(void *p)
{
pthread_mutex_unlock((pthread_mutex_t*)p);
printf("unlock success\n");
}
//線程加鎖以後被cancel怎麽辦
void* threadFunc(void *p)
{
pthread_mutex_t* pMutex=(pthread_mutex_t*)p;
pthread_cleanup_push(cleanup,pMutex); //指定清理函數cleanup
pthread_mutex_lock(pMutex);
sleep(3);
pthread_cleanup_pop(1);
pthread_exit(NULL);
}
int main()
{
pthread_mutex_t mutex;
int ret;
ret=pthread_mutex_init(&mutex,NULL);
THREAD_ERROR_CHECK(ret,"pthread_mutex_init");
pthread_t pthId[2];
int i;
for(i=0;i<2;i++)
{
pthread_create(pthId+i,NULL,threadFunc,&mutex);
}
for(i=0;i<2;i++)
{
ret=pthread_cancel(pthId[i]);
THREAD_ERROR_CHECK(ret,"pthread_cancel");
}
for(i=0;i<2;i++)
{
pthread_join(pthId[i],NULL);
}
ret=pthread_mutex_destroy(&mutex);
THREAD_ERROR_CHECK(ret,"pthread_mutex_destroy");
return 0;
}
多線程售票功能(互斥鎖)
#include <func.h>
typedef struct{
int tickets;
pthread_mutex_t mutex;
}Train;
void* saleWindows1(void* p)
{
Train *pSale=(Train*)p;
int count=0;
while(1)
{
pthread_mutex_lock(&pSale->mutex);
if(pSale->tickets>0)
{
//printf("I am saleWindows1,the tickets is %d\n",pSale->tickets);
pSale->tickets--;
count++;
//printf("sale finish,I am saleWindows1,the tickets is %d\n",pSale->tickets);
pthread_mutex_unlock(&pSale->mutex);
}else{
pthread_mutex_unlock(&pSale->mutex);
printf("I am windows1 sale %d\n",count);
break;
}
}
return NULL;
}
void* saleWindows2(void* p)
{
Train *pSale=(Train*)p;
int count=0;
while(1)
{
pthread_mutex_lock(&pSale->mutex);
if(pSale->tickets>0)
{
//printf("I am saleWindows2,the tickets is %d\n",pSale->tickets);
pSale->tickets--;
count++;
//printf("sale finish,I am saleWindows2,the tickets is %d\n",pSale->tickets);
pthread_mutex_unlock(&pSale->mutex);
}else{
pthread_mutex_unlock(&pSale->mutex);
printf("I am windows2 sale %d\n",count);
break;
break;
}
}
return NULL;
}
typedef void* (*threadFunc)(void*);
int main()
{
Train t;
pthread_t pthId[2];
int i;
t.tickets=20000000;
threadFunc pthreadFunc[2]={saleWindows1,saleWindows2};
pthread_mutex_init(&t.mutex,NULL);
for(i=0;i<2;i++)
{
pthread_create(pthId+i,NULL,pthreadFunc[i],&t);
}
for(i=0;i<2;i++)
{
pthread_join(pthId[i],NULL);
}
printf("sale over\n");
}
條件變量
? 條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:一個線程等待條件變
量的條件成立而掛起;另一個線程使條件成立(給出條件成立信號)。為了防止競爭,條件變量的使用總是
和一個互斥鎖結合在一起。
? pthread_cond_init函數
? pthread_cond_destroy函數
? pthread_cond_wait函數
? pthread_cond_timedwait函數
? pthread_cond_signal函數
? pthread_cond_broadcast函數
以上6 個函數的返回值都是:成功返回0, 失敗直接返回錯誤號。
? pthread_cond_t類型 用於定義條件變量
? pthread_cond_t cond;
pthread_cond_init函數
初始化一個條件變量
int pthread_cond_init(pthread_cond_t restrict cond, const pthread_condattr_t restrict attr);
參2:attr表條件變量屬性,通常為默認值,傳NULL即可
也可以使用靜態初始化的方法,初始化條件變量:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_cond_destroy函數
銷毀一個條件變量
int pthread_cond_destroy(pthread_cond_t *cond);
pthread_cond_wait函數(※)
阻塞等待一個條件變量
? int pthread_cond_wait(pthread_cond_t restrict cond, pthread_mutex_t restrict mutex);
函數作用:
阻塞等待條件變量cond(參1)滿足
釋放已掌握的互斥鎖(解鎖互斥量)相當於pthread_mutex_unlock(&mutex);
1.2.兩步為一個原子操作。
- 當被喚醒,pthread_cond_wait函數返回時,解除阻塞並重新申請獲取互斥鎖pthread_mutex_lock(&mutex);
先舉一個簡單的例子:
#include <func.h>
typedef struct{
pthread_cond_t cond;
pthread_mutex_t mutex;
}Data;
void* threadFunc(void* p)
{
Data* pthreadInfo=(Data*)p;
int ret;
pthread_mutex_lock(&pthreadInfo->mutex);
ret=pthread_cond_wait(&pthreadInfo->cond,&pthreadInfo->mutex);
pthread_mutex_unlock(&pthreadInfo->mutex);
printf("I am child,after wait\n");
pthread_exit(NULL);
}
int main()
{
Data threadInfo;
int ret;
ret=pthread_cond_init(&threadInfo.cond,NULL);
THREAD_ERROR_CHECK(ret,"pthread_cond_init");
pthread_mutex_init(&threadInfo.mutex,NULL);
pthread_t pthId;
pthread_create(&pthId,NULL,threadFunc,&threadInfo);
sleep(1);
ret=pthread_cond_signal(&threadInfo.cond);
THREAD_ERROR_CHECK(ret,"pthread_cond_signal");
printf("send signal ok\n");
pthread_join(pthId,NULL);
printf("I am main thread\n");
return 0;
}
代碼解釋:主線程創建結構體,包含mutex和cond,初始化後創建子線程,子線程上鎖,並執行pthread_cond_wait阻塞等待條件變量的到來,sleep(1)後主線程發送signal信號使條件變量成立,故打印出 send signal ok和I am child,after wait,最後用join回收子線程。
pthread_cond_timedwait函數
功能:計時等待一個條件變量。線程解開mutex 指向的鎖並被條件變量cond 阻塞。其中計時等待方式表示經歷abstime 段時間後,即使條件變量不滿足,阻塞也被解除。
函數原型:
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
參3: 參看man sem_timedwait函數,查看struct timespec結構體。
? struct timespec {
? time_t tv_sec; /* seconds */ 秒
? long tv_nsec; /* nanosecondes*/ 納秒
? }
形參abstime:絕對時間。
如:time(NULL)返回的就是絕對時間。而alarm(1)是相對時間,相對當前時間定時1秒鐘。
struct timespec t = {1, 0};
pthread_cond_timedwait (&cond, &mutex, &t);
只能定時到 1970年1月1日 00:00:01秒(早已經過去)
正確用法:
time_t cur = time(NULL); //獲取當前時間。
struct timespec t; //定義timespec 結構體變量t
t.tv_sec = cur+1; //定時1秒
pthread_cond_timedwait (&cond, &mutex, &t); //傳參
struct timeval {
time_t tv_sec; /* seconds * 秒/
suseconds_t tv_usec; /* microseconds * 微秒/
};
示例:
#include <func.h>
typedef struct{
pthread_cond_t cond;
pthread_mutex_t mutex;
}Data;
void* threadFunc(void* p)
{
Data* pthreadInfo=(Data*)p;
int ret;
struct timespec t;
t.tv_nsec=0;
t.tv_sec=time(NULL)+5;
pthread_mutex_lock(&pthreadInfo->mutex);
ret=pthread_cond_timedwait(&pthreadInfo->cond,&pthreadInfo->mutex,&t);
printf("pthread_cond_timedwait ret=%d\n",ret);
pthread_mutex_unlock(&pthreadInfo->mutex);
printf("I am child,after wait\n");
pthread_exit(NULL);
}
int main()
{
Data threadInfo;
int ret;
ret=pthread_cond_init(&threadInfo.cond,NULL);
THREAD_ERROR_CHECK(ret,"pthread_cond_init");
pthread_mutex_init(&threadInfo.mutex,NULL);
pthread_t pthId;
pthread_create(&pthId,NULL,threadFunc,&threadInfo);
sleep(10);
ret=pthread_cond_signal(&threadInfo.cond);
THREAD_ERROR_CHECK(ret,"pthread_cond_signal");
printf("send signal ok\n");
pthread_join(pthId,NULL);
printf("I am main thread\n");
return 0;
}
代碼解釋:主線程控制10s後發signal但子線程設置5秒計時,超時則自動解鎖,故程序啟動後過了5秒後打印I am child,after wait,再過5秒後打印剩余部分!
pthread_cond_signal函數
喚醒至少一個阻塞在條件變量上的線程
int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_broadcast函數
喚醒全部阻塞在條件變量上的線程
? int pthread_cond_broadcast(pthread_cond_t *cond);
條件變量的優點
? 相較於mutex而言,條件變量可以減少競爭。
? 如直接使用mutex,除了生產者、消費者之間要競爭互斥量以外,消費者之間也需要競爭互斥量,但如
果匯聚(鏈表)中沒有數據,消費者之間競爭互斥鎖是無意義的。有了條件變量機制以後,只有生產者完成
生產,才會引起消費者之間的競爭。提高了程序效率。
生產者消費者模型
通過設置條件變量,在售票為空後自動補票。
#include <func.h>
typedef struct{
int tickets;
pthread_mutex_t mutex;
pthread_cond_t cond;
}Train;
void* saleWindows1(void* p)
{
Train *pSale=(Train*)p;
int count=0;
while(1)
{
pthread_mutex_lock(&pSale->mutex);
if(pSale->tickets>0)
{
printf("I am saleWindows1,the tickets is %d\n",pSale->tickets);
pSale->tickets--;
if(0==pSale->tickets)
{
pthread_cond_signal(&pSale->cond);
}
count++;
printf("sale finish,I am saleWindows1,the tickets is %d\n",pSale->tickets);
pthread_mutex_unlock(&pSale->mutex);
}else{
pthread_mutex_unlock(&pSale->mutex);
printf("I am windows1 sale %d\n",count);
break;
}
sleep(1);
}
return NULL;
}
void* saleWindows2(void* p)
{
Train *pSale=(Train*)p;
int count=0;
while(1)
{
pthread_mutex_lock(&pSale->mutex);
if(pSale->tickets>0)
{
printf("I am saleWindows2,the tickets is %d\n",pSale->tickets);
pSale->tickets--;
if(0==pSale->tickets)
{
pthread_cond_signal(&pSale->cond);
}
count++;
printf("sale finish,I am saleWindows2,the tickets is %d\n",pSale->tickets);
pthread_mutex_unlock(&pSale->mutex);
}else{
pthread_mutex_unlock(&pSale->mutex);
printf("I am windows2 sale %d\n",count);
break;
}
sleep(1);
}
return NULL;
}
void* setTickets(void* p)
{
Train *pSale=(Train*)p;
pthread_mutex_lock(&pSale->mutex);
if(pSale->tickets>0)
{
pthread_cond_wait(&pSale->cond,&pSale->mutex);
}
pSale->tickets=20;
pthread_mutex_unlock(&pSale->mutex);
return NULL;
}
typedef void* (*threadFunc)(void*);
#define N 3
int main()
{
Train t;
pthread_t pthId[N];
int i;
t.tickets=20;
threadFunc pthreadFunc[N]={saleWindows1,saleWindows2,setTickets};
pthread_mutex_init(&t.mutex,NULL);
pthread_cond_init(&t.cond,NULL);
for(i=0;i<N;i++)
{
pthread_create(pthId+i,NULL,pthreadFunc[i],&t);
}
for(i=0;i<N;i++)
{
pthread_join(pthId[i],NULL);
}
printf("sale over\n");
}
Linux系統編程——線程(2)