嵌入式Linux併發程式設計,程序間通訊方式,System V IPC,訊息佇列,開啟/建立msgget(), 傳送訊息msgsnd(),格式,接收訊息msgrcv(),控制訊息佇列 msgctl()
阿新 • • 發佈:2018-12-14
文章目錄
- 1,訊息佇列
- 2,訊息佇列結構
- 3,訊息佇列使用步驟
- 4,訊息佇列---示例
1,訊息佇列
- 訊息佇列是System V IPC物件的一種
- 訊息佇列由訊息佇列ID來唯一標識
- 訊息佇列就是一個訊息的列表。使用者可以在訊息佇列中新增訊息、讀取訊息等
- 訊息佇列可以按照型別來發送/接收訊息(不同的程序可以通過一個訊息佇列,實現向某一個程序傳送訊息;可以在訊息佇列中為每個程序定義不同的訊息型別,當前程序只需接受自己的型別的訊息,傳送對方的型別的訊息)
2,訊息佇列結構
3,訊息佇列使用步驟
- 開啟/建立訊息佇列 msgget
- 向訊息佇列傳送訊息 msgsnd
- 從訊息佇列接收訊息 msgrcv
- 控制訊息佇列 msgctl
3.1,開啟/建立訊息佇列 msgget()
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
- 成功時返回訊息佇列的id,失敗時返回EOF
- key 和訊息佇列關聯的key IPC_PRIVATE 或 ftok
- msgflg 標誌位 IPC_CREAT|0666
3.1.1,開啟/建立訊息佇列—示例msgget()
……
int main()
{
int msgid;
key_t key;
if ((key = ftok(“.”, ‘q’)) == -1) {
perror(“ftok”); exit(-1);
}
if ((msgid = msgget(key, IPC_CREAT|0666)) < 0) {
perror(“msgget”); exit(-1);
}
……
return 0;
}
3.2,向訊息佇列傳送訊息 msgsnd()
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msgid, const void *msgp, size_t size,int msgflg);
- 成功時返回0,失敗時返回-1
- msgid 訊息佇列id
- msgp 訊息緩衝區地址(存放的是要傳送的訊息)
- size 訊息正文長度
- msgflg 標誌位 0(阻塞方式,表示訊息傳送成功了才返回) 或 IPC_NOWAIT(不需要等待訊息完全傳送成功就可以返回,如:訊息佇列滿了)
3.2.1,訊息格式
- 通訊雙方首先定義好統一的訊息格式
- 使用者根據應用需求定義結構體型別
- 首成員型別為long,代表訊息型別(正整數)
- 其他成員都屬於訊息正文
3.2.2,訊息傳送—示例
typedef struct
{
long mtype;
char mtext[64];
} MSG;
#define LEN (sizeof(MSG) – sizeof(long))
int main()
{
MSG buf;
……
buf.mtype = 100;
fgets(buf.mtext, 64, stdin);
msgsnd(msgid, &buf,LEN, 0);
……
return 0;
}
3.3,從訊息佇列接收訊息 msgrcv()
#include <sys/ipc.h>
#include <sys/msg.h>
int msgrcv(int msgid, void *msgp, size_t size, long msgtype,int msgflg);
- 成功時返回收到的訊息長度,失敗時返回-1
- msgid 訊息佇列id
- msgp 訊息緩衝區地址
- size 指定接收的訊息長度 (超過指定長度的部分會丟失)
- msgtype 指定接收的訊息型別,指定成0表示接收訊息佇列中最早的一個訊息 ,指定成負數表示按優先順序接受
- msgflg 標誌位 0(0表示接收不成功則阻塞,直到有訊息了,或訊息佇列被刪除了出錯返回,或被訊號打斷了) 或 IPC_NOWAIT
3.3.1,訊息接收—示例
typedef struct {
long mtype;
char mtext[64];
} MSG;
#define LEN (sizeof(MSG) – sizeof(long))
int main() {
MSG buf;
……
if (msgrcv(msgid, &buf,LEN, 200, 0) < 0) {
perror(“msgrcv”);
exit(-1);
}
……
}
3.4,控制訊息佇列 msgctl()
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msgid, int cmd, struct msqid_ds *buf);
- 成功時返回0,失敗時返回-1
- msgid 訊息佇列id
- cmd 要執行的操作 IPC_STAT(獲取屬性) / IPC_SET (設定屬性)/ IPC_RMID(刪除訊息佇列,第三個引數傳NULL)
- buf 存放訊息佇列屬性的地址
訊息佇列的刪除和共享記憶體不一樣。共享記憶體系統會檢查,所有程序都取消映射了才會真的刪除,而訊息只要程序一執行IPC_RMID,訊息佇列會立刻被刪除
4,訊息佇列—示例
要求:兩個程序通過訊息佇列輪流將鍵盤輸入的字串傳送給對方,接收並列印對方傳送的訊息
/******clientA.c******/
#include <stdio.h>
#include <stdlib.h>
#include<unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
typedef struct{
long mtype;
char mtext[64];
}MSG;
#define LEN (sizeof(MSG) - sizeof(long))
#define TypeA 100
#define TypeB 200
int main(int argc, const char *argv[])
{
key_t key;
int msgid;
MSG buf;
if((key = ftok(".",'q')) < 0)
{
perror("ftok");
exit(-1);
}
if((msgid = msgget(key,IPC_CREAT|0666)) < 0)
{
perror("msgget");
exit(-1);
}
while(1)
{
buf.mtype = TypeB;
printf("input >");
fgets(buf.mtext,64,stdin);
msgsnd(msgid,&buf,LEN,0);
if(msgrcv(msgid,&buf,LEN,TypeA,0) < 0)
{
perror("msgrcv");
exit(-1);
}
printf("recv from clientB: %s",buf.mtext);
}
return 0;
}
/******clientB.c******/
#include <stdio.h>
#include <stdlib.h>
#include<unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
typedef struct{
long mtype;
char mtext[64];
}MSG;
#define LEN (sizeof(MSG) - sizeof(long))
#define TypeA 100
#define TypeB 200
int main(int argc, const char *argv[])
{
key_t key;
int msgid;
MSG buf;
if((key = ftok(".",'q')) < 0)
{
perror("ftok");
exit(-1);
}
if((msgid = msgget(key,IPC_CREAT|0666)) < 0)
{
perror("msgget");
exit(-1);
}
while(1)
{
if(msgrcv(msgid,&buf,LEN,TypeB,0) < 0)
{
perror("msgrcv");
exit(-1);
}
printf("recv from clientA: %s",buf.mtext);
buf.mtype = TypeA;
printf("input >");
fgets(buf.mtext,64,stdin);
msgsnd(msgid,&buf,LEN,0);
}
return 0;
}
步驟 | 終端一 | 終端二 |
---|---|---|
·在終端一執行clinntA ·在終端二中可用命令ipcs -q看到系統訊息佇列中多了一訊息佇列 |
[email protected]:~/test/interprocess/message_queue$ ./clientA.out |
[email protected]:~/test/interprocess/message_queue$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0x7107f38d 32768 linux 666 0 0 |
·在終端二執行clinntB ·在兩個中斷中可以交替輸入,列印資訊 |
[email protected]:~/test/interprocess/message_queue$ ./clientA.out input >ABC input >recv from clientB: ZXCV |
[email protected]:~/test/interprocess/message_queue$ ./clientB.out recv from clientA: ABC input >ZXCV |
·在終端二關閉clinntB(Ctrl-C) ·在終端一中繼續輸入 ·在終端二中可用命令ipcs -q看到系統訊息佇列仍然存在 |
input >qwe |
^C [email protected]:~/test/interprocess/message_queue$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0x7107f38d 32768 linux 666 64 1 |
·在終端二輸入ipcrm -q 32768手動刪除訊息佇列 ·可以在終端一中看到clinntA自動結束執行 |
[email protected]:~/test/interprocess/message_queue$ ipcrm -q 32768 |
改進,增添退出處理,程式結束,自動刪除訊息佇列
/******clientA.c******/
#include <stdio.h>
#include <stdlib.h>
#include<unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
typedef struct{
long mtype;
char mtext[64];
}MSG;
#define LEN (sizeof(MSG) - sizeof(long))
#define TypeA 100
#define TypeB 200
int main(int argc, const char *argv[])
{
key_t key;
int msgid;
MSG buf;
if((key = ftok(".",'q')) < 0)
{
perror("ftok");
exit(-1);
}
if((msgid = msgget(key,IPC_CREAT|0666)) < 0)
{
perror("msgget");
exit(-1);
}
while(1)
{
buf.mtype = TypeB;
printf("input >");
fgets(buf.mtext,64,stdin);
msgsnd(msgid,&buf,LEN,0);
if(strcmp(buf.mtext,"quite\n") == 0)break;
if(msgrcv(msgid,&buf,LEN,TypeA,0) < 0)
{
perror("msgrcv");
exit(-1);
}
if(strcmp(buf.mtext,"quite\n") == 0)
{
msgctl(msgid,IPC_RMID,NULL);
exit(0);
}
printf("recv from clientB: %s",buf.mtext);
}
return 0;
}
/******clientB.c******/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include<unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
typedef struct{
long mtype;
char mtext[64];
}MSG;
#define LEN (sizeof(MSG) - sizeof(long))
#define TypeA 100
#define TypeB 200
int main(int argc, const char *argv[])
{
key_t key;
int msgid;
MSG buf;
if((key = ftok(".",'q')) < 0)
{
perror("ftok");
exit(-1);
}
if((msgid = msgget(key,IPC_CREAT|0666)) < 0)
{
perror("msgget");
exit(-1);
}
while(1)
{
if(msgrcv(msgid,&buf,LEN,TypeB,0) < 0)
{
perror("msgrcv");
exit(-1);
}
if(strcmp(buf.mtext,"quite\n") == 0)
{
msgctl(msgid,IPC_RMID,NULL);
exit(0);
}
printf("recv from clientA: %s",buf.mtext);
buf.mtype = TypeA;
printf("input >");
fgets(buf.mtext,64,stdin);
msgsnd(msgid,&buf,LEN,0);
if(strcmp(buf.mtext,"quite\n") == 0)break;
}
return 0;
}
改進思路:
·通過多程序/執行緒實現同時收發訊息
·引入伺服器端,實現不同通訊方式
/******clientA.c******/
#include <stdio.h>
#include <stdlib.h>
#include<unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
typedef struct{
long mtype;
char mtext[64];
}MSG;
#define LEN (sizeof(MSG) - sizeof(long))
#define TypeA 100
#define TypeB 200
int main(int argc, const char *argv[])
{
key_t key;
int msgid;
MSG buf;
pid_t pid;
if((key = ftok(".",'q')) < 0)
{
perror("ftok");
exit(-1);
}
if((msgid = msgget(key,IPC_CREAT|0666)) < 0)
{
perror("msgget");
exit(-1);
}
if((pid = fork()) < 0)
{
perror("fork");
exit(-1);
}
else if (pid == 0)
{
while(1)
{
buf.mtype = TypeB;
printf("input >");
fgets(buf.mtext,64,stdin);
msgsnd(msgid,&buf,LEN,0);
if(strcmp(buf.mtext,"quite\n") == 0)break;
}
}
else
{
while(1)
{
if(msgrcv(msgid,&buf,LEN,TypeA,0) < 0)
{
perror("msgrcv");
exit(-1);
}
if(strcmp(buf.mtext,"quite\n") == 0)
{
msgctl(msgid,IPC_RMID,NULL);
exit(0);
}
printf("recv from clientB: %s",buf.mtext);
}
kill(pid,SIGUSR1);
}
return 0;
}
/******clientB.c******/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include<unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
typedef struct{
long mtype;
char mtext[64];
}MSG;
#define LEN (sizeof(MSG) - sizeof(long))
#define TypeA 100
#define TypeB 200
int main(int argc, const char *argv[])
{
key_t key;
int msgid;
MSG buf;
pid_t pid;
if((key = ftok(".",'q')) < 0)
{
perror("ftok");
exit(-1);
}
if((msgid = msgget