實現訊號量(三) 訊息佇列實現訊號量
msg_sem.hpp 檔案
/*
 * msg_sem.hpp
 *
 * A counting semaphore built on a System V message queue: every queued
 * one-byte message stands for one available unit. V sends a message,
 * P receives one. The out-of-line functions live in msg_sem.cpp.
 *
 * All functions return 0 on success and an errno-style code on failure.
 */
#ifndef MSG_SEM_HPP
#define MSG_SEM_HPP

#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>

/* Semaphore handle. Treat the fields as private to the msg_sem_* functions. */
typedef struct msg_sem_tag
{
    int fd;     /* message queue identifier obtained from msgget() */
    int valid;  /* MSG_SEM_VALID once msg_sem_init() has succeeded */
} msg_sem_t;

/* Marker stored in msg_sem_t.valid for an initialised semaphore. */
#define MSG_SEM_VALID 0xae3c

/* Creates a semaphore whose initial count is num (num >= 0). */
int msg_sem_init(msg_sem_t* msg, int num);

/* P operation: blocks until one unit can be taken. */
int msg_sem_p(msg_sem_t* msg);

/* Non-blocking P: takes one unit, or returns EAGAIN when none is available. */
int msg_sem_tryP(msg_sem_t* msg);

/*
 * V operation: adds one unit to the semaphore.
 *
 * Returns 0 on success, EINVAL when msg is NULL or not initialised,
 * otherwise the errno value reported by msgsnd().
 */
static inline int msg_sem_v(msg_sem_t* msg)
{
    /*
     * msgsnd() expects a buffer that begins with a positive long message
     * type followed by the payload; the size argument counts the payload
     * only. Passing a bare string would make the kernel read the type from
     * bytes beyond the string.
     */
    struct
    {
        long mtype;
        char mtext[1];
    } token = { 1, { 'v' } };

    if (msg == NULL || msg->valid != MSG_SEM_VALID)
        return EINVAL;

    while (msgsnd(msg->fd, &token, sizeof token.mtext, 0) == -1)
    {
        /* A blocked send can be interrupted by a signal; just try again. */
        if (errno != EINTR)
            return errno;
    }
    return 0;
}

/*
 * Removes the underlying message queue and marks the handle as unusable.
 *
 * Returns 0 on success, EINVAL when msg is NULL or not initialised,
 * otherwise the errno value reported by msgctl().
 */
static inline int msg_sem_destroy(msg_sem_t* msg)
{
    if (msg == NULL || msg->valid != MSG_SEM_VALID)
        return EINVAL;

    if (msgctl(msg->fd, IPC_RMID, NULL) == -1)
        return errno;

    /* Later calls on this handle now fail with EINVAL instead of touching
     * a queue identifier that no longer belongs to us. */
    msg->valid = 0;
    return 0;
}

#endif // MSG_SEM_HPP
/*
 * msg_sem.cpp
 *
 * Out-of-line part of the message-queue semaphore declared in msg_sem.hpp.
 */
#include"msg_sem.hpp"

/*
 * One semaphore unit as it travels through the queue. msgsnd()/msgrcv()
 * require the buffer to start with a long message type; the size passed to
 * them counts only the mtext payload.
 */
typedef struct msg_sem_token_tag
{
    long mtype;
    char mtext[1];
} msg_sem_token_t;

/* Message type used for the units sent by msg_sem_init(); must be > 0. */
#define MSG_SEM_TOKEN_TYPE 1L

/*
 * Creates a private message queue and preloads it with num units.
 *
 * Like its pipe-based predecessor, this function is not thread-safe: do not
 * initialise the same msg_sem_t from several threads at once.
 *
 * Returns 0 on success, EINVAL for a NULL handle or negative count,
 * otherwise the errno value of the failing msgget()/msgsnd() call. On
 * failure the queue is removed again and *msg is left untouched.
 */
int msg_sem_init(msg_sem_t* msg, int num)
{
    msg_sem_token_t token = { MSG_SEM_TOKEN_TYPE, { 'v' } };
    int qid;
    int remaining;

    if (msg == NULL || num < 0)
        return EINVAL;

    qid = msgget(IPC_PRIVATE, 0600 | IPC_CREAT);
    if (qid == -1)
        return errno;

    for (remaining = num; remaining > 0; remaining--)
    {
        /*
         * Nobody else knows the queue yet, so a blocking send on a full
         * queue could never be woken up. IPC_NOWAIT turns that situation
         * into an EAGAIN error instead of a hang.
         */
        if (msgsnd(qid, &token, sizeof token.mtext, IPC_NOWAIT) == -1)
        {
            int saved = errno;  /* msgctl() below may overwrite errno */

            msgctl(qid, IPC_RMID, NULL);
            return saved;
        }
    }

    msg->fd = qid;
    msg->valid = MSG_SEM_VALID;
    return 0;
}

