1. 程式人生 > 其它 >linux系統程序間通訊方式(二):訊息佇列

linux系統程序間通訊方式(二):訊息佇列

簡介

本文章繼續介紹linux程序間通訊的方式:訊息佇列。
訊息佇列也是system-V的IPC物件,它也是存在於核心中,有自己的ID;並且通過一個唯一的key來繫結它。

linux提供了一些api讓我們使用這個IPC。下面通過一個示例來演示兩個程序是如何通過訊息佇列來進行通訊的。

1.設計意圖

使用訊息佇列實現兩個程序通訊,一個程序往佇列寫資料,另一個從佇列讀取資料。

2.設計思路

1、首先有兩個程式:p1.c、p2.c;

2、這兩個程式先後執行之後系統會建立兩個程序(下面統稱為p1、p2);

3、p1首先申請一個訊息佇列,並建立兩個訊號量

4、p2開啟訊息佇列、開啟訊號量

5、p1從鍵盤獲取字串,封裝好一個訊息,然後傳送訊息到佇列

6、p2從佇列讀取訊息,並打印出訊息正文

7、通訊過程中使用posix有名訊號量實現同步(即p1寫完佇列p2才能讀取佇列)

程式碼實現

p1.c

#include <stdio.h>
#include <sys/types.h>
#include <string.h>
#include <unistd.h>


#include <sys/ipc.h>
#include <sys/msg.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <semaphore.h>


struct msgbuf 
{
	long mtype;       /* message type, must be > 0 */
	char mtext[100];    /* message data */
};


int main(void)
{
	int msqid;	//儲存訊息佇列的id(識別符號)
	sem_t *sem1 = NULL;	//儲存訊號量地址
	sem_t *sem2 = NULL;	//儲存訊號量地址

	//首先申請一個訊息佇列
	msqid = msgget((key_t)123, IPC_CREAT|0666);	
	if(msqid < 0)
	{
		perror("creat a msg queue fail");
		return -1;
	}

	//建立或開啟一個POSIX有名訊號量
	sem1 = sem_open("sem1", O_CREAT|O_RDWR, 0666, 1);
	if(sem1 == SEM_FAILED)
	{
		perror("open sem1 fail");
		return -1;
	}
	
	//建立或開啟一個POSIX有名訊號量
	sem2 = sem_open("sem2", O_CREAT|O_RDWR, 0666, 0);
	if(sem2 == SEM_FAILED)
	{
		perror("open sem1 fail");
		return -1;
	}
	
	struct msgbuf msg_to_send;
	msg_to_send.mtype = 1;
	
	while(1)
	{
		//等待訊號量有效,P操作
		sem_wait(sem1);

		if(fgets(msg_to_send.mtext, 11, stdin) != msg_to_send.mtext)
		{
			perror("get char from stdin fail");
			return -2;
		}
		
		if(msgsnd(msqid, &msg_to_send, 100, 0) < 0)
		{
			perror("send msg to queue fail");
			return -3;
		}
		
		//釋放訊號量,V操作
		sem_post(sem2);

		if(strncmp("quit", msg_to_send.mtext, 4)==0)
		{
			printf("p1 readys to quit\n");
			break;
		}
	}
	
	struct msqid_ds msg_ds;
	struct msginfo msg_info;

	msgctl(msqid, IPC_STAT, &msg_ds);
	printf("==%lu,%lu,%lu==\n", msg_ds.__msg_cbytes,msg_ds.msg_qnum,msg_ds.msg_qbytes);
	
	int array_id = msgctl(msqid, MSG_INFO, (struct msqid_ds *)&msg_info);
	printf("==%d,%d,%d,%d==\n", msg_info.msgpool,msg_info.msgmap,msg_info.msgtql,array_id);
	
	msgctl(array_id, MSG_STAT, &msg_ds);
	printf("==%lu,%lu,%lu==\n", msg_ds.__msg_cbytes,msg_ds.msg_qnum,msg_ds.msg_qbytes);
	

	sleep(2);
	
	//釋放各種資源
	sem_close(sem1);
	sem_close(sem2);

	if(msgctl(msqid, IPC_RMID, NULL) < 0)
	{
		perror("remove msg queue fail");
	}

	return 0;
}

p2.c

#include <stdio.h>
#include <sys/types.h>
#include <string.h>

#include <sys/ipc.h>
#include <sys/msg.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <semaphore.h>

struct msgbuf 
{
	long mtype;       /* message type, must be > 0 */
	char mtext[100];    /* message data */
};


