併發 執行緒池
阿新 • • 發佈:2019-01-12
/*
 * UDP upper-casing echo server built on a small thread pool.
 *
 * The main thread (producer) receives datagrams and pushes them into a
 * fixed-size ring buffer; WORKER_COUNT detached worker threads (consumers)
 * pop messages, convert the text to upper case and send it back to the
 * originating client.
 */
#include <stdio.h>
#include <ctype.h>
#include <errno.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

#define PORT         9998
#define QUEUE_SIZE   20 /* ring slots; one is always left unused, so 19 messages can be pending */
#define WORKER_COUNT 8  /* number of consumer threads */

/* Bound UDP socket shared by the producer and all workers. */
int conm_socket;

/* One queued request: the received text plus the address to reply to. */
typedef struct _msg
{
    char buf[256];
    struct sockaddr_in client_addr;
} Msg;

/*
 * Task ring buffer, guarded by `mutex`.
 * `front` is the slot most recently consumed and `rear` the slot most
 * recently produced; the queue is empty when they are equal and full when
 * advancing `rear` would make it equal to `front`.
 */
Msg task[QUEUE_SIZE];
int front = 0;
int rear = 0;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* protects task/front/rear */
/* Statically initialized so that it is valid before any worker can wait on it. */
pthread_cond_t cond1 = PTHREAD_COND_INITIALIZER;   /* signalled when a task is queued */

/*
 * Upper-case the NUL-terminated text in data->buf in place and send it back
 * to data->client_addr through conm_socket.
 *
 * Only lowercase letters are changed; every other byte (including a trailing
 * newline) is passed through untouched. An empty message is echoed back as an
 * empty datagram.
 */
void handl_client(Msg *data)
{
    /* Cache the length: it is needed for the loop bound and for sendto(). */
    size_t len = strlen(data->buf);

    for (size_t i = 0; i < len; i++)
    {
        /* toupper() requires a value representable as unsigned char. */
        data->buf[i] = (char)toupper((unsigned char)data->buf[i]);
    }

    if (sendto(conm_socket, data->buf, len, 0,
               (struct sockaddr *)&(data->client_addr),
               sizeof(data->client_addr)) == -1)
    {
        perror("sendto");
    }
}

/*
 * Create a UDP socket bound to PORT on all local addresses.
 *
 * Returns the socket descriptor, or -1 on failure (after reporting the error
 * with perror). No descriptor is leaked on failure.
 */
int init(void)
{
    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (-1 == sock)
    {
        perror("建立套接字失敗");
        return -1;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family      = AF_INET;           /* Internet address family */
    addr.sin_port        = htons(PORT);       /* port number */
    addr.sin_addr.s_addr = htonl(INADDR_ANY); /* bind to every local IP address */

    int ret = bind(sock, (const struct sockaddr *)&addr, sizeof(addr));
    if (-1 == ret)
    {
        perror("繫結失敗"); /* report before close() can overwrite errno */
        close(sock);
        return -1;
    }

    return sock;
}

/*
 * Worker thread body: wait for a queued task, copy it out of the ring while
 * holding the lock, then process it with the lock released. Never returns in
 * practice; the argument is unused.
 */
void *consume(void *v)
{
    (void)v;

    while (1)
    {
        Msg data;

        pthread_mutex_lock(&mutex);
        /* Loop on the predicate: condition waits may wake spuriously. */
        while (rear == front)
        {
            pthread_cond_wait(&cond1, &mutex);
        }
        front = (front + 1) % QUEUE_SIZE;
        data = task[front]; /* private copy so the slot can be reused immediately */
        pthread_mutex_unlock(&mutex);

        handl_client(&data);
    }

    return NULL; /* not reached */
}

/*
 * Producer loop: receive datagrams on conm_socket, log them, and enqueue them
 * for the workers.
 *
 * A datagram that arrives while the queue is full is dropped. Text longer
 * than the Msg buffer is truncated when enqueued. Returns only if recvfrom()
 * fails with an error other than EINTR.
 */
void produce(void)
{
    char buf[1024];

    while (1)
    {
        struct sockaddr_in client_addr;
        socklen_t len = sizeof(client_addr);

        ssize_t ret = recvfrom(conm_socket, buf, sizeof(buf) - 1, 0,
                               (struct sockaddr *)&client_addr, &len);
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                continue; /* interrupted by a signal: just retry */
            }
            perror("recvfrom");
            return;
        }
        buf[ret] = '\0';

        printf("[%s:%d] %s", inet_ntoa(client_addr.sin_addr),
               ntohs(client_addr.sin_port), buf);

        pthread_mutex_lock(&mutex);

        int next = (rear + 1) % QUEUE_SIZE;
        if (next == front)
        {
            /* Queue full: drop this datagram, but release the lock first. */
            pthread_mutex_unlock(&mutex);
            continue;
        }

        rear = next;
        /* Bounded copy: the receive buffer is larger than Msg.buf. */
        snprintf(task[rear].buf, sizeof(task[rear].buf), "%s", buf);
        task[rear].client_addr = client_addr;

        pthread_cond_broadcast(&cond1); /* wake the waiting workers */
        pthread_mutex_unlock(&mutex);
    }
}

/*
 * Entry point: open the socket, start the worker pool, then run the producer
 * loop on the main thread. Returns non-zero if start-up fails.
 */
int main(int argc, char **argv)
{
    (void)argc;
    (void)argv;

    /* Open the socket first so no worker can ever observe an unset descriptor. */
    conm_socket = init();
    if (-1 == conm_socket)
    {
        return 1;
    }

    for (long i = 0; i < WORKER_COUNT; i++)
    {
        pthread_t thread;
        int err = pthread_create(&thread, NULL, consume, (void *)(i + 1));
        if (err != 0)
        {
            fprintf(stderr, "pthread_create: %s\n", strerror(err));
            return 1;
        }
        pthread_detach(thread);
    }

    produce();

    return 0;
}