1. 程式人生 > >Linux POSIX Message Queue 使用體會

Linux POSIX Message Queue 使用體會

最近正在Linux上使用POSIX Message Queue(以下簡稱MQ)在程序間通訊,對目前我這系統發行版和編譯器來講,MQ用起來有一點體會,是教程是沒有說明的,或者我看的不夠仔細,沒有發現疑問

參考資料

《The Linux Programming Interface - A Linux and UNIX System Programming Handbook》2010

測試系統:

Linux Mint 3.11.0-12-generic x86_64 GNU/Linux, MPICH2, mpic++ (Ubuntu/Linaro 4.8.1-10ubuntu9) 4.8.1

先上程式碼


// UNIX IPC functionalities
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
// POSIX Message Queue
#include <mqueue.h>

// C++ header files
#include <iostream>
#include <sstream>
#include <string>


int receiveMsg(mqd_t *mqdes)
{
	unsigned int prio;   // priority of the message
	struct mq_attr attr; // attribute
	ssize_t numRead;     // size of message

	if (mqdes == NULL)
	{
		std::cout << "mqdes == NULL" << std::endl;
		exit(EXIT_FAILURE);
	}

	do
	{
		sleep(1);

		std::cout << "Check for new message." << std::endl;

		if (mq_getattr(*mqdes, &attr) == -1)
		{
			// this is an error
			std::cout << "In receiveMsg(), get attribute error." << std::endl;
			exit(EXIT_FAILURE);
		}
	}while (attr.mq_curmsgs == 0);

	// allocate memory for the incoming messages
	char * buffer = (char *)malloc(attr.mq_msgsize);
	if (buffer == NULL)
	{
		std::cout << "In receiveMsg(), malloc error." << std::endl;
		exit(EXIT_FAILURE);
	}
	memset(buffer,0,attr.mq_msgsize);
	// test use
//	memset(buffer,1,attr.mq_msgsize);

	for (int i=0;i<attr.mq_curmsgs;++i)
	{
		numRead = mq_receive(*mqdes, buffer, attr.mq_msgsize, &prio);
		if (numRead == -1)
		{
			std::cout << "In receiveMsg(), mq_receive() error." << std::endl;
			exit(EXIT_FAILURE);
		}

		// print the message to the standard output
		std::cout << "In receiveMsg(), buffer = "
				  << buffer
				  << ", with strlen(buffer) = "
				  << strlen(buffer)
				  << std::endl;

		std::cout << "buffer = ";

		for (int j=0;j<strlen(buffer);++j)
		{
			std::cout << (unsigned int)(buffer[j]) << " ";
		}

		std::cout << std::endl;
	}

	free(buffer);

	return 0;
}

int main(int argc, char *argv[])
{
	// create the POSIX message queue
	mqd_t mqdes;         // descriptor

	mqdes = mq_open("/mymq", O_CREAT | O_RDWR | O_NONBLOCK, 0666, NULL);

	if (mqdes == (mqd_t) -1)
	{
		std::cout << "Open MQ failed!" << std::endl;
		exit(EXIT_FAILURE);
	}

	// begin to receive message
	if (receiveMsg(&mqdes) != 0)
	{
		std::cout << "receiveMsg() returns error code." << std::endl;
	}

	mq_close(mqdes);
	mq_unlink("/mymq");

	exit(EXIT_SUCCESS);
}

話不多說啦,體會:

=========== 體會開始 ==============

Message其實是byte計數的記憶體資料,不是字串!

=========== 體會結束 ==============

Message這名字起的挺廣泛,而且看手冊,舉的例子也是傳輸的字串。於是在自己實現時,就想當然的認為MQ其實時傳輸的是字串,但這是不對的。

今天實際除錯時就出現了這個問題,具體表現形式是mq_receive()函式呼叫以後,buffer里正常字串結尾有好多奇怪的字元。剛開始還感覺是不是字元編碼的問題,在OS上查了一圈,感覺又不像,最後發現問題出在沒有對buffer進行初始化。

使用沒有初始化的記憶體是禁忌的,但是當時除錯時偷懶了,因為下意識地認為,傳送的Message是字串,言下之意是一定有一個結尾'\0'字元自動新增到buffer裡。其實不是這樣的,在《手冊》的例項中,也明確表達了,傳送的Message的長度是一個字串的strlen()結果,就是說,不包括最後一個'\0'。雖然傳送時以一個字串的頭地址作為待發送內容的起始,但是最終傳送的長度僅是strlen(),比實際的字串佔用記憶體少了一個byte。在mq_receive()時,MQ也不會自動新增最後一個'\0',因為沒這個必要 ---- Message壓根就不是字串。

所以回顧mq_receive()函式的宣告,恍然大悟

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
返回值就是指的當前接收的message的byte數,是個非常重要的值,因為Message不是字串。

現在理解了,MQ的Message事實上是一段用byte計數的記憶體,MQ將這塊記憶體以訊息的形式傳送給kernel,其他程序再從kernel中獲取(複製)這塊記憶體中的資料。

事實上,要是就想傳送字串,有兩種方法,要麼傳送時多發一個位元組,要麼接收之前將buffer初始化,並且一定要初始化為0,否則就會出現我今天早些時候遇到的問題,尾巴上一大堆詭異的字元(可以通過receiveMsg()中的memset()除錯這個bug)。

P.S.: 傳送Message用的是python2.7程式碼,從這裡獲取,安裝之前需要安裝python2.7-dev。

https://pypi.python.org/pypi/posix_ipc