1. 程式人生 > >c-linux-IPC-訊息列隊MQ-學習

c-linux-IPC-訊息列隊MQ-學習

###概念### 

   訊息佇列就是一個訊息的連結串列。可以把訊息看作一個記錄,具有特定的格式以及特定的優先順序。對訊息佇列有寫許可權的程序可以向其中按照一定的規則新增新訊息;對訊息佇列有讀許可權的程序則可以從訊息佇列中讀走訊息。

    訊息佇列是一個存放在核心中的訊息連結串列,每個訊息佇列由訊息佇列識別符號標識。與管道不同的是訊息佇列存放在核心中,

只有在核心重啟(即作業系統重啟)或者顯式地刪除一個訊息佇列時,該訊息佇列才會被真正刪除。


###標頭檔案###   

#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h>

###資料結構### 

1.訊息緩衝結構 (傳送、接收訊息都要以這個 資料結構模板 進行)

struct  msgstru{      long  mtype; //大於0 //訊息型別     
char  mtext[2048]; //訊息文字 };

說明:訊息文字可以為任意型別,由使用者根據需要定義,如字元陣列、結構體等;

           訊息佇列中的訊息的大小是受限制的,在核心中存在巨集定義。

$ ipcs -l 

2、msqid_ds -核心資料結構

linux核心中,每個訊息佇列都維護一個結構體msqid_ds ,此結構體儲存著訊息隊列當前的狀態資訊。

struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first; /* first message on queue,unused */
struct msg *msg_last; /* last message in queue,unused */
__kernel_time_t msg_stime; /* last msgsnd time */
__kernel_time_t msg_rtime; /* last msgrcv time */
__kernel_time_t msg_ctime; /* last change time */
unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
unsigned long msg_lqbytes; /* ditto */
unsigned short msg_cbytes; /* current number of bytes on queue */
unsigned short msg_qnum; /* number of messages in queue */
unsigned short msg_qbytes; /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};

各個欄位的含義:

msg_perm:是一個ipc_perm(定義在標頭檔案sys/ipc.h)的結構,儲存了訊息佇列的存取許可權,以及佇列的使用者ID、組ID等資訊。

msg_first:指向佇列中的第一條訊息

msg_last:指向佇列中的最後一條訊息

msg_stime:向訊息佇列傳送最後一條資訊的時間

msg_rtime:從訊息佇列取最後一條訊息的時間

msg_ctime:最後一次變更訊息佇列的時間

msg_lcbytes:訊息佇列中所有訊息佔的位元組數

msg_qnum:訊息佇列中的訊息數目

msg_qbytes:訊息佇列的最大位元組數

msg_lspid:向訊息佇列傳送最後一條訊息的程序ID

msg_lrpid:從訊息佇列讀取最後一條訊息的程序ID

3、ipc_perm -核心資料結構

結構體ipc_perm儲存著訊息佇列的一些重要的資訊,比如訊息佇列關聯的鍵值,訊息佇列的使用者ID,組ID等。

定義在sys/ipc.h中。

struct ipc_perm
{
__kernel_key_t key;
__kernel_uid_t uid;
__kernel_gid_t gid;
__kernel_uid_t cuid;
__kernel_gid_t cgid;
__kernel_mode_t mode;
unsigned short seq;
};

幾個主要欄位的含義如下:

key:建立訊息佇列用到的鍵值key

uid:訊息佇列的使用者ID

gid:訊息佇列的組ID

cuid:建立訊息佇列的程序使用者ID

cgid:建立訊息佇列程序組ID


###常用介面函式### 

1. 判斷訊息列隊是否存在  存在返回訊息列隊msqid,否則返回負值

int msqid = msgget(MSGKEY,IPC_EXCL); //通過key鍵MSGKEY(整數)與msqid關聯


2.建立訊息列隊  成功返回訊息列隊msqid,否則返回負值

int msqid = msgget(MSGKEY,IPC_CREAT| 0666); /*建立訊息列隊*/ //許可權讀寫


3.傳送訊息  傳送失敗返回負值

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

//msqid:訊息佇列的標識碼