int main(void)
{
	int msqid;	//儲存訊息佇列的id(識別符號)
	sem_t *sem1 = NULL;	//儲存訊號量地址
	sem_t *sem2 = NULL;	//儲存訊號量地址
	ssize_t rcv_bytes;	//儲存從佇列讀取的位元組數
	int ret;

	//首先申請一個訊息佇列
	msqid = msgget((key_t)123, IPC_CREAT|0666);	
	if(msqid < 0)
	{
		perror("creat a msg queue fail");
		return -1;
	}

	//建立或開啟一個POSIX有名訊號量
	sem1 = sem_open("sem1", O_CREAT|O_RDWR, 0666, 1);
	if(sem1 == SEM_FAILED)
	{
		perror("open sem1 fail");
		return -1;
	}
	
	//建立或開啟一個POSIX有名訊號量
	sem2 = sem_open("sem2", O_CREAT|O_RDWR, 0666, 0);
	if(sem2 == SEM_FAILED)
	{
		perror("open sem1 fail");
		return -1;
	}
	
	struct msgbuf msg_to_rcv;
	
	while(1)
	{
		//等待訊號量有效,P操作
		sem_wait(sem2);
		
		ret = msgrcv(msqid, &msg_to_rcv, 100, 1, 0);
		if(ret < 0)
		{
			perror("receive msg from queue fail");
			return -3;
		}
		rcv_bytes = ret;
		printf("received a msg[%d bytes]:%s\n", (int)rcv_bytes, msg_to_rcv.mtext);
		
		sem_post(sem1);

		if(strncmp("quit", msg_to_rcv.mtext, 4)==0)
		{
			printf("p1 readys to quit\n");
			break;
		}
	}
	
	//釋放各種資源

	return 0;
}

執行結果

p1程序:

p2程序:

延伸

  • 訊息的型別是linux核心規定好的,不能更改;
  • 本示例中每次只發送一個訊息,實際上可以一次傳送多個訊息;
  • 而且我們可以使用msgctl函式設定或者獲取訊息佇列的狀態或資訊。

msgctl函式原型如下:

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

我們可以通過一些命令來獲取IPC的狀態、IPC的資訊等。
這些狀態或資訊會放到struct msqid_ds這個結構體裡面,

struct msqid_ds {
               struct ipc_perm msg_perm;     /* 擁有者和許可權 */
               time_t          msg_stime;    /* 最近一次傳送訊息的時間*/
               time_t          msg_rtime;    /* 最近一次接收訊息的時間 */
               time_t          msg_ctime;    /* 最近一次改動的時間,這裡的改動是指IPC SET*/
               unsigned long   __msg_cbytes; /* 隊列當前存在的位元組數 (nonstandard) */
               msgqnum_t       msg_qnum;     /* 隊列當前的訊息數量 */
               msglen_t        msg_qbytes;   /* 佇列允許的最大位元組數 */
               pid_t           msg_lspid;    /* 最近一次傳送訊息的程序ID */
               pid_t           msg_lrpid;    /* 最近一次接收訊息的程序ID */
           };

其中struct ipc_perm這個結構體有下列成員:

struct ipc_perm {
               key_t          __key;       /* Key supplied to msgget(2) */
               uid_t          uid;         /* Effective UID of owner */
               gid_t          gid;         /* Effective GID of owner */
               uid_t          cuid;        /* Effective UID of creator */
               gid_t          cgid;        /* Effective GID of creator */
               unsigned short mode;        /* Permissions */
               unsigned short __seq;       /* Sequence number */
           };

上面這個結構體我就不解釋了,大家看英文理解可能更好(更合適)。

我們可以使用下面這些命令來操作訊息佇列,這裡簡要介紹一下:

IPC_STAT:這個命令可以獲取指定的IPC的狀態,所有狀態資訊通過上述的msqid_ds 結構返回。

IPC_SET:這個命令可以設定IPC的一些屬性,比如:

msg_qbytes,
msg_perm.uid, 
msg_perm.gid,  
msg_perm.mode.

IPC_RMID:這個命令可以刪除訊息佇列。

IPC_INFO:這個命令可以獲取系統對訊息佇列的限制和引數,這些資訊通過如下結構體返回(要注意強制型別轉換為第三個引數的型別):

struct msginfo {
                      int msgpool; /* Size in kibibytes of buffer pool
                                      used to hold message data;
                                      unused within kernel */
                      int msgmap;  /* Maximum number of entries in message
                                      map; unused within kernel */
                      int msgmax;  /* Maximum number of bytes that can be
                                      written in a single message */
                      int msgmnb;  /* Maximum number of bytes that can be
                                      written to queue; used to initialize
                                      msg_qbytes during queue creation
                                      (msgget(2)) */
                      int msgmni;  /* Maximum number of message queues */
                      int msgssz;  /* Message segment size;
                                      unused within kernel */
                      int msgtql;  /* Maximum number of messages on all queues
                                      in system; unused within kernel */
                      unsigned short int msgseg;
                                   /* Maximum number of segments;
                                      unused within kernel */
                  };

MSG_INFO:這個命令可以獲取訊息佇列消耗的系統資源資訊,通過msginfo 結構返回(要注意強制型別轉換為第三個引數的型別):

msgpool - 系統當前存在的訊息佇列的數量
msgmap - 系統當前所有訊息佇列的訊息總數
msgtql - 系統所有訊息佇列所有訊息的總位元組數

MSG_STAT:這個命令可以獲取關於所有訊息佇列的資訊,資訊通過msqid_ds結構體返回。

總結

本文詳細介紹了訊息佇列的作用,如何通過他來實現程序間的通訊;以及介紹了關於訊息佇列的一些操作。