1. 程式人生 > >程序間通訊方式總結——訊息佇列

程序間通訊方式總結——訊息佇列

        Linux/Unix系統IPC是各種程序間通訊方式的統稱,但是其中極少能在所有Linux/Unix系統實現中進行移植。隨著POSIXOpen GroupX/Open)標準化的推進呵護影響的擴大,情況雖已得到改善,但差別仍然存在。一般來說,Linux/Unix常見的程序間通訊方式有:管道、訊息佇列、訊號、訊號量、共享記憶體、套接字等。博主將在《程序間通訊方式總結》系列博文中和大家一起探討學習程序間通訊的方式,並對其進行總結,讓我們共同度過這段學習的美好時光。這裡我們就以其中一種方式訊息佇列展開探討,訊息佇列提供了一種從一個程序向另一個程序傳送一個數據塊的方法。每個資料塊都被認為是一個管道,接收程序可以獨立地接收含有不同管道的資料結構。我們可以通過傳送訊息來避免命名管道的同步和阻塞問題。訊息佇列與命名管道一樣,每個資料塊都有一個最大長度的限制。我們可以將每個資料塊當作是一中訊息型別(頻道),傳送和接收的內容就是這個型別(頻道)對應的訊息(節目),每個型別(頻道)相當於一個獨立的管道,相互之間互不影響。

Linux提供了一系列訊息佇列的函式介面來讓我們方便地使用它來實現程序間的通訊。它的用法與其他兩個System V IPC機制,即訊號量和共享記憶體相似,下面就讓我們一起來學習一下訊息佇列吧。

函式介紹

msgget

函式原型:int msgget(key_t, key, int msgflg); 

       與其他的IPC機制一樣,程式必須提供一個鍵來命名某個特定的訊息佇列。當key=0IPC_PRIVATE)建立一個只能在程序內部通訊的訊息佇列。msgflg是一個許可權標誌,表示訊息佇列的訪問許可權,它與檔案的,訪問許可權一樣。msgflg可以與IPC_CREAT做或操作,表示當key

所命名的訊息佇列不存在時建立一個訊息佇列,如果key所命名的訊息佇列存在時,IPC_CREAT標誌會被忽略,而只返回一個識別符號。IPC_CREAT|IPC_EXCL,如果核心中不存在鍵值與key相等的訊息佇列,則新建一個訊息佇列;如果存在這樣的訊息佇列則報錯。它返回一個以key命名的訊息佇列的識別符號(非零整數),失敗時返回-1,錯誤資訊存於errno中。

錯誤碼:

EACCES:指定的訊息佇列已存在,但呼叫程序沒有許可權訪問它

EEXISTkey指定的訊息佇列已存在,而msgflg中同時指定IPC_CREATIPC_EXCL標誌

ENOENTkey指定的訊息佇列不存在,同時msgflg

中沒有指定IPC_CREAT標誌

ENOMEM:需要建立訊息佇列,但記憶體不足

ENOSPC:需要建立訊息佇列,但已達到系統的限制

msgsend

函式原型:int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg); 

msgidmsgget函式返回的訊息佇列識別符號。

msg_ptr指向準備傳送訊息的結構體指標,但是訊息的資料結構卻有一定的要求,指標msg_ptr所指向的訊息結構一定要是以一個長整型成員變數開始的結構體,接收函式將用這個成員來確定接收的訊息來自哪一個型別(頻道)的訊息(節目),其他欄位不限。訊息結構定義:

struct my_message{  

    long int message_type;  //頻道(你可以理解為訊息通道號)

    /* The data you wish to transfer*/  //訊息主體

};  

msg_szmsg_ptr指向的訊息的長度,注意是訊息的長度,而不是整個結構體的長度,也就是說msg_sz是不包括長整型訊息型別成員變數的長度。

msgflg用於控制當前訊息佇列滿或佇列訊息到達系統範圍的限制時將要發生的事情。

返回值:如果呼叫成功,訊息資料的一分副本將被放到訊息佇列中,並返回0,失敗時返回-1msgflg=0,當訊息佇列滿時,msgsnd將會阻塞,直到訊息能寫進訊息佇列;msgflg=IPC_NOWAIT,當訊息佇列已滿的時候,msgsnd函式不等待立即返回;msgflg=IPC_NOERROR,若傳送的訊息大於size位元組,則把該訊息截斷,截斷部分將被丟棄,且不通知傳送程序。