/*
 * P operation: blocks until one unit is available and takes it.
 *
 * Returns 0 on success, EINVAL when msg is NULL or not initialised,
 * otherwise the errno value reported by msgrcv().
 */
int msg_sem_p(msg_sem_t* msg)
{
    /* Per-call buffer: large enough for type + payload, and not shared
     * between threads. */
    msg_sem_token_t token;

    if (msg == NULL || msg->valid != MSG_SEM_VALID)
        return EINVAL;

    /* Message type 0 accepts the first message of any type. msgrcv()
     * returns the payload length on success, so only -1 means failure. */
    while (msgrcv(msg->fd, &token, sizeof token.mtext, 0, 0) == -1)
    {
        /* Interrupted by a signal: wait again. */
        if (errno != EINTR)
            return errno;
    }
    return 0;
}

/*
 * Non-blocking P operation.
 *
 * msgrcv() can be told per call whether to block, so unlike the pipe-based
 * version there is no descriptor state to modify.
 *
 * Returns 0 when a unit was taken, EAGAIN when none is available, EINVAL
 * when msg is NULL or not initialised, otherwise the errno value reported
 * by msgrcv().
 */
int msg_sem_tryP(msg_sem_t* msg)
{
    msg_sem_token_t token;

    if (msg == NULL || msg->valid != MSG_SEM_VALID)
        return EINVAL;

    if (msgrcv(msg->fd, &token, sizeof token.mtext, 0, IPC_NOWAIT) != -1)
        return 0;

    /* An empty queue is reported as ENOMSG; present it as EAGAIN. */
    return (errno == ENOMSG) ? EAGAIN : errno;
}
/*
 * Test program for the message-queue semaphore: three producer threads read
 * a file into a ring of buffers and two consumer threads copy the buffers to
 * standard output.
 */
#include "msg_sem.hpp"
#include"Thread.hpp"
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>

#define NBUFF 8
#define BUFFSIZE 4096

struct
{   /* data shared by producer and consumer */
    struct
    {
        char data[BUFFSIZE];    /* a buffer */
        ssize_t n;              /* count of #bytes in the buffer, 0 = end of input */
    } buff[NBUFF];              /* NBUFF of these buffers/counts */
    msg_sem_t nempty, nfull;    /* semaphores, not pointers */
    msg_sem_t writer_mutex, reader_mutex;
} shared;

int writer_index = 0, reader_index = 0;
int fd;                         /* input file to copy to stdout */

/* Set once a producer has seen end of input or a read error.
 * Only accessed while holding shared.writer_mutex. */
static int input_done = 0;

void* produce(void *), *consume(void *);
void* produce_tryP(void *arg);

/* Removes every semaphore queue. Handles that were never initialised are
 * rejected by msg_sem_destroy() itself, so this is safe after a partial
 * set-up. */
static void destroy_semaphores(void)
{
    msg_sem_destroy(&shared.writer_mutex);
    msg_sem_destroy(&shared.reader_mutex);
    msg_sem_destroy(&shared.nempty);
    msg_sem_destroy(&shared.nfull);
}

/*
 * Fills the current write slot with the next chunk of the input file and
 * announces it to the consumers. The caller must already have reserved an
 * empty slot through shared.nempty.
 *
 * Returns 1 when data was queued and 0 when the end-of-input marker (n == 0)
 * was queued; in the latter case the calling producer should stop.
 */
static int fill_reserved_slot(void)
{
    ssize_t got = 0;

    msg_sem_p(&shared.writer_mutex);

    if (!input_done)
    {
        do
        {
            got = read(fd, shared.buff[writer_index].data, BUFFSIZE);
        } while (got == -1 && errno == EINTR);

        if (got < 0)
            perror("read");
        if (got <= 0)
        {
            /* A read error ends the copy just like end of file; storing a
             * negative count would reach write() as a huge size. */
            input_done = 1;
            got = 0;
        }
    }

    shared.buff[writer_index].n = got;
    /* The marker slot is not advanced past: every later producer rewrites
     * the same slot and every consumer stops on it. */
    if (got > 0)
        writer_index = (writer_index+1)%NBUFF;

    msg_sem_v(&shared.nfull);
    msg_sem_v(&shared.writer_mutex);

    return got > 0;
}

/* Writes len bytes to standard output, continuing after short writes and
 * signal interruptions. A hard error is reported and the rest of the chunk
 * is dropped. */
