【Linux】程序間通訊之訊息佇列、訊號量和共享儲存
訊息佇列、訊號量和共享儲存是IPC(程序間通訊)的三種形式,它們功能不同,但有相似之處,下面先介紹它們的相似點,然後再逐一說明。
1、相似點
每個核心中的IPC結構(訊息佇列、訊號量和共享儲存)都用一個非負整數的識別符號加以引用,與檔案描述符不同,當一個IPC結構被建立,以後又被刪除時,與這種結構相關的識別符號連續加1,直至達到一個整型數的最大正直,然後又迴轉到0。識別符號是IPC物件的內部名,還有一個外部名稱為鍵,資料型別是key_t,通常在標頭檔案
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
訊息佇列、訊號量和共享儲存都有自己的get函式,msgget、semget和shmget,用於建立IPC物件,它們都設定了自己的ipc_perm結構,在標頭檔案
struct ipc_perm {
key_t __key; /* Key supplied to msgget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};
訊息佇列、訊號量和共享儲存都有自己的內建限制,這些限制的大多數可以通過重新配置核心而加以更改,如sysctl命令,可以配置執行時核心引數,在Linux(Ubuntu)上,執行命令“ipcs -l”可檢視相關限制,如下:
$ ipcs -l
------ Messages Limits --------
max queues system wide = 32000
max size of message (bytes) = 8192
default max size of queue (bytes) = 16384
------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 18014398509465599
max total shared memory (kbytes) = 18014398442373116
min seg size (bytes) = 1
------ Semaphore Limits --------
max number of arrays = 32000
max semaphores per array = 32000
max semaphores system wide = 1024000000
max ops per semop call = 500
semaphore max value = 32767
需要注意的是,IPC物件是在系統範圍內起作用的,沒有訪問計數,不同於普通檔案。例如,如果程序建立了一個訊息佇列,在該佇列中放入了幾則訊息,然後終止,但是該訊息佇列及其內容並不會被刪除,它們餘留在系統中直至出現下述情況:由某個程序呼叫msgrcv讀訊息或msgctl刪除訊息佇列;或某個程序執行ipcrm命令刪除訊息佇列;或由正在再啟動的系統刪除訊息佇列。將此與管道相比,當最後一個訪問管道的程序被終止時,管道就被完全地刪除了。對於FIFO而言,雖然當最後一個引用FIFO的程序終止時其名字仍保留在系統中,直至顯式地刪除它,但是留在FIFO中的資料卻在此時被全部刪除。
2、訊息佇列
訊息佇列即message queue,存放在核心中並由訊息佇列識別符號標識,涉及如下函式和資料結構。
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long __msg_cbytes; /* Current number of bytes in
queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages
in queue */
msglen_t msg_qbytes; /* Maximum number of bytes
allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
msgget用於建立一個新佇列或開啟一個現存的佇列,引數key可自定義或通過ftok生成,或者使用IPC_PRIVATE,需要保證的是key值沒有與現存的佇列相關聯,msgflag為O_CREAT時建立新佇列,排它性使用O_EXCL。msgsnd將新訊息新增到佇列尾端,引數msqid為訊息佇列id,msgp比較特殊,需要包括兩部分內容,訊息型別和實際的訊息資料,如上面的msgbuf結構,msgsz指定訊息長度,msgflag可以設定為IPC_NOWAIT,表示非阻塞。msgrcv用於從佇列中取訊息,引數msgsz表示緩衝區長度,當訊息長度大於msgsz時,若msgflg設定了MSG_NOERROR則截短訊息,否則出錯E2BIG,msgtyp為0時獲取第一個訊息,大於0時獲取型別為msgtyp的第一個訊息,小於0時獲取型別小於等於msgtyp的型別值最小的第一個訊息。每個訊息佇列都有一個msqid_dt結構與其相關聯,規定了佇列的當前狀態,msgctl則可以對訊息佇列的這種結構進行操作,引數cmd可以是IPC_STAT、IPC_SET、IPC_RMID,分別表示獲取狀態、設定狀態、移除訊息佇列。
下面例子說明訊息佇列的用法,msgsnd.c傳送訊息,msgrcv.c接收訊息,當輸入“quit”時結束。
// msgsnd.c
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>
#include <sys/msg.h>
#include <unistd.h>
struct msg_st
{
long int msg_type;
char text[BUFSIZ];
};
int main(void)
{
struct msg_st data;
data.msg_type = 1;
char buf[BUFSIZ];
key_t akey = 1000;
int msgid = -1;
bool running = true;
// create message queue
msgid = msgget(akey, 0666 | IPC_CREAT);
if (-1 == msgid) {
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}
// loop for sending data to message queue
while (running) {
printf("Input text: ");
fgets(buf, BUFSIZ, stdin);
strcpy(data.text, buf);
// send data
if (-1 == msgsnd(msgid, (void*)&data, BUFSIZ, 0))
{
fprintf(stderr, "msgsnd failed\n");
exit(EXIT_FAILURE);
}
// input "quit" to finish
if(0 == strncmp(buf, "quit", 4)) {
running = false;
}
sleep(1);
}
exit(EXIT_SUCCESS);
}
// msgrcv.c
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>
#include <sys/msg.h>
#include <unistd.h>
struct msg_st
{
long int msg_type;
char text[BUFSIZ];
};
int main(void)
{
struct msg_st data;
data.msg_type = 0;
key_t akey = 1000;
int msgid = -1;
bool running = true;
// create messge queue
msgid = msgget(akey, 0666 | IPC_CREAT);
if (-1 == msgid) {
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}
// loop for getting data from message queue
while (running) {
// receive data
if(-1 == msgrcv(msgid, (void*)&data, BUFSIZ, data.msg_type, 0))
{
fprintf(stderr, "msgrcv failed with errno: %d\n", errno);
exit(EXIT_FAILURE);
}
printf("Receive text: %s\n",data.text);
// receive "quit" to finish
if(0 == strncmp(data.text, "quit", 4)) {
running = false;
}
}
// delete message queue
if (-1 == msgctl(msgid, IPC_RMID, 0))
{
fprintf(stderr, "msgctl(IPC_RMID) failed\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
3、訊號量
訊號量semaphore確切的說是一種同步方式,涉及如下函式和資料結構。
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int semget(key_t key, int nsems, int semflg);
int semctl(int semid, int semnum, int cmd, ...);
int semop(int semid, struct sembuf *sops, unsigned nsops);
struct semid_ds {
struct ipc_perm sem_perm; /* Ownership and permissions */
time_t sem_otime; /* Last semop time */
time_t sem_ctime; /* Last change time */
unsigned long sem_nsems; /* No. of semaphores in set */
};
union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux-specific) */
};
struct sembuf
{
unsigned short int sem_num; /* semaphore number */
short int sem_op; /* semaphore operation */
short int sem_flg; /* operation flag */
};
struct
{
unsigned short semval; /* semaphore value */
unsigned short semzcnt; /* # waiting for zero */
unsigned short semncnt; /* # waiting for increase */
pid_t sempid; /* ID of process that did last op */
}
訊號量是一個計數器,用於多程序對共享資料物件的訪問。為了獲得共享資源,程序需要執行下列操作:
(1)測試控制該資源的訊號量。
(2)若此訊號量的值為正,則程序可以使用該資源,程序將訊號量值減1,表示它使用了一個資源單位。
(3)若此訊號量的值為0,則程序進入休眠狀態,直至訊號量值大於0,程序被喚醒後,它返回第(1)步。
當程序不再使用由一個訊號量控制的共享資源時,該訊號量值增1,如果有程序正在休眠等待此訊號量,則喚醒它們。為了正確地實現訊號量,訊號量值的測試及減1操作應當是原子操作,為此,訊號量通常是在核心中實現的。常用的訊號量形式被成為二元訊號量或雙態訊號量,它控制單個資源,初始值為1,但是一般而言,訊號量的初值可以是任一正值,該值說明有多少個共享資源單位可供共享使用。需要注意的是,訊號量並非是單個非負值,為一個包含了一個或多個訊號量值的訊號量集,建立訊號量需要指定訊號量集中的訊號個數。
semget用於獲取訊號量集識別符號,引數nsems表示訊號量個數,建立新的訊號量集時必須大於0,獲取已有的則為0。semctl對訊號量進行操作,第四個引數可選,型別為semun聯合,semnum指定訊號量集中的某個訊號,cmd同訊息佇列的msgctl一樣也可以是IPC_STAT、IPC_SET、IPC_RMID,還有形如GETXXX的值。semop函式是個原子操作,自動執行訊號量集合上的運算元組sops,sops為sembuf結構體,成員sem_op可以為0、正數、負數,sem_flg為IPC_NOWAIT或SEM_UNDO,後者表示程序終止時自動處理還未處理的訊號量,引數nsops規定該陣列中操作的數量。
先來看一個不使用訊號量的例子:
// semaphore2.c
#include <stdio.h>
#include <stdlib.h>
int main(int argc, char *argv[])
{
char message = 'X';
int i = 0;
if (argc > 1) {
message = argv[1][0];
}
for (i = 0; i < 10; ++i) {
printf("%c", message);
fflush(stdout);
sleep(rand() % 3);
printf("%c", message);
fflush(stdout);
sleep(rand() % 3);
}
sleep(10);
printf("\n%d - finished\n", getpid());
exit(EXIT_SUCCESS);
}
編譯執行:
$gcc -o sem semaphore2.c
$./sem A & ./sem
[1] 5647
AXAXAXAXXAAXAXAXAAXXAXXAXAAXAXAXAXAXAXXA
5648 - finished
5647 - finished
[1]+ Done ./sem A
一個程序在for迴圈中連續兩次輸出A,並啟動到後臺執行,另一個程序在for迴圈中連續兩次輸出X,從上面的結果可以看出,它們相互競爭,結果是亂序的,並不是兩個連續的A或者X,下面用訊號量改寫上面的例子:
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/sem.h>
union semun
{
int val;
struct semid_ds *buf;
unsigned short *arry;
};
static int sem_id = 0;
int main(int argc, char *argv[])
{
key_t akey = 1000;
char message = 'X';
int i = 0;
// create semaphore
sem_id = semget(akey, 1, 0666 | IPC_CREAT);
if (-1 == sem_id) {
fprintf(stderr, "Failed to create semaphore\n");
exit(EXIT_FAILURE);
}
if (argc > 1) {
// semaphore initialization, must
union semun sem_union;
sem_union.val = 1;
if (-1 == semctl(sem_id, 0, SETVAL, sem_union)) {
fprintf(stderr, "Failed to initialize semaphore\n");
exit(EXIT_FAILURE);
}
message = argv[1][0];
sleep(1);
}
for (i = 0; i < 10; ++i) {
// go into critical zone
struct sembuf sem_i;
sem_i.sem_num = 0;
sem_i.sem_op = -1;
sem_i.sem_flg = SEM_UNDO;
if (-1 == semop(sem_id, &sem_i, 1))
{
perror("semop in failed\n");
exit(EXIT_FAILURE);
}
printf("%c", message);
fflush(stdout);
sleep(rand() % 3);
printf("%c", message);
fflush(stdout);
// leave critical zone
struct sembuf sem_o;
sem_o.sem_num = 0;
sem_o.sem_op = 1;
sem_o.sem_flg = SEM_UNDO;
if (-1 == semop(sem_id, &sem_o, 1)) {
perror("semop out failed\n");
exit(EXIT_FAILURE);
}
sleep(rand() % 3);
}
sleep(10);
printf("\n%d - finished\n", getpid());
if (argc > 1) {
// delete samaphore
sleep(3);
union semun sem_union;
if (-1 == semctl(sem_id, 0, IPC_RMID, sem_union)) {
fprintf(stderr, "Failed to delete semaphore\n");
}
}
exit(EXIT_SUCCESS);
}
執行結果如下:
XXAAXXAAXXAAXXAAXXAAXXAAXXAAXXAAXXXXAAAA
可見,使用了訊號量,輸出結果符合預期,兩個A或者兩個X連在了一起。
4、共享儲存
共享儲存允許兩個或多個程序共享一給定的儲存區,因為資料不需要在客戶程序和伺服器程序之間複製,所以這是最快的一種IPC。使用共享儲存時需要掌握的唯一竅門是多個程序之間對一給定儲存區的同步訪問,若伺服器程序正在將資料放入共享儲存區,則在它做完這一操作之前,客戶程序不應當去取這些資料,通常,訊號量被用來實現對共享儲存訪問的同步。下面是相關的幾個函式和資料結構:
#include <sys/ipc.h>
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
void *shmat(int shmid, const void *shmaddr, int shmflg);
int shmdt(const void *shmaddr);
struct shmid_ds {
struct ipc_perm shm_perm; /* Ownership and permissions */
size_t shm_segsz; /* Size of segment (bytes) */
time_t shm_atime; /* Last attach time */
time_t shm_dtime; /* Last detach time */
time_t shm_ctime; /* Last change time */
pid_t shm_cpid; /* PID of creator */
pid_t shm_lpid; /* PID of last shmat(2)/shmdt(2) */
shmatt_t shm_nattch; /* No. of current attaches */
...
};
shmget用於獲取共享儲存識別符號,引數size為共享儲存區的長度,單位是位元組,實現通常將其向上取為系統頁長的整數倍,若size並非系統頁長的整數倍,那麼最後一頁的餘下部分是不可用的,建立一個新的共享儲存區時size需要大於0,引用已有的共享儲存區則將size設定為0。shmctl可操作共享儲存區,同樣可以是IPC_STAT、IPC_SET、IPC_RMID等。
shmat用於將共享儲存段連線到呼叫程序指定的地址shmaddr上,但一般應指定shmaddr為0,核心會自動選擇合適的地址,shmflg可選SHM_RND即地址取整,SHM_RDONLY只讀,預設讀寫。當對共享儲存段的操作結束時,呼叫shmdt取消當前程序與共享儲存段的連線。
下面是一個使用了shm的例子,程式中fork之後,子程序sleep保證父程序先執行,父程序取得共享儲存區以後寫入“hello world”,子程序同樣也取得了這個共享儲存區,然後訪問同一塊地址,讀到了“hello world”。
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/shm.h>
#define SIZE 1024
#define exit_err(str) do { perror(str); exit(EXIT_FAILURE); } while (0);
#define uint32 unsigned long
int main(void)
{
int shmid;
char *shmptr;
key_t key;
pid_t pid;
if ((pid = fork()) < 0) {
exit_err("fork error");
}
if(0 == pid) {
printf("child process\n");
if ((key = ftok("/dev/null", O_RDWR)) < 0) {
exit_err("ftok error");
}
if ((shmid = shmget(key, SIZE, 0600 | IPC_CREAT)) < 0) {
exit_err("shmget error");
}
if ((shmptr = (char*)shmat(shmid, 0, 0)) == (void*)-1) {
exit_err("shmat error");
}
sleep(1);
printf("child pid is %d, share memory from %lx to %lx, content: %s\n",getpid(), (uint32)shmptr, (uint32)(shmptr + SIZE), shmptr);
sleep(1);
if ((shmctl(shmid, IPC_RMID, 0) < 0)) {
exit_err("shmctl error");
}
exit(EXIT_SUCCESS);
}
else {
printf("parent process\n");
if ((key = ftok("/dev/null", O_RDWR)) < 0) {
exit_err("ftok error");
}
if ((shmid = shmget(key, SIZE, 0600 | IPC_CREAT | IPC_EXCL)) < 0) {
exit_err("shmget error");
}
if((shmptr = (char*)shmat(shmid, 0, 0)) == (void*)-1) {
exit_err("shmat error");
}
memcpy(shmptr, "hello world", sizeof("hello world"));
printf("parent pid is %d, share memory from %lx to %lx, content: %s\n",getpid(),(uint32)shmptr, (uint32)(shmptr + SIZE), shmptr);
}
waitpid(pid, NULL, 0);
exit(EXIT_SUCCESS);
}
執行結果如下:
parent process
parent pid is 7275, share memory from 7fb2ebf4a000 to 7fb2ebf4a400, content: hello world
child process
child pid is 7276, share memory from 7fb2ebf4a000 to 7fb2ebf4a400, content: hello world