錯誤程式碼:

EAGAIN:引數msgflg設為IPC_NOWAIT,而訊息佇列已滿

EIDRM:識別符號為msgid的訊息佇列已被刪除

EACCESS:無許可權寫入訊息佇列

EFAULT:引數msgp指向無效的記憶體地址

EINTR:佇列已滿而處於等待情況下被訊號中斷

EINVAL:無效的引數msgidmsgsz或引數訊息型別type小於0

       msgsnd為阻塞函式,當訊息佇列容量滿或訊息個數滿會阻塞。訊息佇列已被刪除,則返回EIDRM錯誤;被訊號中斷返回E_INTR錯誤。如果設定IPC_NOWAIT訊息佇列滿或個數滿時會返回-1,並且置EAGAIN錯誤。msgsnd解除阻塞的條件有以下三個條件:

不滿足訊息佇列滿或個數滿兩個條件,即訊息佇列中有容納該訊息的空間。

 msgid代表的訊息佇列被刪除。

呼叫msgsnd函式的程序被訊號中斷(SIGINT)。

msgrecv

函式原型:int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg);  

msgidmsgget函式返回的訊息佇列識別符號。

msg_ptr指向準備接收訊息的結構體指標,但是訊息的資料結構卻有一定的要求,指標msg_ptr所指向的訊息結構一定要是以一個長整型成員變數開始的結構體,接收函式將用這個成員來確定接收的訊息來自哪一個型別(頻道)的訊息(節目)。訊息結構定義:

struct my_message{  

    long int message_type;  //頻道(你可以理解為訊息通道號)

    /* The data you wish to transfer*/  //訊息主體

};  

msg_stmsg_ptr指向的訊息的長度,注意是訊息的長度,而不是整個結構體的長度,也就是說msg_sz是不包括長整型訊息型別成員變數的長度。

msgtypemsgtype可以實現一種簡單的接收優先順序,訊息是按照接收順序存於訊息佇列中,每條訊息對應一個訊息型別號(頻道號)。如果msgtype0,就獲取佇列中的第一條訊息。如果它的值大於零,將獲取具有相同型別號(頻道號)的第一條訊息。如果它小於零,就獲取頻道號等於或小於msgtype的絕對值的第一個訊息。

msgflg用於控制當佇列中沒有相應型別的訊息可以接收時將發生的事情。msgflg=0,阻塞式接收訊息,沒有該型別的訊息msgrcv函式一直阻塞等待;msgflg=IPC_NOWAIT,如果沒有返回條件的訊息呼叫立即返回,此時錯誤碼為ENOMSGmsgflg=IPC_EXCEPT,與msgtype配合使用返回佇列中第一個型別不為msgtype的訊息;msgflg=IPC_NOERROR,如果佇列中滿足條件的訊息內容大於所請求的size位元組,則把該訊息截斷,截斷部分將被丟棄。

返回值:呼叫成功時,該函式返回放到接收快取區中的位元組數,訊息被複制到由msg_ptr指向的使用者分配的快取區中,然後刪除訊息佇列中的對應訊息。失敗時返回-1

錯誤程式碼:

E2BIG:訊息資料長度大於msgszmsgflag沒有設定IPC_NOERROR

EIDRM:識別符號為msgid的訊息佇列已被刪除

EACCESS:無許可權讀取該訊息佇列

EFAULT:引數msgp指向無效的記憶體地址

ENOMSG:引數msgflg設為IPC_NOWAIT,而訊息佇列中無訊息可讀

EINTR:等待讀取佇列內的訊息情況下被訊號中斷

msgrcv解除阻塞的條件有以下三個:

①訊息佇列中有了滿足條件的訊息。

msgid代表的訊息佇列被刪除。

③呼叫msgrcv()的程序被訊號中斷(SIGINT)。

msgctl

函式原型:int msgctl(int msgid, int command, struct msgid_ds *buf); 

msgidmsgget函式返回的訊息佇列識別符號。

command是將要採取的動作,它可以取3個值:

IPC_STAT:把msgid_ds結構中的資料設定為訊息佇列的當前關聯值,即用訊息佇列的當前關聯值覆蓋msgid_ds的值。

