1. 程式人生 > 其它 >UNIX環境高階程式設計 - 訊號量

UNIX環境高階程式設計 - 訊號量

1. 概述

訊號量一個計數器,用於多程序共享資料物件的存取。

P操作:

為了獲得共享資源,程序需要執行下列操作:

  1. 測試控制該資源的訊號量。

  2. 若此訊號量的值為正,則程序可以使用該資源。程序將訊號量值減1,表示它使用了一個資源單位。

  3. 若此訊號量的值為0,則程序進入睡眠狀態,直至訊號量值大於0。若程序被喚醒後,它返回至第1步。

V操作:

當程序不再使用由一個資訊量控制的共享資源時,該訊號量值增1。如果有程序正在睡眠等待此訊號量,則喚醒它們。

為了正確地實現資訊量,訊號量值的測試及減1操作應當是原子操作。為此,訊號量通常是在核心中實現的。

2. 獲取訊號量

當使用XSI訊號量時,首先需要通過呼叫函式 semget

來獲得一個訊號量ID。

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/sem.h>

int semget(key_t key, int nsems, int flag);
                       返回值:若成功,返回訊號量ID;若出錯,返回-1

nsems是該集合中的訊號量數。如果是建立新集合(一般在伺服器中),則必須指定nsems。如果引用一個現存的集合(一個客戶機),則將nsems指定為0。

3. 訊號量基本操作

semctl 函式包含了多種訊號量操作。

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/sem.h>

int semctl(int semid, int semum, int cmd, . . . /* union semun arg */);

union semun {
  int     val;            /* value for SETVAL */
  struct  semid_ds *buf;  /* buffer for IPC_STAT & IPC_SET */
  u_short *array;         /* array for GETALL & SETALL */
};

對於除GETALL以外的所有GET命令,semctl函式都返回相應值。其他命令的返回值為0。

巨集定義 作用
IPC_STAT 對此集合取semid_ds結構,並存儲在由arg.buf指向的結構中。
IPC_SET 按arg.buf指向的結構中的值設定集合相關結構體中的相關成員。
IPC_RMID 從系統中刪除該訊號量集合。
GETVAL 返回成員semnum的semval值。
SETVAL 設定成員semnum的semval值。該值由arg.val指定。
GETPID 返回成員semnum的sempid的值。
GETNCNT 返回成員semnum的semncnt值
GETZCNT 返回成員semnum的semzcnt值。
GETALL 返回該集合中的所有的訊號量值。
SETALL 將該集合中的所有的訊號量設定成arg.array指向的陣列中的值。

4. 訊號量PV操作

函式semop自動執行訊號量集合上的運算元組。

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/sem.h>

int semop(int semid, struct sembuf semoparray[ ], size_t nops);
                         返回值:若成功,返回訊號量ID;若出錯,返回-1
                           
struct sembuf {
  unsigned short sem_num;   /* member # in set (0, 1, …, nsems-1) */
  short sem_op;             /* operation (negative, 0, or positive) */
  short sem_flg;            /* IPC_NOWAIT, SEM_UNDO */
};

引數nops規定該陣列中操作的數量(元素數)。

對集合中的每個成員的操作由相應的sem_op值規定。此值可以是負值、0或者是正值。

  1. 若sem_op為正值,則對應於程序釋放佔用的資源數。
  2. 若sem_op為負值,則表示要獲得由該訊號量控制的資源。

5. 包裝(warp function)

對相關api包裝一下可以更加通用:

bool sem_get(const key_t key, int *sem_id) {
    *sem_id = semget((key_t)key, 1, 0666 | IPC_CREAT);
    if (*sem_id == -1) {
        printf("sem_get fail!\n");
        return false;
    }
    return true;
}

bool sem_init(const int sem_id, const int val) {
    if (semctl(sem_id, 0, SETVAL, val) == -1) {
        printf("sem_init fail!\n");
        return false;
    }
    return true;
}

bool sem_unlink(const int sem_id) {
    if (semctl(sem_id, 0, IPC_RMID, sem_id) == -1) {
        printf("sem_unlink fail!\n");
        return false;
    }
    return true;
}

bool sem_wait(int sem_id) {
    struct sembuf buf;
    buf.sem_num = 0;
    buf.sem_op = -1;
    buf.sem_flg = 0;
    if(semop(sem_id, &buf, 1) == -1) {
        printf("sem_wait fail! %d\n", errno);
        return false;
    }
    return true;
}

bool sem_post(int sem_id) {
    struct sembuf buf;
    buf.sem_num = 0;
    buf.sem_op = +1;
    buf.sem_flg = 0;
    if(semop(sem_id, &buf, 1) == -1) {
        printf("sem_post fail! %d\n", errno);
        return false;
    }
    return true;
}

6. 例子:用訊號量解生產者-消費者模型

生產者-消費者問題可以用訊號量如下解決:

例子使用上述的warp函式,程式碼如下:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/wait.h>       
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

