基於訊息佇列的多程序伺服器
阿新 • • 發佈:2019-02-20
目錄
一、思路
1)server程序接收時, 指定msgtyp為0, 從隊首不斷接收訊息;
2)server程序傳送時, 將mtype指定為接收到的client程序的pid;
3)client程序傳送的時候, mtype指定為自己程序的pid;
4)client程序接收時, 需要將msgtyp指定為自己程序的pid, 只接收訊息型別為自己pid的訊息。
問題:
但上述程式是存在死鎖的風險的,當同時開了多個客戶端,將佇列寫滿了,此時伺服器端想要寫入就會阻塞,而因為客戶端一旦傳送了資料就阻塞等待伺服器端回射型別為pid的訊息,即佇列的訊息不會減少,此時就會形成死鎖,互相等待。非阻塞方式傳送也不行,因為佇列已滿,會發生EAGAIN錯誤。
改進:
某個客戶端先建立一個私有訊息佇列,然後將私有訊息佇列識別符號和具體資料通過共享的佇列傳送給Server,伺服器fork 出一個子程序,此時根據私有佇列識別符號就可以將資料回射到這個佇列,這個客戶端就可以從私有佇列讀取到回射的資料。
二、實現
1. 原始程式碼
server.c
/* Server */
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
#define MSGMAX 8192
struct msgbuf
{
long mtype;
char mtext[MSGMAX];
};
void echo_ser(int msgid)
{
struct msgbuf msg;
memset(&msg, 0, sizeof (msg));
int nrcv ;
while (1)
{
if ((nrcv = msgrcv(msgid, &msg, sizeof(msg.mtext), 1, 0)) < 0); //指定接受優先順序為1的(msgtyp)
int pid = *((int *)msg.mtext);
fputs(msg.mtext + 4, stdout);
msg.mtype = pid;
msgsnd(msgid, &msg, nrcv, 0);
memset(&msg, 0, sizeof(msg));
}
}
int main(int argc, char *argv[])
{
int msgid;
msgid = msgget(1234, IPC_CREAT | 0666); //建立一個訊息佇列
if (msgid == -1)
ERR_EXIT("msgget");
echo_ser(msgid);
return 0;
}
//---------------------
//作者:NK_test
//來源:CSDN
//原文:https://blog.csdn.net/NK_test/article/details/49866113
client.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
#define MSGMAX 8192
struct msgbuf
{
long mtype;
char mtext[MSGMAX];
};
void echo_cli(int msgid)
{
int nrcv;
int pid = getpid();
struct msgbuf msg;
memset(&msg, 0, sizeof(msg));
msg.mtype = 1;
*((int *)msg.mtext) = pid;
while (fgets(msg.mtext + 4, MSGMAX, stdin) != NULL) //客戶端輸入資訊
{
if (msgsnd(msgid, &msg, MSGMAX, IPC_NOWAIT) < 0)
ERR_EXIT("msgsnd");
memset(msg.mtext + 4, 0, MSGMAX - 4);
if ((nrcv = msgrcv(msgid, &msg, MSGMAX, pid, 0)) < 0)
ERR_EXIT("msgsnd");
fputs(msg.mtext + 4, stdout);
memset(msg.mtext + 4, 0, MSGMAX - 4);
}
}
int main(int argc, char *argv[])
{
int msgid;
msgid = msgget(1234, 0); //開啟名為1234的訊息佇列
if (msgid == -1)
ERR_EXIT("msgget");
echo_cli(msgid);
return 0;
}
//---------------------
//作者:NK_test
//來源:CSDN
//原文:https://blog.csdn.net/NK_test/article/details/49866113
2. 修改
2.1 思路
2.2 程式碼
server.c
/* Server */
#include <stdlib.h>
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#define MAX 512
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
struct msgbuf
{
long mtype;
char mtext[1];
};
void echo_ser(int pmsgid)
{
int nrcv;
struct msgbuf *pmsg_snd;
struct msgbuf *pmsg_rcv;
char text[MAX];
while (1)
{
pmsg_rcv = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX);
nrcv = msgrcv(pmsgid, pmsg_rcv, MAX, 0, 0);
if (nrcv == -1)
{
ERR_EXIT("pmsg_rcv error");
}
printf("Client: %s", pmsg_rcv->mtext);
free(pmsg_rcv);
strcpy(text,"Received. ");
pmsg_snd = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(text));
pmsg_snd->mtype = 2;
strcpy(pmsg_snd->mtext, text);
nrcv = msgsnd(pmsgid, pmsg_snd, sizeof(text), IPC_NOWAIT);
if (nrcv == -1)
ERR_EXIT("pmsg_snd error");
free(pmsg_snd);
}
}
int main(int argc, char *argv[])
{
struct msgbuf *msg;
int nrcv;
int pid;
int pmsgid;
//建立一個訊息佇列
int msgid;
msgid = msgget(1234, IPC_CREAT | 0666);
printf("%d\n",msgid);
if (msgid == -1)
ERR_EXIT("msgget error");
//忽略子程序退出的細節
signal(SIGCHLD, SIG_IGN);
while (1) //接收到新的連線訊息
{
msg = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX);
//接受連線, mtype=1
nrcv = msgrcv(msgid, msg, MAX, 1, 0);
if (nrcv == -1){
ERR_EXIT("msgrcv error");
}
pmsgid = *((int *)msg->mtext);
printf("\npmsgid = %d \n", pmsgid);
pid = fork();
if (pid == 0) //子程序
{
echo_ser(pmsgid);
}
}
return 0;
}
client.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#define MAX 512
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while (0)
struct msgbuf
{
long mtype;
char mtext[1];
};
void echo_cli(int pmsgid)
{
int nrcv;
struct msgbuf *pmsg_snd;
struct msgbuf *pmsg_rcv;
char text[MAX];
printf("Enter some text: ");
while (fgets(text, MAX, stdin) != NULL) //客戶端輸入資訊
{
pmsg_snd = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(text));
pmsg_snd->mtype = 2;
strcpy(pmsg_snd->mtext, text);
nrcv = msgsnd(pmsgid, pmsg_snd, sizeof(text), IPC_NOWAIT);
if (nrcv == -1)
ERR_EXIT("pmsg_snd error");
free(pmsg_snd);
pmsg_rcv = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX);
nrcv = msgrcv(pmsgid, pmsg_rcv, MAX, 0, 0);
if (nrcv == -1)
{
ERR_EXIT("pmsg_rcv error");
}
printf("Server: %s\nEnter some text:", pmsg_rcv->mtext);
free(pmsg_rcv);
}
}
int main(int argc, char *argv[])
{
int pmsgid;
int msgid;
int ret;
struct msgbuf *msg;
//建立一個私有訊息佇列
pmsgid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
if (pmsgid == -1)
ERR_EXIT("msgget error");
printf("message %d queue created!\n", pmsgid);
//將私有訊息佇列識別符號和具體資料通過共享的佇列傳送給Server
//開啟名為1234的訊息佇列
msgid = msgget(1234, 0);
if (msgid == -1)
ERR_EXIT("msgget error");
msg = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(pmsgid));
msg->mtype = 1;
*((int *)msg->mtext) = pmsgid;
printf("%d\n", *((int *)msg->mtext));
//傳送訊息
ret = msgsnd(msgid, msg, sizeof(pmsgid), IPC_NOWAIT);
if (ret == -1)
{
ERR_EXIT("msgsnd error");
}
free(msg); //傳送完畢,釋放記憶體
printf("msg send.\n");
echo_cli(pmsgid);
ret = msgctl(pmsgid, IPC_RMID, NULL);
if (ret == -1)
{
ERR_EXIT("del error");
}
return 0;
}