1. 程式人生 > >Linux IPC 之訊息佇列

Linux IPC 之訊息佇列

認識訊息佇列

訊息佇列提供了從一個程序到另一個程序傳送一個數據塊(整發整收)的能力。

每個資料塊被認為有一個應用型別, 接收者程序接受的資料塊可以有不同的型別。

訊息佇列的每一個傳送和接收的資料塊是有最大位元組限制的(MSGMAX), 整個訊息佇列的大小也是有位元組限制的(MSGMNB), 系統中的訊息佇列數量也有最大限制(MSGMNI)。

與訊息佇列相關的系統呼叫

1.

 #include <sys/types.h>
 #include <sys/ipc.h>
 #include <sys/msg.h>
 int msgget(key_t key, int
msgfg);

key代表要建立的訊息佇列的鍵
msgflag通常是通過IPC_CREAT或者IPC_EXCL來設定的
IPC_CREAT|IPC_EXCL代表建立訊息佇列,如果這個訊息佇列已經存在,則立即出錯返回
IPC_CREAT則代表建立訊息佇列,不存在則建立,存在則直接開啟
如果成功返回非負訊息佇列識別符號。

msgget函式返回的是程序在呼叫發收訊息函式是需要使用的訊息佇列標識。 但是為了使多個程序可以在同一個IPC物件上匯聚。 所以要有IPC物件的外部名key。
ftok() 兩個引數的目的是為了 生成表示唯一一個IP C物件的 的IPC key 兩個引數的二元組要唯一

#include<sys/ipc.h>
key_t ftok(const char* path, int id);

path和id表示通訊的程序共同認同的路徑名和專案id。

2.

 #include <sys/types.h>
 #include <sys/ipc.h>
 #include <sys/msg.h>
 int msgctl(int msqid, int cmd, struct msqid_ds *buf);

msgctl函式對訊息佇列執行多種操作。
msgid是訊息佇列的識別符號。
cmd引數是對msgid佇列要執行的操作

cmd:
IPC_RMID 從系統中刪除該訊息佇列以及訊息佇列中的資料。
IPC_STAT 取出此佇列的msqid_ds 結構 放在buf指向的結構體中.
IPC_SET 在程序許可權允許的情況下將buf指向的結構體中的部分資訊複製到與這個隊也有關的msgid_ds中。

3.
int msgsnd(int msgid, const void* msgp, size_t msgsz, long int msgtyp, int msgflg)

msgid 訊息佇列標識。
msgp指向要發的資料塊(一個包含表示型別的long int 和char text[msgsz], )
msgsz 表示要傳送的資料塊結構體中資料的大小位元組數。
msgtyp 表示所傳送訊息的型別, 它可以實現接收優先順序的簡單形式。
msgflg 用來表示當訊息佇列被寫滿時會發生什麼。

msgflg == IPC_NOWAIT 時 表示佇列滿不等待 返回EAGIN錯誤。
msgflg == MSG_NOERROR 時 大於訊息佇列要求的資料被截斷。
msgtyp ==0 時返回第一個資料 < 0 時返回第一個不大於概數絕對值型別大小的數字, > 0時返回佇列裡第一條等於msgtyp的資料塊(結構體)。

傳送訊息的資料塊(結構體) 需要自定義

        struct msgbuf {
                long mtype;       /* message type, must be > 0 */型別必須大於0
               char mtext[1];    /* message data */塊大小
           };

4.
int msgrcv(int msgid, const void* msgp, size_t msgsz, long int msgtyp, int msgflg);

IPC 物件的資料結構

核心中每一個IPC物件都有一個ipc_prem結構體。

ipc_perm 結構定義於中,原型如下:
struct ipc_perm
{
key_t        key;                        呼叫shmget()時給出的關鍵字
uid_t           uid;                      /*所有者的有效使用者ID */
gid_t          gid;                       /*所有者所屬組的有效組ID*/ 
uid_t          cuid;                    /* 建立 者的有效使用者ID*/
gid_t         cgid;                   /*  建立者所屬組的有效組ID*/
unsigned short   mode;    /* Permissions + SHM_DEST和SHM_LOCKED標誌*/
unsignedshort    seq;          /* 序列號*/
}; 