#define NUMBER  50000
#define CHILD   5
#define BUFSIZE 10

bool sem_get(const key_t key, int* sem_id);
bool sem_init(const int sem_id, const int val);
bool sem_unlink(const int sem_id);
bool sem_wait(const int semid); // P
bool sem_post(const int semid); // V

#define IS_ERROR(ret) \
    do {              \
        if (!ret) {   \
            exit(0);  \
        }             \
    } while(0);       \

// critical resource
int fd;

// semaphore
int empty = -1;
int full = -1;
int mutex = -1;

void comsumer() {
    int buf_out = 0;
    int data = 0;
    int cnt = 0;
    for (int k = 0; k < NUMBER / CHILD; k++) {
        sem_wait(full);
        sem_wait(mutex);
        
        // fetch buf_out
        lseek(fd, BUFSIZE * sizeof(int), SEEK_SET);
        read(fd, (char*)&buf_out, sizeof(int));

        cnt++;
        lseek(fd, sizeof(int) * buf_out, SEEK_SET);
        read(fd, (char*)&data, sizeof(int));
        printf("%d comsume %d %d\n", getpid(), data, cnt);
        fflush(stdout);
        
        // write back
        buf_out = (buf_out + 1) % BUFSIZE;
        lseek(fd, BUFSIZE * sizeof(int), SEEK_SET);
        write(fd, (char *)&buf_out, sizeof(int));

        sem_post(mutex);
        sem_post(empty);
    }
    printf("%d total consume %d\n", getpid(), cnt);
}

void producer() {
    int buf_in = 0;
    for (int i = 0 ; i < NUMBER; i++) {
        sem_wait(empty);
        sem_wait(mutex);
        
        lseek(fd, buf_in * sizeof(int), SEEK_SET);
        write(fd, (char*)&i, sizeof(int));
        buf_in = (buf_in + 1) % BUFSIZE;
        printf("produce %d\n", i);
        fflush(stdout);

        sem_post(mutex);
        sem_post(full);
    }
}

int main() {
    int ret = sem_get(123, &empty);
    IS_ERROR(ret);
    ret = sem_get(234, &full);
    IS_ERROR(ret);
    ret = sem_get(345, &mutex);
    IS_ERROR(ret);

    ret = sem_init(empty, BUFSIZE);
    IS_ERROR(ret);
    ret = sem_init(full, 0);
    IS_ERROR(ret);
    ret = sem_init(mutex, 1);
    IS_ERROR(ret);
    
    int out_index = 0;
    fd = open("buffer.dat", O_CREAT | O_RDWR | O_TRUNC, 0666);
    lseek(fd, BUFSIZE * sizeof(int), SEEK_SET);
    write(fd, (char *)&(out_index), sizeof(int));

    pid_t p;
    // create producer
    if((p = fork()) == 0) {
        producer();
        return 0;
    } else if(p < 0){
        printf("Fail to fork!\n");
        return -1;
    }

    // create comsumer
    for(int j = 0; j < CHILD ; j++)
    {
        if((p = fork()) == 0) {
            comsumer();
            return 0;
        } else if(p < 0) {
            printf("Fail to fork!\n");
            return -1;
        }
    }
    int cnt = 0;
    printf("wait children!\n");
    pid_t pid;
    while (pid = waitpid(-1, NULL, 0)) {
        if (errno == ECHILD) {
            break;
        }
        cnt ++;
        printf("pid: %d end | sum: %d\n", pid, cnt);
    }

    ret = sem_unlink(empty);
    IS_ERROR(ret);
    ret = sem_unlink(full);
    IS_ERROR(ret);
    ret = sem_unlink(mutex);
    IS_ERROR(ret);

    return 0;
}

bool sem_get(const key_t key, int *sem_id) {
    *sem_id = semget((key_t)key, 1, 0666 | IPC_CREAT);
    if (*sem_id == -1) {
        printf("sem_get fail!\n");
        return false;
    }
    return true;
}

bool sem_init(const int sem_id, const int val) {
    if (semctl(sem_id, 0, SETVAL, val) == -1) {
        printf("sem_init fail!\n");
        return false;
    }
    return true;
}

bool sem_unlink(const int sem_id) {
    if (semctl(sem_id, 0, IPC_RMID, sem_id) == -1) {
        printf("sem_unlink fail!\n");
        return false;
    }
    return true;
}

bool sem_wait(int sem_id) {
    struct sembuf buf;
    buf.sem_num = 0;
    buf.sem_op = -1;
    buf.sem_flg = 0;
    if(semop(sem_id, &buf, 1) == -1) {
        printf("sem_wait fail! %d\n", errno);
        return false;
    }
    return true;
}

bool sem_post(int sem_id) {
    struct sembuf buf;
    buf.sem_num = 0;
    buf.sem_op = +1;
    buf.sem_flg = 0;
    if(semop(sem_id, &buf, 1) == -1) {
        printf("sem_post fail! %d\n", errno);
        return false;
    }
    return true;
}