1. 程式人生 > >IPC之Posix訊息佇列詳解

IPC之Posix訊息佇列詳解

基本概念:

    訊息佇列可認為是一個訊息連結串列。有足夠寫許可權的執行緒可往佇列中放置訊息,有足夠讀許可權的執行緒可從佇列中取走訊息,每個訊息都是一個記錄(非位元組流式,也就是不需要自定義邊界),它由傳送者賦予一個優先順序。在某個程序往一個佇列寫入訊息之前,並不需要另外某個程序在該佇列上等待訊息的到達。

    一個程序可以往某個佇列寫入一些訊息,然後終止,再讓另外一個程序在以後某個時刻讀出這些訊息。訊息佇列具有隨核心的持續性,也就是程序關閉後,訊息佇列依然存在。

訊息佇列的標準:

    訊息佇列分為兩大標準,Posix訊息佇列和System V訊息佇列,此文只講Posix訊息佇列。

此文的環境:

例子的執行系統:Ubuntu 14

執行前需要執行的命令(為什麼需要執行下列命令,請查閱man 7 mq_overview):

Mounting the message queue filesystem
       On Linux, message queues are created in a virtual  filesystem.   (Other
       implementations  may  also  provide such a feature, but the details are
       likely to differ.)  This filesystem can be mounted (by  the  superuser)
       using the following commands:


           # mkdir /dev/mqueue
           # mount -t mqueue none /dev/mqueue


       The sticky bit is automatically enabled on the mount directory.

建立訊息佇列:

MQ_OPEN(3)                 Linux Programmer's Manual                MQ_OPEN(3)

NAME
       mq_open - open a message queue

SYNOPSIS
       #include <fcntl.h>           /* For O_* constants */
       #include <sys/stat.h>        /* For mode constants */
       #include <mqueue.h>

       mqd_t mq_open(const char *name, int oflag);
       mqd_t mq_open(const char *name, int oflag, mode_t mode,
                     struct mq_attr *attr);

       Link with -lrt.
返回:若成功則為訊息佇列描述符,若出錯則為-1

關閉訊息佇列:

MQ_CLOSE(3)                Linux Programmer's Manual               MQ_CLOSE(3)

NAME
       mq_close - close a message queue descriptor

SYNOPSIS
       #include <mqueue.h>

       int mq_close(mqd_t mqdes);

       Link with -lrt.
返回:若成功則為0,若出錯則為-1

刪除訊息佇列:

MQ_UNLINK(3)               Linux Programmer's Manual              MQ_UNLINK(3)

NAME
       mq_unlink - remove a message queue

SYNOPSIS
       #include <mqueue.h>

       int mq_unlink(const char *name);

       Link with -lrt.
返回:若成功則為0,若出錯則為-1

獲取/設定訊息佇列屬性:

MQ_GETATTR(3)              Linux Programmer's Manual             MQ_GETATTR(3)

NAME
       mq_getattr, mq_setattr - get/set message queue attributes

SYNOPSIS
       #include <mqueue.h>

       int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

       int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,
                        struct mq_attr *oldattr);

       Link with -lrt.
均返回:若成功則為0,若出錯則為-1

mq_attr結構含有以下屬性

struct mq_attr {
	long mq_flags;       /* Flags: 0 or O_NONBLOCK */
	long mq_maxmsg;      /* Max. # of messages on queue */
	long mq_msgsize;     /* Max. message size (bytes) */
	long mq_curmsgs;     /* # of messages currently in queue */
};

a)mq_setattr函式只允許設定mq_attr結構的mq_flags成員,其它三個成員被忽略。

b)指向某個mq_attr結構的指標可作為mq_open的第四個引數傳遞,每個佇列的最大訊息數和每個訊息的最大位元組數只能在建立佇列時設定,而且這兩者必須同時指定。

c)訊息佇列中的當前訊息數則只能獲取不能設定。

例子1,建立並設定訊息佇列的屬性,再刪除訊息佇列。

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