static void write_chunk(const char *data, ssize_t len)
{
    while (len > 0)
    {
        ssize_t put = write(STDOUT_FILENO, data, (size_t)len);

        if (put == -1)
        {
            if (errno == EINTR)
                continue;
            perror("write");
            return;
        }
        data += put;
        len -= put;
    }
}

/* Usage: <program> <pathname>. Copies the named file to standard output. */
int main(int argc, char **argv)
{
    Thread_t tid_produce1, tid_produce2, tid_produce3;
    Thread_t tid_consume1, tid_consume2;

    if (argc != 2)
    {
        printf("use <pathname> as pramater \n");
        exit(1);
    }

    fd = open(argv[1], O_RDONLY);
    if( fd == -1 )
    {
        printf("cann't open the file\n");
        return -1;
    }

    /* System V queues outlive the process, so a partial set-up has to be
     * torn down explicitly. */
    if (msg_sem_init(&shared.writer_mutex, 1) != 0 ||
        msg_sem_init(&shared.reader_mutex, 1) != 0 ||
        msg_sem_init(&shared.nempty, NBUFF) != 0 ||
        msg_sem_init(&shared.nfull, 0) != 0)
    {
        fprintf(stderr, "cannot create the semaphores\n");
        destroy_semaphores();
        close(fd);
        return -1;
    }

    thread_init(&tid_produce1);
    thread_init(&tid_produce2);
    thread_init(&tid_produce3);
    thread_init(&tid_consume1);
    thread_init(&tid_consume2);

    thread_create(&tid_consume1, NULL, consume);
    thread_create(&tid_consume2, NULL, consume);
    thread_create(&tid_produce1, NULL, produce);
    thread_create(&tid_produce2, NULL, produce);
    thread_create(&tid_produce3, NULL, produce_tryP);

    thread_start(&tid_consume1, NULL);
    thread_start(&tid_consume2, NULL);
    thread_start(&tid_produce1, NULL);
    thread_start(&tid_produce2, NULL);
    thread_start(&tid_produce3, NULL);

    thread_join(&tid_consume1, NULL);
    thread_join(&tid_consume2, NULL);
    thread_join(&tid_produce1, NULL);
    thread_join(&tid_produce2, NULL);
    thread_join(&tid_produce3, NULL);

    thread_destroy(&tid_consume1);
    thread_destroy(&tid_consume2);
    thread_destroy(&tid_produce1);
    thread_destroy(&tid_produce2);
    thread_destroy(&tid_produce3);

    destroy_semaphores();
    close(fd);

    exit(0);
}

/* Producer using the blocking P operation: reserves an empty slot, fills it,
 * and repeats until the end-of-input marker has been queued. */
void *produce(void *arg)
{
    (void)arg;

    do
    {
        msg_sem_p(&shared.nempty);  /* wait for at least 1 empty slot */
    } while (fill_reserved_slot());

    return NULL;
}

/* Producer using the non-blocking P operation: polls for an empty slot every
 * 10 milliseconds instead of blocking, otherwise identical to produce(). */
void* produce_tryP(void *arg)
{
    int status;

    (void)arg;

    while( 1 )
    {
        /* wait for at least 1 empty slot */
        while( 1 )
        {
            status = msg_sem_tryP(&shared.nempty);
            if( status == 0 )
                break;
            else if( status == EAGAIN )
            {
                usleep(10*1000); /* sleep 10 milliseconds */
                continue;
            }
            else
                return NULL;    /* the semaphore itself failed */
        }

        if (!fill_reserved_slot())
            return NULL;
    }

    return NULL;
}

/* Consumer: takes filled slots in order and copies them to standard output
 * until it meets the end-of-input marker. */
void* consume(void *arg)
{
    (void)arg;

    while( 1 )
    {
        msg_sem_p(&shared.nfull);
        msg_sem_p(&shared.reader_mutex);

        if( shared.buff[reader_index].n == 0)
        {
            /* End-of-input marker: leave reader_index on it so the other
             * consumer finds it too. */
            msg_sem_v(&shared.nempty);
            msg_sem_v(&shared.reader_mutex);
            return NULL;
        }

        /* Written while holding reader_mutex so chunks reach stdout in
         * file order. */
        write_chunk(shared.buff[reader_index].data, shared.buff[reader_index].n);
        reader_index = (reader_index+1)%NBUFF;

        msg_sem_v(&shared.nempty);
        msg_sem_v(&shared.reader_mutex);
    }

    return NULL;
}
前一篇使用管道實現訊號量,本文改用訊息佇列來實現。其原理和管道版本相同:V 操作向訊息佇列寫入一個字元,P 操作從訊息佇列讀取一個字元,佇列中的訊息數量就是訊號量的目前計數。原理不再贅述,直接看程式碼。 msg_sem.hpp 檔案 #ifndef MSG_SEM_HPP #define
