網路程式設計筆記(六)-標準IO、epoll、多執行緒
網路程式設計筆記(六)-標準IO、epoll、多執行緒
參考《TCP/IP 網路程式設計》15、16、17、18 章
套接字和標準 I/O
標準 I/O 函式的 2 個優點:
- 良好的移植性(Portability)。
- 利用緩衝提高效能。
使用 read 和 write 函式傳輸 400M 檔案的時間遠遠大於使用標準函式 fgets 和 fputs。
標準 I/O 函式的幾個缺陷:
-
不容易進行雙向通訊。如果要同時進行讀寫操作,則應以 r+、w+、a+ 模式開啟。
-
有時候可能頻繁呼叫 fflush。每次切換讀寫工作狀態都應該呼叫 fflush 函式。
-
需要以 FILE 結構體指標形式返回檔案描述符;
利用 fdopen 函式將檔案描述符轉換成 FILE 結構體指標:
#include <stdio.h>
/*
fildes: 需要轉換的檔案描述符
mode: 將要建立的 FILE 結構體指標的模式資訊
*/
// 成功時返回轉換的FILE結構體指標,失敗時返回 NULL
FILE * fdopen(int fildes, const char *mode);
利用 fileno 函式將 FILE 結構體指標轉換成檔案描述符
#include <stdio.h> // 成功時返回轉換後的檔案描述符,失敗時返回-1 int fileno(FILE * stream);
關於 I/O 分離流的其他內容
I/O 分離流的方式
I/O 流分離的方式:
- TCP I/O 過程(routine)分離:通過呼叫 fork 函式複製出一個檔案描述符,以區分輸入和輸出中使用的檔案描述符,雖然檔案描述符不會根據輸入和輸出進行區分,但是我們分開了 2 個檔案描述符的用途。
- 呼叫 2 次呼叫 fdopen:建立讀模式 FILE 指標和寫模式 FILE 指標,分離了輸入工具和輸出工具。
TCP I/O 過程(routine)分離流的好處:
-
通過分開輸入過程(程式碼)和輸出過程降低實現難度。
-
與輸入無關的輸出操作可以提高速度。
2 次呼叫 fdopen 分離流的目的:
-
為了將 FILE 指標按讀模式、寫模式加以區分。
-
可以通過區分讀寫模式降低難度。
-
通過區分 I/O 緩衝提高緩衝效能。
流分離帶來的問題
流分離帶來的問題:終止流時無法半關閉。讀模式 FILE 指標、寫模式 FILE 指標都是基於同一個檔案描述符建立的。因此,針對任意一個 FILE 指標呼叫 fclose 都將關閉檔案描述符,終止套接字。
解決方法:在建立 FILE 指標前複製檔案描述符,然後利用各自的檔案描述符建立讀模式 FILE 指標和寫模式 FILE 指標,這樣銷燬所有檔案描述符後才能銷燬套接字(引用計數)。
檔案描述符的複製
dup 和 dup2 函式
#include <unistd.h>
int dup(int fildes)
// 成功時返回複製的檔案描述符,失敗時返回-1
/*
fildes:需要複製的檔案描述符
filders2:明確指定的檔案描述符整數值
*/
int dup2(int fildes, int fildes2)
無論複製出多少檔案描述符,均應呼叫 shutdown 函式傳送 EOF 並進入半關閉狀態。
優於 select 的 epoll
select 和 epoll 的比較
select 不適合以 web 伺服器端開發為主流的現代開發環境,所以要學習 Linux 平臺下的 epoll。
基於 select 的 I/O 複用技術速度慢的原因:
-
呼叫 select 函式後常見的針對所有檔案描述符的迴圈語句。
-
每次呼叫 select 函式時都需要向函式傳遞監視物件資訊。
select 的優點:
-
伺服器端接入者少。
-
程式具有相容性——大部分作業系統都支援 select 函式。
epoll 函式
epoll_create 函式
epoll_create:向作業系統請求建立儲存 epoll 檔案描述符的空間,對應 select 方式下宣告的 fd_set 變數。該函式返回的檔案描述符主要用於區分 epoll 例程。
#include <sys/epoll.h>
// size: epoll例項的大小
// 返回值:成功時返回epoll檔案描述符,失敗時返回-1
int epoll_create(int size)
注意,size 只是建議 epoll 例程大小,僅供作業系統參考。Linux 2.6.8 之後的核心將完全忽略 size 引數,因為核心會根據情況調整 epoll 例程的大小。
epoll_ctl 函式
epoll_ctl:向空間(對應於 select 中的位陣列)註冊並登出檔案描述符,對應 select 中的 FD_SET、FD_CLR 函式。
#include <sys/epoll.h>
/*
epfd: 用於註冊監視物件的epoll例程的檔案描述符。
op: 用於指定監視物件的新增、刪除或更改。
fd: 需要註冊的監視物件檔案描述符。
event: 監視物件事件型別。
*/
// 成功時返回0,失敗時返回-1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
// 用法:
struct epoll_event event;
event.events = EPOLLIN; // 詳見結構體 epoll_event
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
舉例:
epoll_ctl(A, EPOLL_CTL_ADD, B, C);
“epoll 例程中 A 註冊檔案描述符 B,主要目的是監視引數 C 中的事件。epoll_ctl(A, EPOLL_CTL_DEL, B, NULL);
”從 epoll 例程 A 中刪除檔案描述符 B。“
op 操作:
- EPOLL_CTL_ADD:將檔案描述符註冊到 epoll 例程。
- EPOLL_CTL_DEL:從 epoll 例程中刪除檔案描述符。使用時需向第四個引數傳遞 NULL。
- EPOLL_CTL_MOD:更改註冊的檔案描述符的關注事件發生情況。
epoll_wait 函式
epoll_wait:等待檔案描述符發生變化,類似 select 函式。
#include <sys/epoll.h>
/*
epfd: 標識事件發生監視範圍epoll例程的檔案描述符。
events: 儲存發生事件的檔案描述符集合的結構體地址值。
maxevents: 第二個引數中可以儲存的最大事件數。
timeout: 以毫秒為單位的等待時間,傳遞-1時,一直等待直到發生事件。
*/
// 返回值:成功時返回發生事件的檔案描述符數,同時在第二個引數指向的緩衝中儲存發生事件的檔案描述符集合。失敗時返回-1
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
結構體 epoll_event 和 epoll_data
select 函式中通過 fd_set 變數檢視事件發生與否,epoll 通過以下結構體將發生事件的檔案描述符集中到一起:
//epoll將發生事件的檔案描述符集中在一起,放在epoll_event結構體中
struct epoll_event{
__uint32_t events;
epoll_data_t data;
}
typedef union epoll_data{
void * ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
宣告足夠大的 epoll_event 結構體陣列後,傳遞給 epoll_wait 函式時,發生變化的檔案描述符資訊將被填入該陣列,無需像 select 一樣對所有檔案描述符進行迴圈。
epoll_event 成員 epoll_events 中可以儲存的常量及所指的事件型別:
基於 epoll 的回聲伺服器端
注意與 select 進行對比。
// echo_epollserv.c
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#define BUF_SIZE 100
#define EPOLL_SIZE 50
void error_handling(char *message);
int main(int argc, char *argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t adr_sz;
int str_len, i;
char buf[BUF_SIZE];
struct epoll_event *ep_events;
struct epoll_event event;
int epfd, event_cnt;
if (argc != 2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == -1) {
error_handling("socket error");
}
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1) {
error_handling("bind() error");
}
if (listen(serv_sock, 5) == -1) {
error_handling("listen() error");
}
epfd = epoll_create(EPOLL_SIZE); //建立epoll例程
ep_events = malloc(sizeof(struct epoll_event) * EPOLL_SIZE);
// 將serv_sock新增到例程空間中
event.events = EPOLLIN;
event.data.fd = serv_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);
while (1) {
event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
if (event_cnt == -1) {
puts("epoll_wait() error!");
break;
}
for (i = 0; i < event_cnt; i++) {
if (ep_events[i].data.fd == serv_sock) {
adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr *)&clnt_adr, &adr_sz);
event.events = EPOLLIN;
event.data.fd = clnt_sock;
epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event); //將請求連線的套接字新增到epoll例程中
printf("connected client : %d \n", clnt_sock);
} else { // read message
str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
if (str_len == 0) { // close request!
epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); // 將斷開連線的套接字從epoll例程中移除
close(ep_events[i].data.fd);
printf("closed client %d \n", ep_events[i].data.fd);
} else {
write(ep_events[i].data.fd, buf, str_len); // echo!
}
}
}
}
close(serv_sock);
close(epfd); // 注意關閉epoll例程
return 0;
}
void error_handling(char *message) {
fputs(message, stderr);
fputs("\n", stderr);
exit(1);
}
邊緣觸發和條件觸發
邊緣觸發和條件觸發的區別——在於發生事件的時間點:
- 條件觸發:只要輸入緩衝有資料,就會一直通知該事件。
- 邊緣觸發:輸入緩衝收到資料時,僅註冊一次事件。
epoll 預設以條件觸發方式工作,select 也是以條件觸發方式工作的。
實現邊緣觸發伺服器的原理:通過 errno 變數驗證錯誤原因;使用 fcntl 函式更改套接字選項,新增非阻塞 O_NONBLOCK 表至,完成非阻塞 I/O。
void setnonblockingmode(int fd){
int flag = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flag|O_NONBLOCK);
}
邊緣觸發的優點:可以分離接收資料和處理資料的時間點。
多執行緒伺服器端的實現
pthread_create 和 pthread_join
-
pthread_create
功能:生成執行緒
必須在呼叫此函式前就為 pthread_t 物件分配記憶體空間(malloc)
thread_handles = malloc(thread_countsizeof(pthread_t));
技巧:為每一個執行緒賦予唯一的 int 型引數 rank
pthread_create(&thread_handles[thread], NULL, Hello, (void) thread);
thread_p 是指標! -
pthread_join
分支合併(join)到主執行緒的直線中
pthread_join(thread_handles[thread], NULL);
thread_p 是值!類似的函式有 int pthread_detach(pthread_t thread); 呼叫 pthread_detach 不會引起執行緒終止或進入阻塞狀態。
臨界區
-
臨界區
例項:計算 pi 的執行緒函式
競爭條件
當多個執行緒都要訪問共享變數或共享檔案這樣的共享資源時,如果其中一個訪問是更新操作,那麼這些訪問就可能會導致某種錯誤
臨界區
一個更新共享資源的程式碼段,一次只允許一個執行緒執行該程式碼段
必須序列執行臨界區的程式碼 -
互斥量(mutex)
特殊型別 pthread_mutex_t
初始化和銷燬
pthread_mutex_int
pthread_mutex_init(&mutex, NULL);
pthread_mutex_destroy
pthread_mutex_destroy(&mutex);
上鎖和解鎖
pthread_mutex_lock
pthread_mutex_lock(&mutex);
pthread_mutex_unlock
pthread_mutex_unlock(&mutex);
使用互斥量時,多個程序進入臨界區的順序是隨機的 -
訊號量(semaphore)
訊號量定義:一種特殊型別的 unsigned int 無符號整型變數,可以賦值為 0、1、2……
優點
能夠初始化為任何非負值
訊號量沒有歸屬權,任何執行緒都能夠對鎖上的訊號量進行解鎖
生產者-消費者同步
一個消費者執行緒在繼續執行前,需要等待一些條件或資料被生產者執行緒建立
基本操作
引入 semaphore 標頭檔案
和 pthread_t 一樣,init 前需要分配記憶體
semaphores = malloc(thread_count*sizeof(sem_t));
初始化和銷燬
sem_init
sem_destroy
P 操作和 V 操作
sem_wait
將訊號量減 1
如果值變成 負數,則阻塞執行 P 操作的執行緒,否則執行緒繼續執行
sem_post
將訊號量加 1
如果值 小於等於零,則喚醒一個等待程序
多執行緒併發伺服器端的實現(聊天程式)
伺服器:可以同時連線多個客戶
#include <arpa/inet.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
#define BUF_SIZE 100
#define MAX_CLNT 256
void *handle_clnt(void *arg);
void send_msg(char *msg, int len);
void error_handling(char *message);
int clnt_cnt = 0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;
int main(int argc, char *argv[]) {
int serv_sock;
int clnt_sock;
struct sockaddr_in serv_adr;
struct sockaddr_in clnt_adr;
int clnt_adr_sz;
pthread_t t_id; // 傳遞給執行緒的id
if (argc != 2) {
printf("Usage : %s <port>\n", argv[0]);
exit(1);
}
pthread_mutex_init(
&mutx, NULL); // 互斥訪問臨界區——clnt_cnt和clnt_socks
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == -1) {
error_handling("socket() error");
}
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1) {
error_handling("bind() error");
}
if (listen(serv_sock, 5) == -1) {
error_handling("listen() error");
}
while (1) {
clnt_adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr *)&clnt_adr, &clnt_adr_sz);
// 以下程式碼訪問臨界區,將相關資訊寫入clnt_cnt和clnt_socks。
pthread_mutex_lock(&mutx);
clnt_socks[clnt_cnt++] = clnt_sock;
pthread_mutex_unlock(&mutx);
pthread_create(&t_id, NULL, handle_clnt, (void *)&clnt_sock);
pthread_detach(t_id); // 從記憶體中銷燬完全已終止的執行緒
printf("Connetcted client IP :%s \n", inet_ntoa(clnt_adr.sin_addr));
}
close(serv_sock);
return 0;
}
void *handle_clnt(void *arg) {
int clnt_sock = *((int *)arg); // 傳遞給執行緒的引數
int str_len = 0;
int i;
char msg[BUF_SIZE];
while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
send_msg(msg, str_len);
pthread_mutex_lock(&mutx); // 訪問臨界區前獲得鎖
for (i = 0; i < clnt_cnt; i++) {
if (clnt_sock == clnt_socks[i]) { // 覆蓋刪除
while (i++ < clnt_cnt - 1) clnt_socks[i] = clnt_socks[i + 1];
break;
}
}
clnt_cnt--;
pthread_mutex_unlock(&mutx);
close(clnt_sock);
return NULL;
}
// send to all
void send_msg(char *msg, int len) {
int i;
pthread_mutex_lock(&mutx);
for (i = 0; i < clnt_cnt; i++) {
write(clnt_socks[i], msg, len);
}
pthread_mutex_unlock(&mutx);
}
void error_handling(char *message) {
fputs(message, stderr);
fputs("\n", stderr);
exit(1);
}
客戶端:分離輸入和輸出而建立執行緒
#include <arpa/inet.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#define BUF_SIZE 100
#define NAME_SIZE 20
void *send_msg(void *arg);
void *recv_msg(void *arg);
void error_handling(char *message);
char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];
int main(int argc, char *argv[]) {
int sock;
struct sockaddr_in serv_adr;
pthread_t snd_thread, rcv_thread;
void *thread_return;
if (argc != 4) {
printf("Usage : %s <IP> <port> <name>\n", argv[0]);
exit(1);
}
sprintf(name, "[%s]", argv[3]);
sock = socket(PF_INET, SOCK_STREAM, 0);
if (sock == -1) {
error_handling("socket() error");
}
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = inet_addr(argv[1]);
serv_adr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1) {
error_handling("connect() error\r\n");
} else {
printf("connect to the server!\n");
}
pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
pthread_join(snd_thread, &thread_return);
pthread_join(rcv_thread, &thread_return);
close(sock);
return 0;
}
void error_handling(char *message) {
fputs(message, stderr);
fputs("\n", stderr);
exit(1);
}
void *send_msg(void *arg) {
int sock = *((int *)arg);
char name_msg[NAME_SIZE + BUF_SIZE];
while (1) {
fgets(msg, BUF_SIZE, stdin);
if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n")) {
close(sock);
exit(0);
}
sprintf(name_msg, "%s %s", name, msg);
write(sock, name_msg, strlen(name_msg));
}
return NULL;
}
void *recv_msg(void *arg) {
int sock = *((int *)arg);
char name_msg[NAME_SIZE + BUF_SIZE];
int str_len;
while (1) {
str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
if (str_len == -1) {
return (void *)-1;
}
name_msg[str_len] = 0;
fputs(name_msg, stdout);
}
return NULL;
}