int main()
{
    struct mq_attr attr;
    attr.mq_maxmsg = 24; /* 設定訊息佇列的最大訊息個數 */
    attr.mq_msgsize = 8192; /* 設定每個訊息的最大位元組數 */

    char *name = "/temp.mq"; /* linux必須是/filename這種格式,不能出現二級目錄 */
    mqd_t mqd = mq_open(name, O_RDWR | O_CREAT | O_EXCL, 0777, &attr);
    if (mqd == -1) {
        perror("create failed");
        mqd = mq_open(name, O_RDWR);
        if (mqd == -1) {
            perror("open failed");
            exit(EXIT_FAILURE);
        }
    }
    printf("mq_open %s success\n", name);

    /* 開啟成功,獲取當前屬性 */
    mq_getattr(mqd, &attr);
    printf("max msg = %ld, max bytes = %ld, currently = %ld\n",
            attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    /* 關閉 */
    if (mq_close(mqd) != -1)
    	printf("mq_close %s success\n", name);

    /* 刪除 */
    sleep(30); /* 觀察建立情況 */
    if (mq_unlink(name) != -1)
        printf("mq_unlink %s success\n", name);

    return 0;
}

編譯執行:


傳送訊息:

MQ_SEND(3)                 Linux Programmer's Manual                MQ_SEND(3)

NAME
       mq_send, mq_timedsend - send a message to a message queue

SYNOPSIS
       #include <mqueue.h>

       int mq_send(mqd_t mqdes, const char *msg_ptr,
                     size_t msg_len, unsigned msg_prio);

       #include <time.h>
       #include <mqueue.h>

       int mq_timedsend(mqd_t mqdes, const char *msg_ptr,
                     size_t msg_len, unsigned msg_prio,
                     const struct timespec *abs_timeout);

       Link with -lrt.
返回:若成功則為0,若出錯則為-1

mq_send的prio引數是待發送訊息的優先順序,其值必須小於MQ_PRIO_MAX。如果應用不必使用優先順序不同的訊息,那就給mq_send指定值為0的優先順序,給mq_reveive指定一個空指標作為其最後一個引數。

接收訊息:

MQ_RECEIVE(3)              Linux Programmer's Manual             MQ_RECEIVE(3)

NAME
       mq_receive, mq_timedreceive - receive a message from a message queue

SYNOPSIS
       #include <mqueue.h>

       ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,
                          size_t msg_len, unsigned *msg_prio);

       #include <time.h>
       #include <mqueue.h>

       ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
                          size_t msg_len, unsigned *msg_prio,
                          const struct timespec *abs_timeout);

       Link with -lrt.
返回:若成功則為訊息中接字數,若出錯則為-1


mq_receive總是返回所指定佇列中最高優先順序的最早訊息,而且該優先順序能隨該訊息的內容及其長度一同返回。

mq_receive的len引數的值不能小於能加到所指定隊裡中的訊息的最大大小(該佇列mq_attr結構的mq_msgsize成員)。要是len小於該值,mq_reveive就立即返回EMSGSIZE錯誤。這意味著使用Posix訊息佇列的大多數應用程式必須在開啟某個佇列後呼叫mq_getattr確定最大訊息大小,然後分配一個或多個那樣大小的讀緩衝區。

例子2,傳送3個優先順序不同的訊息並接收。

傳送程式mqsend.c:

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

int main()
{
    struct mq_attr attr;
    attr.mq_maxmsg = 24; /* 設定訊息佇列的最大訊息個數 */
    attr.mq_msgsize = 8192; /* 設定每個訊息的最大位元組數 */

    char *name = "/temp.mq"; /* linux必須是/filename這種格式,不能出現二級目錄 */
    mqd_t mqd = mq_open(name, O_RDWR | O_CREAT | O_EXCL, 0777, &attr);
    if (mqd == -1) {
        perror("create failed");
        mqd = mq_open(name, O_RDWR);
        if (mqd == -1) {
            perror("open failed");
            exit(EXIT_FAILURE);
        }
    }
    printf("mq_open %s success\n", name);

    /* 開啟成功,獲取當前屬性 */
    mq_getattr(mqd, &attr);
    printf("max msg = %ld, max bytes = %ld, currently = %ld\n",
            attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    char *msg_ptr1 = "hello world1";
    char *msg_ptr2 = "hello world2";
    char *msg_ptr3 = "hello world3";
    size_t msg_len1 = strlen(msg_ptr1);
    size_t msg_len2 = strlen(msg_ptr2);
    size_t msg_len3 = strlen(msg_ptr3);

    /* 先發送優先順序低的 */
    mq_send(mqd, msg_ptr1, msg_len1, 1);
    mq_send(mqd, msg_ptr2, msg_len2, 2);
    mq_send(mqd, msg_ptr3, msg_len3, 3);

    printf("mq_send success\n");
    if (mq_close(mqd) != -1)
        printf("mq_close %s success\n", name);

    return 0;
}

接收程式mqreceive.c:

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

int main()
{
    char *name = "/temp.mq"; /* linux必須是/filename這種格式,不能出現二級目錄 */
    mqd_t mqd = mq_open(name, O_RDWR);
    if (mqd == -1) {
        perror("open failed");
        exit(EXIT_FAILURE);
    }
    printf("mq_open %s success\n", name);

    struct mq_attr attr;
    if (mq_getattr(mqd, &attr) == -1) {
        perror("mq_getattr failed");
        exit(EXIT_FAILURE);
    }

    char *msg_ptr;
    size_t msg_len = attr.mq_msgsize;
    unsigned msg_prio;
    msg_ptr = (char *)malloc(msg_len);

    while (1) {
        bzero(msg_ptr, msg_len);
        int res = mq_receive(mqd, msg_ptr, msg_len, &msg_prio);
        if (res != -1) {
            printf("msg is:%s, msg_prio:%d\n", msg_ptr, msg_prio);
        } else {
            perror("mq_receive failed");
            break;
        }
    }

    if (mq_close(mqd) != -1)
        printf("mq_close %s success\n", name);

    return 0;
}

編譯執行:


訊息佇列限制:

我們已經遇到任意給定佇列的兩個限制,它們都是在建立該佇列時建立的(測試中發現mq_getattr與ulimit -q的結果並不一致):

mq_maxmsg ->佇列中的最大訊息數

mq_msgsize ->給定訊息的最大位元組數

訊息佇列的實現定義了另外兩個限制(呼叫sysconf函式獲取):

MQ_OPEN_MAX -> 一個程序能夠同時擁有的開啟著訊息佇列的最大數目

MQ_PRIO_MAX -> 任意訊息的最大優先順序加1

測試程式碼與執行結果:

#include <unistd.h>
#include <stdio.h>

// man 7 mq_overview

int main()
{
    long mq_open_max = sysconf(_SC_MQ_OPEN_MAX);
    long mq_prio_max = sysconf(_SC_MQ_PRIO_MAX);
    printf("mq_open_max:%ld\n", mq_open_max);
    printf("mq_prio_max:%ld\n", mq_prio_max);
}



高階:

Posix訊息佇列允許非同步事件通知,以告知合適有一個訊息放置到某個空訊息佇列中。這種通知有兩種方式可供選擇:

a)產生一個訊號

