Linux Posix訊號量、System V訊號量,生產者與消費者問題應用
基本概念
訊號量是什麼?
訊號量(semaphore)是一種用於不同程序間或一個給定程序的不同執行緒間同步手段的原語。
也就是說,訊號量用於程序間,或者執行緒間同步的。
三種類型訊號量
訊號量主要有三種類型,都可以用於程序或執行緒間同步:
- Posix有名(named)訊號量:使用Posix IPC名字標識;
- Posix基於記憶體(memory-based)的訊號量(又稱,無名訊號量 unnamed semaphore):放在單個程序的記憶體中,可用於執行緒間同步;放在共享記憶體中,可用於程序間同步;
- System V訊號量:核心維護;
對於使用者來說,它們並沒有本質區別,而是介面不同導致的使用方式不同。Posix定義了新的介面,便於移植、使用;System V Release 4(簡稱SVR4)也是Unix眾多系統實現之一。
Posix IPC名字標識
用來標識Posix有名訊號量的路徑名,可能是真正路徑名(對應真實檔案路徑),也可能不是。
Posix IPC名字的要求:
- 必須符合已有路徑名規則(最多由PATH_MAX 位元組構成,包括末尾null byte);
- 如果以"/"開頭,那麼這些函式的不同調用將訪問同一個佇列。如果不是,那麼效果取決於實現;
- 名字中額外的"/"的解釋由實現定義;
為了便於移植,使用"/"開頭的Posix IPC名字,通常選擇"/tmp/"作為路徑目錄,對應檔名不包含任何"/"。當然,程式必須對該目錄具有讀寫許可權。而對於有的系統沒有"/tmp/"的情況,可以通過建立自定義函式px_ipc_name()(參見px_ipc_name.c),來解決這個問題。
二值訊號量
二值訊號量(binary semaphore):指值為0或1的訊號量。值為1,代表資源可用(未被佔用);值為0,代表資源不可用(已被佔用)。
下面2個圖分別展示了由2個程序使用的System V二值訊號量,Posix有名二值訊號量。
System V訊號量的維護是在核心中,Posix訊號量的維護則不一定。因此,第一幅圖更貼切System V二值訊號量。
Posix有名訊號量可能與檔案系統中的路徑名對應的名字來標識的,實際實現時,路徑可能只是起到一個標識作用,訊號量的值(如,0或1)不一定存放在該檔案中,可能存放在核心的某處。
計數訊號量
計數訊號量(counting semaphore):指值為0和某個限制值(>= 32767 for Posix訊號量)之間的訊號量。訊號量的值,代表著資源的可用數量。
訊號量上的操作
一個程序可以在某個訊號量上執行3種操作:
-
建立(create):建立一個訊號量,要求呼叫者指定訊號量初值。如,二值訊號量初值為0或1;
-
等待(wait):操作會測試該訊號量的值,如果其值 <= 0,那麼就阻塞等待,等到值 >= 1時,就將它-1。這個操作最初稱為P操作,也稱為遞減,或上鎖(lock)。
-
掛出(post):操作會將訊號量的值+1。這個操作最初稱為V操作,也成為遞增,解鎖或發訊號(signal)。
二值訊號量與互斥鎖
二值訊號量可用於程序/執行緒互斥,類似於互斥鎖。下面給出解決互斥問題的互斥鎖和訊號量:
初始化互斥鎖mutex; 初始化訊號量sem為1;
pthread_mutex_lock(&mutex); sem_wait(&sem); // 等待
臨界區 臨界區
pthread_mutex_unlock(&mutex); sem_post(&sem); // 掛出
訊號量、互斥鎖、條件變數的區別
-
互斥鎖必須總是由給它上鎖的執行緒解鎖,訊號量的掛出卻不必由執行過它等待操作的同一執行緒執行。
-
互斥鎖要麼被鎖住,要麼被解開(二值狀態,相當於二值訊號量)。
-
訊號量有一個與之關聯的狀態(訊號量的計數值),訊號量的掛出操作總是被記住。然而,當向一個條件變數傳送訊號時,如果沒有執行緒等待在該條件變數上,那麼該訊號將丟失。
這是什麼意思?
比如,對於訊號量,有個訊號量sem_t s = 0, 執行緒P1: sem_wait(&s) ,表明P1在等待訊號量s。執行緒P2: sem_post(&s),表明P2掛出訊號量s。不論P1是否在等待s,P2都可以將s訊號量值+1。
對於條件變數,如果沒有任何執行緒等待條件變數,執行緒P2傳送signal訊號也不會喚醒任何執行緒,訊號將丟失。
既然互斥鎖和條件變數也能實現執行緒/程序間的同步,為何還提供訊號量?
Posix.1基本原理一文提到,
提供訊號量的主要目的是提供一種程序間同步方式。這些程序可能共享也可能不共享(某個)記憶體區。這兩者都是已經廣泛使用多年的同步正規化。每組原語都特別適合特定的問題。
也就是說,訊號量的意圖在於程序間同步,互斥鎖和條件變數的意圖在於執行緒間的同步。不過,它們都可以應用在程序間同步、執行緒間同步。應該使用適合具體應用的那組原語。
有名訊號量與無名訊號量
下圖比較了這兩類訊號量使用的函式
有名訊號量使用函式
sem_open、sem_close、sem_unlink函式
sem_open
sem_open 建立一個新的有名訊號量,或開啟一個已存在的有名訊號量。有名訊號量既可用於執行緒間同步,又可用於程序間的同步。
#include <semaphore.h>
sem_t *sem_open(const char *name, int oflag, ... /* mode_t mode, unsigned int value */);
引數
- name Posix IPC名稱標識。
- oflag 可以是0、O_CREAT或O_CREAT|O_EXCL。如果指定了O_CREAT,那麼第三個引數(mode)、第四個引數(value)是需要的。
- mode 引數指定許可權位。
- value 引數指定訊號量的初始值。但該初值 <= SEM_VALUE_MAX( >= 32767)。二值訊號量的初值通常為1,計數訊號量的初值 > 1。
返回值
成功時,返回一個指向sem_t型別的指標。該指標隨後用作sem_close、sem_wait、sem_trywait、sem_post及sem_getvalue的引數。
失敗時,返回NULL。Posix早期草案指定使用SEM_FAILED(-1),即#define SEM_FAILED ((sem_t *)(-1))
。
注意:當開啟某個有名訊號量時,oflag引數並沒有指定O_RDONLY、O_WRONLY或O_RDWR標誌,但都要求對某個已存在的訊號量具有讀訪問和寫訪問許可權,這樣對其呼叫sem_open才能成功。
sem_close
sem_close 關閉一個有名訊號量。
#include <semaphore.h>
int sem_close(sem_t *sem);
一個程序終止時,核心對其海開啟著的所有有名訊號量自動執行關閉動作,而不論是程序自願終止的(呼叫exit, _exit, main函式return),還是非自願終止(收到Unix終止訊號)。
sem_unlink
關閉一個有名訊號量並沒有將它從系統刪除。即使當前沒有堅持打卡某個訊號量,其值仍然保持著。
使用sem_unlink將有名訊號量從系統中刪除:
#include <semaphore.h>
int sem_unlink(sem_t *sem);
每個訊號量都有一個引用計數器記錄當前的開啟次數(就像檔案一樣),sem_unlink類似於檔案I/O的unlink函式:當引用計數 > 0時,name就能從檔案系統中刪除,然而其訊號量的析構(不同於將它的名字從檔案系統中刪除)要等到最後一個sem_close發生為止。
sem_wait、sem_trywait
sem_wait 測試所指定訊號量的值,如果該值 > 0,那就-1並立即返回。 如果該值 == 0,呼叫執行緒就被置於休眠狀態中,直到該值變為 > 0時,再-1,函式隨後返回。
sem_wait和sem_trywait區別:當指定訊號量值為0時,前者會將呼叫執行緒置於休眠狀態,後者不會讓執行緒休眠,而是直接返回EAGAIN錯誤。
如果被某個訊號中斷,sem_wait可能過早返回,返回的錯誤為EINTR。
#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
sem_post、sem_getvalue
sem_post
當一個執行緒使用完訊號量時,應該呼叫sem_post掛出訊號量。sem_post函式將訊號量值+1,然後喚醒正在等待該訊號量的任意執行緒。
sem_getvalue
當需要讀取訊號量的值時,呼叫sem_getvalue,由valp指向的整數返回訊號量的值。
#include <semaphore.h>
int sem_post(sem_t *sem);
int sem_getvalue(sem_t *sem, int *valp);
成功返回0,失敗返回-1。
無名訊號量使用函式
sem_init, sem_destroy
sem_init 初始化無名訊號量
sem_destroy 銷燬無名訊號量
#include <semaphore.h>
int sem_init(sem_t *sem, int shared, unsigned int value);
int sem_destroy(sem_t *sem);
引數
- sem 指向基於記憶體訊號量變數;
- shared 可選取值PTHREAD_PROCESS_PRIVATE(0,訊號量用於執行緒間的同步),PTHREAD_PROCESS_SHARED(訊號量用於程序間同步);
- value 訊號量初值;
返回值
成功,返回0;失敗,返回-1.
sem_open 與sem_init主要區別:
- sem_open 需要Posix IPC名稱標識引數,但不需要類似於shared的引數或PTHREAD_PROCESS_SHARED的屬性,因為有名訊號量總是可以在不同程序間共享。
- 對於給定訊號量,sem_init只能初始化訊號量一次,多次初始化結果未定義。而sem_open可以對同一訊號量呼叫多次。
當不需要使用與有名訊號量關聯的名字時,可改用基於記憶體的訊號量(無名訊號量);彼此無親緣關係的不同程序需要使用訊號量時,通常使用有名訊號量。名字就是各個程序標識訊號量的手段。
生產者-消費者問題
生產者-消費者問題是執行緒、程序同步中的一個經典問題,也稱為有界緩衝區(bounded buffer)問題。
一個或多個生產者(執行緒或程序)建立著一個個的資料條目,然後這些條目由一個或多個消費者(執行緒或程序)處理。資料條目在生產者和消費者之間是使用某種型別的IPC傳遞的。
Unix管道就是這樣的問題:一端寫入內容,另一端取出並處理內容。
最簡單的生產者-消費者模型:
處理這個模型問題的虛擬碼:
生產者 消費者
訊號量get初始化為0;
訊號量put初始化為1;
for (; ;) { for (; ;) {
sem_wait(&put); sem_wait(&get);
把資料放入緩衝區 取出並處理緩衝區中的資料
sem_post(&get); sem_post(&put);
} }
單個生產者-單個消費者問題
將共享緩衝區用作環形緩衝區,生產者填寫最後一項buff[NBUFF-1]後,回到開頭填寫第一項buff[0],消費者也是這個訪問順序。
環形緩衝區3個約束條件:
- 當緩衝區為空時,消費者不能試圖從其中去除一個條目;
- 當緩衝區為滿時,生產者不能試圖往其中放置一個條目;
- 共享變數可能描述緩衝區的當前狀態(下標、計數、連結串列指標等),因此生產者和消費者的所有緩衝區都必須保護起來,以避免競爭狀態;
接下來使用訊號量方案,展現3種不同型別的訊號量:
- 名為mutex的二值訊號量,保護2個臨界區:一個是往共享緩衝區插入資料條目(生產者執行);一個是從共享緩衝區移走資料條目(消費者執行)。用作互斥鎖的二值訊號量初值1(也可以使用互斥鎖pthread_mutex_t替代);
- 名為nempty的計數訊號量,統計共享緩衝區中的空槽位數。初值為緩衝區的槽位數;
- 名為nstored的計數訊號量,統計共享緩衝區中的滿槽位數(已填寫資料條目)。初值0;
下圖展示了初始化後的緩衝區和2個計數訊號量的狀態
過程示例:
生產者把0~(NLOOP-1) 存放到共享環形緩衝區中(i.e. buff[0]=0, buff[1]=1,...);
消費者從該緩衝區取出這些整數,並驗證它們是否正確(buff[i] % NBUFF == i?),如有錯誤,就輸出到標準輸出上。
生產者放置了3個條目到緩衝區,消費者從中取出1個條目後,緩衝區和計數訊號量的狀態,如下圖所示:
main.c 資料結構定義,測試程式
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/fcntl.h>
#include <pthread.h>
#define FILE_MODE (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH)
#define NBUFF 10
#define SEM_MUTEX "mutex"
#define SEM_NEMPTY "nempty"
#define SEM_NSTORED "nstored"
void *produce(void *), *consumer(void *);
int nitems; /* read-only by producer and consumer */
struct {
int buff[NBUFF];
sem_t mutex, nempty, nstored;
} shared;
int main(int argc, char *argv[])
{
pthread_t tid_produce, tid_consumer;
if (argc != 2) {
fprintf(stderr, "Usage: prodcons2 <#items>\n");
exit(1);
}
nitems = atoi(argv[2]);
sem_init(&shared.mutex, 0, 1);
sem_init(&shared.nempty, 0, NBUFF);
sem_init(&shared.nstored, 0, 0);
pthread_create(&tid_produce, NULL, produce, NULL);
pthread_create(&tid_consumer, NULL, consumer, NULL);
pthread_join(tid_produce, NULL);
pthread_join(tid_consumer, NULL);
sem_destroy(&shared.mutex);
sem_destroy(&shared.nempty);
sem_destroy(&shared.nstored);
return 0;
}
main.c 生產者,消費者執行緒
void *produce(void *arg)
{
int i;
for (i = 0; i < nitems; ++i) {
sem_wait(&shared.nempty); /* wait for at least 1 empty slot */
sem_wait(&shared.mutex);
shared.buff[ i % NBUFF] = i; /* store i into circular buffer */
sem_post(&shared.mutex);
sem_post(&shared.nstored); /* 1 more stored item */
}
return NULL;
}
void *consumer(void *arg)
{
int i;
for (i = 0; i < nitems; ++i) {
sem_wait(&shared.nstored); /* wait for at least 1 stored item */
sem_wait(&shared.mutex);
/* check if data stored in buffer is right */
if (&shared.buff[i % NBUFF] != i) {
printf("buff[%d] = %d\n", i, shared.buff[i % NBUFF]);
}
sem_post(&shared.mutex);
sem_post(&shared.nempty);
}
return NULL;
}
參考
UNPv2 第10章~第11章