//msgp:指向訊息緩衝區的指標,此位置用來暫時儲存傳送和接收的訊息,結構由使用者根據訊息模板自行定義

//msgsz:訊息的大小

//msgflg:用來指明核心程式在佇列滿了的情況下所應採取的行動。如果msgflg為常數IPC_NOWAIT,則在msgsnd()執行時若是訊息佇列已滿,則msgsnd()將不會阻塞,而會立即返回-1,並設定錯誤碼(errno)為ENOMSG;如果msgflg為0,則在msgsnd()執行時若是訊息佇列已滿,採取阻塞等待的處理模式。

4.接收訊息

size_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

//msqid:訊息佇列的標識碼

//msgp:指向訊息緩衝區的指標,此位置用來暫時儲存傳送和接收的訊息,結構由使用者根據訊息模板自行定義

//msgsz:訊息的大小

//msgtyp: 0--接收所有訊息型別的訊息  正數--只接收該型別的訊息

// msgflg:用來指明核心程式在佇列為空的情況下所應採取的行動。如果msgflg為常數IPC_NOWAIT,則在執行msgrcv()時發現列隊為空,不做等待馬上返回-1,並設定錯誤碼為ENOMSG;當msgflg為0時,採取阻塞等待的處理模式。

5.刪除訊息列隊

msgctl(msqid,IPC_RMID, 0);

###測試例程###

/*MQ snd.c*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

#define MSGKEY 201844  //2018/04/04

struct msgstru{
    long msgtype;
    char msgtext[2048];
};

int main(){
    struct msgstru msg;
    int mtype;
    char str[256];
    int ret;
    int mqid;

    mqid = msgget(MSGKEY,IPC_EXCL);  /*檢查訊息佇列是否存在*/
    if(mqid < 0){
        mqid = msgget(MSGKEY,IPC_CREAT|0666);  /*建立訊息列隊*/
        if(mqid < 0){
            printf("create mq failed: errno=%d,%s \n",errno,strerror(errno));
            exit(-1);
        }
    } 

    while(1){
        printf("input msgtype:[終止程式輸入0]");
        scanf("%d",&mtype);
        if(mtype == 0){
            break;
        }
        printf("input msgtext:");
        scanf("%s",str);
        msg.msgtype=mtype;
        strcpy(msg.msgtext,str);

        //傳送訊息  //列隊滿時不阻塞,返回-1
        ret=msgsnd(mqid,&msg,sizeof(struct msgstru),IPC_NOWAIT);
        if(ret < 0){
            printf("msgsnd() write msg failed,errno=%d,%s \n",errno,strerror(errno));
            exit(-1);  
        }
    }

    msgctl(mqid,IPC_RMID,0);   //刪除訊息列隊

    return 0;
}
/*MQ mqrcv.c*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>

#define MSGKEY 201844

struct msgstru{
    long msgtype;
    char msgtext[2048];
};

void childproc(){
    struct msgstru msg;
    int mqid;
    int ret;
    char str[512];

    while(1){
        mqid = msgget(MSGKEY,IPC_EXCL);
        if(mqid < 0){
            printf("msq not existed! errno=%d [%s]\n",errno,strerror(errno)); 
            sleep(3);
            continue;
        } 

        //ret=msgrcv(mqid,&msg,sizeof(struct msgstru),0,0);  //列隊為空 阻塞
        //只接收訊息型別為1的訊息
        ret=msgrcv(mqid,&msg,sizeof(struct msgstru),1,0);  //列隊為空 阻塞
        printf("pid=[%d]  mtype=[%ld]  mtext=[%s] \n",getpid(),msg.msgtype,msg.msgtext);
    }

    exit(0);
}

int main(){
    int i,cpid;
    cpid=0;  //區域性變數初始化

    //建立5個子程序 監聽訊息列隊
    for(i=0;i<5;i++){
        cpid=fork();
        if(cpid < 0){  //主程序
            printf("fork failed! \n");
        }else if(cpid == 0){  //子程序走這
            childproc();
        }
    }

    return 0;
}

####參考####

https://www.cnblogs.com/lpshou/archive/2013/06/20/3145651.html

https://www.cnblogs.com/zhangxuan/p/6724068.html