b)建立一個執行緒來執行一個指定的函式。

這種通知通過呼叫mq_notify建立。

MQ_NOTIFY(3)               Linux Programmer's Manual              MQ_NOTIFY(3)

NAME
       mq_notify - register for notification when a message is available

SYNOPSIS
       #include <mqueue.h>

       int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

       Link with -lrt.

這裡不進行深入展開。

參考:《unix網路程式設計》·卷2

End;

相關推薦

IPCPosix訊息佇列

基本概念:     訊息佇列可認為是一個訊息連結串列。有足夠寫許可權的執行緒可往佇列中放置訊息,有足夠讀許可權的執行緒可從佇列中取走訊息,每個訊息都是一個記錄(非位元組流式,也就是不需要自定義邊界),它由傳送者賦予一個優先順序。在某個程序往一個佇列寫入訊息之前,並不需要另

IPCPosix共享記憶體

1.概念 共享記憶體區,按標準可分為Posix共享記憶體區和System V共享記憶體區,兩者在概念上類似。 Posix 表示可移植作業系統介面(Portable Operating System Interface ,縮寫為 POSIX ),POSIX標準定義了作業系統

linux IPCPOSIX訊息佇列

1.POSIX概述         前面已經學習了LINUX程序通訊方式的程序和FIFO兩種方式,但是POSIX是與之前兩種不同的方式,主要區別呢,在於使用管道和FIFO的時候,在寫入管道之前,應該有一個程序已經做好了讀的準備,它呢是以無格式的位元組流的方式進行通訊的,如果

IPC通訊------------訊息佇列

訊息佇列(也叫做報文佇列)能夠克服早期unix通訊機制的一些缺點。作為早期unix通訊機制之一的訊號能夠傳送的資訊量有限,後來雖然POSIX 1003.1b在訊號的實時性方面作了拓廣,使得訊號在傳遞資訊量方面有了相當程度的改進,但是訊號這種通訊方式更像"即時"的通訊方式,它要