IPC_SET:如果程序有足夠的許可權,就把訊息列隊的當前關聯值設定為msgid_ds結構中給出的值

IPC_RMID:刪除訊息佇列

buf指向msgid_ds結構的指標,它指向訊息佇列模式和訪問許可權的結構。

struct msqid_ds {//Linux系統中的定義

struct ipc_perm msg_perm; /* Ownership andpermissions*/

time_t msg_stime; /* Time of last msgsnd()*/

time_t msg_rtime; /* Time of last msgrcv()*/

time_t msg_ctime; /* Time of last change */

unsigned long __msg_cbytes; /* Currentnumber of bytes inqueue (non-standard) */

msgqnum_t msg_qnum; /* Current number of messagesinqueue */

msglen_t msg_qbytes; /* Maximum number ofbytesallowed in queue */

pid_t msg_lspid; /* PID of last msgsnd() */

pid_t msg_lrpid; /* PID of last msgrcv() */

};//不同的系統中此結構會有不同的新成員

返回值:成功時返回0,失敗時返回-1

錯誤程式碼:

EACCESS:引數cmdIPC_STAT,無許可權讀取該訊息佇列

EFAULT:引數buf指向無效的記憶體地址

EIDRM:識別符號為msgid的訊息佇列已被刪除

EINVAL:無效的引數cmdmsgid

EPERM:引數cmdIPC_SETIPC_RMID,卻無足夠的許可權執行

程式例項

例項一

在程式例項一中,傳送程序建立訊息佇列,接收程序連線訊息佇列。傳送程序向指定的頻道號(類別號)傳送指定訊息,接收程序接收指定頻道中的訊息。如果接收程序指定的頻道號為0,表示接收訊息佇列中任意頻道號對應的訊息,按照訊息的先後順序接收。

msgsend.c

#include<stdio.h>

#include<stdlib.h>

#include<string.h>

#include<unistd.h>

#include<sys/msg.h>

#include<sys/ipc.h>

//定義訊息結構體

typedefstruct msgbuf

{

long channel;//訊息結構體的第一個元素必須是long型別(訊息類別號)

char buf[256];//儲存訊息(訊息體)

}msg_t;

intmain(int argc, char **argv)

{

if (argc < 2) fprintf(stderr,"usage:%s msgid\n", argv[0]), exit(1);

//建立訊息佇列int msgget(key_t key, int msgflg);

//引數1:訊息佇列的key(標識一個訊息佇列)

//引數2:如果key對應的訊息佇列不存在則建立之,如果存在則開啟

int msgid = msgget(atoi(argv[1]), IPC_CREAT| IPC_EXCL | 0666);

if (-1 == msgid)perror("msgget"), exit(1);

//定義並初始化訊息結構體

msg_t msg;

memset(&msg, 0x00, sizeof(msg));

while (1)

{

//輸入通道號(訊息類別)

printf("channel:");

scanf("%d",&msg.channel);

scanf("%*c");//讀取緩衝區中的換行符

//從標準輸入獲取資料,以回車結束

printf("msg:");

fgets(msg.buf, sizeof(msg.buf), stdin);

//scanf("%*c");//由於fgets會接收\n,不用再接收換行符

//int msgsnd(int msqid, const void*msgp, size_t msgsz, int msgflg);

//引數1:呼叫msgget函式返回的訊息佇列號(識別符號)

//引數2:待發送的訊息結構體地址

//引數3:待發送的訊息長度(不包括訊息結構體的第一個元素)

//引數4:附加功能(沒有特殊需求填0即可)

msgsnd(msgid, &msg,sizeof(msg.buf), 0);

char ch = ' ';

fprintf(stdout, "是否繼續傳送?[yn]:");

scanf("%[yYnN]", &ch);

scanf("%*c");

if ('y' != ch && 'Y' != ch)

{

break;

}

}

//int msgctl(int msqid, int cmd, structmsqid_ds *buf);

//引數1:呼叫msgget函式返回的訊息佇列號

//引數2:執行的操作型別

//引數3:訊息佇列狀態資訊結構體指標

msgctl(msgid, IPC_RMID, 0);

return 0;

}

msgrecv.c

#include<stdio.h>

#include<stdlib.h>

#include<string.h>

#include<unistd.h>