構構訊息佇列的資料結構

struct msqid_ds
  {
    struct msqid_ds {
    struct ipc_perm msg_perm;
    struct msg *msg_first;      /* first message on queue,unused  */
    struct msg *msg_last;       /* last message in queue,unused */
    __kernel_time_t msg_stime;  /* last msgsnd time */
    __kernel_time_t msg_rtime;  /* last msgrcv time */
    __kernel_time_t msg_ctime;  /* last change time */
    unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
    unsigned long  msg_lqbytes; /* ditto */
    unsigned short msg_cbytes;  /* current number of bytes on queue */
    unsigned short msg_qnum;    /* number of messages in queue */
    unsigned short msg_qbytes;  /* max number of bytes on queue */
    __kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */
    __kernel_ipc_pid_t msg_lrpid;   /* last receive pid */
};

訊息佇列的本質是一個結構體連結串列 發訊息時往連結串列中放一個結構體 收訊息時從裡面依據msgtyp拿一個結構體。

這裡寫圖片描述

訊息佇列的特性

1、生命週期隨核心
2、雙向通訊
3、按塊大小讀寫//不像管道是面向位元組流的
4、核心保證同步與互斥。

程式碼:

server.c
#include"comm.h"
int main()
{
     int msgid=CreateMsgQueue();
     char buf[1024]={0};
    while(1)
     {
          RecvMsg(msgid,CLIENT_TYPE,buf);
          printf("client say :%s\n",buf);
          ssize_t s=read(0,buf,sizeof(buf)-1);
          if(s>0)
          {
               buf[s-1]=0;
               SendMsg(msgid,SERVER_TYPE,buf);
          }
     }
     Destroy(msgid);
     return 0;
}

client.c
#include"comm.h"
int main()
{
     int msgid=GetMsgQueue();
     char buf[1024]={0};
    while(1)
     {   
           ssize_t w = read(0,buf,sizeof(buf)-1);
          if(w>0)
          {
            buf[w-1]=0;
           SendMsg(msgid,CLIENT_TYPE,buf);
     }

          RecvMsg(msgid ,SERVER_TYPE,buf);
          printf("Server say:%s\n",buf);
     }
     return 0;
}

comm.c
#include"comm.h"
int ComMsgQueue(int flag)
{
     key_t key=ftok(PATHNAME,PROJ_ID);
     if(key<0)
     {
          perror("ftok");
          return -1;
     }
     int msgid=msgget(key,0666|flag);
     if(msgid<0)
     {
          perror("msgget");
          return -2;
     }
     return msgid;
}
int CreateMsgQueue()
{
   return ComMsgQueue(IPC_CREAT|IPC_EXCL);
}
int GetMsgQueue()
{
     return ComMsgQueue(IPC_CREAT);
}
int DestroyMsgQueue(int msgid)
{

     if(msgctl(msgid,IPC_RMID,NULL)<0)
     {
          perror("msgctl");
          return -1;
     }
    return 0;
}
int SendMsg(int msgid,int type,const char*msg)
{
   struct msgbuf _mb;
   _mb.mytype=type;
   strcpy(_mb.mtext,msg);
   if(msgsnd(msgid,&_mb,sizeof(_mb.mtext),0)<0)
   {
     perror("msgsnd");
     return -1;
   }
   return  0;
}
int RecvMsg(int msgid,int type,char*out)
{
   struct msgbuf _mb;
   if(msgrcv(msgid,&_mb,sizeof(_mb.mtext),type,0)<0)
   {
        perror("msgrcv");
        return -1;
   }
   strcpy(out,_mb.mtext);
   return 0;
}

comm.h
#ifndef _COMM_H_
#define _COMM_H_
#define SERVER_TYPE 1
#define CLIENT_TYPE 2
#define PATHNAME "."
#define PROJ_ID 0x6666
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include<string.h>
#include<stdio.h>
struct msgbuf
{
     long mytype;
     char mtext[1024];
};
int CreatMsgQueue();
int DestroyMsgQueue();
int SendMsg();
int RecvMsg();
int GetMsgQueue();
#endif