【Linux】程序間通訊(IPC訊息佇列及測試用例

學習環境 Centos6.5 Linux 核心 2.6 什麼是訊息佇列? 訊息佇列是SystemV版本中三種程序通訊機制之一,另外兩種是訊號量和共享儲存段。訊息佇列提供了程序間傳送資料塊的方法,而且每個資料塊都有一個型別標識。訊息佇列是基於訊息的,而管

IPC訊息佇列與使用

一、    概念  訊息佇列就是一個訊息的連結串列。對訊息佇列有寫許可權的程序可以向其中按照一定的規則新增新訊息;對訊息佇列有讀許可權的程序可以從訊息佇列中讀出訊息。訊息佇列是隨核心持續的。下面介紹三個概念: 1;隨程序持續:IPC一直存在,直至開啟IPC物件的最後一個程序

訊息佇列

轉載:https://blog.csdn.net/qq_36236890/article/details/81174504 說明:此文是筆者對中華石衫老師對訊息佇列講解的一篇總結包括筆者自己的一些理解 一、為什麼使用訊息佇列? 訊息佇列使用的場景和中介軟體有很多,但解決的核心問題

Linux程序間通訊POSIX訊息佇列

訊息佇列可認為是一個訊息連結串列,它允許程序之間以訊息的形式交換資料。有足夠寫許可權的程序或執行緒可往佇列中放置訊息,有足夠讀許可權的程序或執行緒可從佇列中取走訊息。每個訊息都是一個記錄,它由傳送者賦予一個優先順序。與管道不同,管道是位元組流模型,沒有訊息邊界。

linux網路程式設計POSIX訊息佇列

POSIX IPC名字限定: 必須以/打頭,並且後續不能有其它/ ,形如/somename 長度不能超過NAME_MAX 通過下面的命令將訊息佇列掛載到/dev/mqueue下,可通過cat/dev/mqueue/name檢視訊息佇列狀態 mount -t mqueue

Linux系統下-程序間通訊(訊息佇列-

Linux下程序間通訊方式: # 管道( pipe ):管道是一種半雙工的通訊方式,資料只能單向流動,而且只能在具有親緣關係的程序間使用。程序的親緣關係通常是指父子程序關係。 # 有名管道 (named pipe) : 有名管道也是半雙工的通訊方式,但是它允許無親緣關係程序

Java併發程式設計LinkedBlockingDeque阻塞佇列

簡介LinkedBlockingDeque是一個由連結串列結構組成的雙向阻塞佇列,即可以從佇列的兩端插入和移除元素。雙向佇列因為多了一個操作佇列的入口,在多執行緒同時入隊時,也就減少了一半的競爭。相比於其他阻塞佇列,LinkedBlockingDeque多了addFirst、

windows訊息訊息佇列

與基於MS - DOS的應用程式不同,Windows的應用程式是事件(訊息)驅動的。它們不會顯式地呼叫函式(如C執行時庫呼叫)來獲取輸入,而是等待windows向它們傳遞輸入。 windows系統把應用程式的輸入事件傳遞給各個視窗,每個視窗有一個函式,稱為視窗訊息處理函式。視窗訊息處理函式處理各種使用者輸

Linux訊息佇列

Linux的訊息佇列(queue)實質上是一個連結串列, 它有訊息佇列識別符號(queue ID). msgget建立一個新佇列或開啟一個存在的佇列; msgsnd向佇列末端新增一條新訊息; msgrcv從佇列中取訊息, 取訊息是不一定遵循先進先出的, 也可以按訊息的型別欄位取訊息.  1. 識別符號(des

分散式訊息佇列

原文:http://www.cnblogs.com/itfly8/p/5155983.html 大型網站架構之分散式訊息佇列 以下是訊息佇列以下的大綱,本文主要介紹訊息佇列概述,訊息佇列應用場景和訊息中介軟體示例(電商,日誌系統)。 本次分享大綱 訊息佇列概述訊息佇列應用場景訊息中介軟體示例JMS訊息服

linux網路程式設計POSIX 訊息佇列 和 系列函式

#include<stdio.h>#include<stdlib.h>#include<sys/ipc.h>#include<sys/msg.h>#include<sys/types.h>#include<unistd.h>#includ

中斷底半部機制工作佇列

工作佇列的使用方法和tasklet 非常相似,下面的程式碼用於定義一個工作佇列和一個底半部執行函式。 struct work_struct my_wq; /*定義一個工作佇列*/ void my_wq_func(unsigned long); /*定義一個處理函式*/ 通過INIT_W

Java佇列 LinkedList 類

Java佇列詳解之 LinkedList 類 1. 類簡介 類釋義 A collection designed for holding elements prior to processing. Besides basic Collection oper

Java 進階——多執行緒優化執行緒池 ThreadPoolExecutor的核心容器阻塞佇列(一)

#引言 多執行緒我想無論是後端開發,還是對於App開發者來說都不會陌生,何況Android強制要求不能在主執行緒中做網路請求,於是乎,在很多初學者或者App的原始碼中會出現會多的new Thread…的方式,這樣的程式碼是不優雅而且存在很多的隱患,假如說在使用者

Linux程序間通訊(IPC)程式設計實踐(十二)Posix訊息佇列--基本API的使用

posix訊息佇列與system v訊息佇列的差別: (1)對posix訊息佇列的讀總是返回最高優先順序的最早訊息,對system v訊息佇列的讀則可以返回任意指定優先順序的訊息。 (2)當往一個空佇列放置一個訊息時,posix訊息佇列允許產生一個訊號或啟動一個執行緒,

資料結構圖文解析佇列與C++模板實現

正文 回到頂部 0. 資料結構圖文解析系列 回到頂部 1. 佇列簡介 回到頂部 1.1 佇列的特點 佇列(Queue)與棧一樣,是一種線性儲存結構,它具有如下特點: 佇列中的資料元素遵循“先進先出”(First In First Out)的原則,簡稱FI