#include<sys/msg.h>

#include<sys/ipc.h>

#include<signal.h>

//定義訊息結構體

typedefstruct msgbuf

{

long channel;//訊息結構體的第一個元素必須是long型別

char buf[256];

}msg_t;

intmsgid = -1;

//訊號處理函式

voidhandler(int s)

{

//int msgctl(int msqid, int cmd, structmsqid_ds *buf);

//引數1:呼叫msgget函式返回的訊息佇列號

//引數2:執行的操作型別

//引數3:接收訊息佇列的資訊

msgctl(msgid, IPC_RMID, 0);

fprintf(stdout, "exit\n");

exit(EXIT_SUCCESS);

}

intmain(int argc, char **argv)

{

if (argc < 2) fprintf(stderr,"usage:%s msgid\n", argv[0]), exit(EXIT_FAILURE);

//註冊中斷訊號

signal(SIGINT, handler);

//建立訊息佇列int msgget(key_t key, int msgflg);

//引數1:訊息佇列的key(標識一個訊息佇列)

//引數2:如果key對應的訊息佇列不存在則建立之,如果存在則開啟

msgid = msgget(atoi(argv[1]), 0);

if (-1 == msgid)perror("msgget"), exit(EXIT_FAILURE);

//定義並初始化訊息結構體

msg_t msg;

memset(&msg, 0x00, sizeof(msg));

while (1)

{

//輸入要接收的頻道數(訊息類別編號)

printf("channel:");

scanf("%d",&msg.channel);

//int msgsnd(int msqid, const void*msgp, size_t msgsz, int msgflg);

//引數1:呼叫msgget函式返回的訊息佇列號

//引數2:待接收的訊息結構體地址

//引數3:待接收的訊息長度(不包括訊息結構體的第一個元素)

//引數4:接受的channel

//引數5:附加功能(沒有特殊需求填0即可)

msgrcv(msgid, &msg,sizeof(msg.buf), msg.channel, 0);

fprintf(stdout, "msg: %s\n",msg.buf);

}

return 0;

}

程式執行結果:

 

例項二

在例項程式二中,服務程序建立訊息佇列,客戶程序連線訊息佇列。服務程序接收頻道1中的訊息,所有的客戶程序下向訊息佇列中的頻道1傳送訊息,客戶程序將當前程序的PID(唯一區分客戶程序)作為訊息體的前4位元組,服務程序接收到資料,首先讀取訊息體的前4位元組,將其作為客戶程序接收資料的頻道號,向該頻道傳送接收到的訊息,對應的客戶端就可以接收到自己傳送給服務程序的訊息,接收的訊息和自己傳送的訊息完全一致。

msgsvr.c

#include<stdio.h>

#include<stdlib.h>

#include<string.h>

#include<unistd.h>

#include<sys/msg.h>

#include<sys/ipc.h>

#include<signal.h>

intmsgid = 0;

voidhandler(int s)

{

fprintf(stdout, "exit\n");

msgctl(msgid, IPC_RMID, 0);

exit(1);

}

//定義訊息結構體

typedefstruct msgbuf

{

long channel;//訊息結構體的第一個元素必須是long型別

char buf[256];//儲存訊息

}msg_t;

intmain(int argc, char **argv)

{

if (argc < 2) fprintf(stderr,"usage:%s msgid\n", argv[0]), exit(1);

signal(SIGINT, handler);

//建立訊息佇列int msgget(key_t key, int msgflg);

//引數1:訊息佇列的key(標識一個訊息佇列)

//引數2:如果key對應的訊息佇列不存在則建立之,如果存在則開啟之。(第二個引數可以直接填0,核心會替你完成相應的參作)

msgid = msgget(atoi(argv[1]), IPC_CREAT |IPC_EXCL | 0666);

if (-1 == msgid)perror("msgget"), exit(1);

msg_t sendmsg;

msg_t rcvemsg;

while (1)

{

memset(&sendmsg, 0x00,sizeof(msg_t));

memset(&rcvemsg, 0x00,sizeof(msg_t));

//接收客戶端傳送過來的訊息

msgrcv(msgid, &rcvemsg,sizeof(rcvemsg.buf), 1, 0);

//從訊息體中讀取客戶程序的頻道號(訊息體的前4位元組)

<