1. 程式人生 > 其它 >網路程式設計筆記(六)-標準IO、epoll、多執行緒

網路程式設計筆記(六)-標準IO、epoll、多執行緒

網路程式設計筆記(六)-標準IO、epoll、多執行緒

參考《TCP/IP 網路程式設計》15、16、17、18 章

套接字和標準 I/O

標準 I/O 函式的 2 個優點:

  • 良好的移植性(Portability)。
  • 利用緩衝提高效能。

使用 read 和 write 函式傳輸 400M 檔案的時間遠遠大於使用標準函式 fgets 和 fputs。

標準 I/O 函式的幾個缺陷:

  • 不容易進行雙向通訊。如果要同時進行讀寫操作,則應以 r+、w+、a+ 模式開啟。

  • 有時候可能頻繁呼叫 fflush。每次切換讀寫工作狀態都應該呼叫 fflush 函式。

  • 需要以 FILE 結構體指標形式返回檔案描述符;

利用 fdopen 函式將檔案描述符轉換成 FILE 結構體指標:

#include <stdio.h>  

/*
	fildes: 需要轉換的檔案描述符  
	mode: 將要建立的 FILE 結構體指標的模式資訊 
*/
// 成功時返回轉換的FILE結構體指標,失敗時返回 NULL  
FILE * fdopen(int fildes, const char *mode);  

利用 fileno 函式將 FILE 結構體指標轉換成檔案描述符

#include <stdio.h>  

// 成功時返回轉換後的檔案描述符,失敗時返回-1  
int fileno(FILE * stream);  

關於 I/O 分離流的其他內容

I/O 分離流的方式

I/O 流分離的方式:

  • TCP I/O 過程(routine)分離:通過呼叫 fork 函式複製出一個檔案描述符,以區分輸入和輸出中使用的檔案描述符,雖然檔案描述符不會根據輸入和輸出進行區分,但是我們分開了 2 個檔案描述符的用途。
  • 呼叫 2 次呼叫 fdopen:建立讀模式 FILE 指標和寫模式 FILE 指標,分離了輸入工具和輸出工具。

TCP I/O 過程(routine)分離流的好處:

  • 通過分開輸入過程(程式碼)和輸出過程降低實現難度。

  • 與輸入無關的輸出操作可以提高速度。

2 次呼叫 fdopen 分離流的目的

  • 為了將 FILE 指標按讀模式、寫模式加以區分。

  • 可以通過區分讀寫模式降低難度。

  • 通過區分 I/O 緩衝提高緩衝效能。

流分離帶來的問題

流分離帶來的問題:終止流時無法半關閉。讀模式 FILE 指標、寫模式 FILE 指標都是基於同一個檔案描述符建立的。因此,針對任意一個 FILE 指標呼叫 fclose 都將關閉檔案描述符,終止套接字

解決方法:在建立 FILE 指標前複製檔案描述符,然後利用各自的檔案描述符建立讀模式 FILE 指標和寫模式 FILE 指標,這樣銷燬所有檔案描述符後才能銷燬套接字(引用計數)。

檔案描述符的複製

dup 和 dup2 函式

#include <unistd.h>  
  
int dup(int fildes)  
    
// 成功時返回複製的檔案描述符,失敗時返回-1
/*
	fildes:需要複製的檔案描述符
	filders2:明確指定的檔案描述符整數值
*/
int dup2(int fildes, int fildes2)  

無論複製出多少檔案描述符,均應呼叫 shutdown 函式傳送 EOF 並進入半關閉狀態。

優於 select 的 epoll

select 和 epoll 的比較

select 不適合以 web 伺服器端開發為主流的現代開發環境,所以要學習 Linux 平臺下的 epoll。

基於 select 的 I/O 複用技術速度慢的原因:

  • 呼叫 select 函式後常見的針對所有檔案描述符的迴圈語句

  • 每次呼叫 select 函式時都需要向函式傳遞監視物件資訊。

select 的優點:

  • 伺服器端接入者少。

  • 程式具有相容性——大部分作業系統都支援 select 函式

epoll 函式

epoll_create 函式

epoll_create:向作業系統請求建立儲存 epoll 檔案描述符的空間,對應 select 方式下宣告的 fd_set 變數。該函式返回的檔案描述符主要用於區分 epoll 例程。

#include <sys/epoll.h>  

// size: epoll例項的大小
// 返回值:成功時返回epoll檔案描述符,失敗時返回-1  
int epoll_create(int size)  

注意,size 只是建議 epoll 例程大小,僅供作業系統參考。Linux 2.6.8 之後的核心將完全忽略 size 引數,因為核心會根據情況調整 epoll 例程的大小。

epoll_ctl 函式

epoll_ctl:向空間(對應於 select 中的位陣列)註冊並登出檔案描述符,對應 select 中的 FD_SET、FD_CLR 函式。

#include <sys/epoll.h>

/*
	epfd:	用於註冊監視物件的epoll例程的檔案描述符。 
	op:		用於指定監視物件的新增、刪除或更改。
	fd:		需要註冊的監視物件檔案描述符。
	event:	監視物件事件型別。
*/
// 成功時返回0,失敗時返回-1 
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

// 用法:
struct epoll_event event;
event.events = EPOLLIN;		// 詳見結構體 epoll_event
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);  

舉例:

  • epoll_ctl(A, EPOLL_CTL_ADD, B, C); “epoll 例程中 A 註冊檔案描述符 B,主要目的是監視引數 C 中的事件。
  • epoll_ctl(A, EPOLL_CTL_DEL, B, NULL); ”從 epoll 例程 A 中刪除檔案描述符 B。“

op 操作:

  • EPOLL_CTL_ADD:將檔案描述符註冊到 epoll 例程。
  • EPOLL_CTL_DEL:從 epoll 例程中刪除檔案描述符。使用時需向第四個引數傳遞 NULL。
  • EPOLL_CTL_MOD:更改註冊的檔案描述符的關注事件發生情況。

epoll_wait 函式

epoll_wait:等待檔案描述符發生變化,類似 select 函式。

#include <sys/epoll.h>
/*
    epfd:		標識事件發生監視範圍epoll例程的檔案描述符。
    events:		儲存發生事件的檔案描述符集合的結構體地址值。
    maxevents:	第二個引數中可以儲存的最大事件數。 
    timeout:	以毫秒為單位的等待時間,傳遞-1時,一直等待直到發生事件。
*/
// 返回值:成功時返回發生事件的檔案描述符數,同時在第二個引數指向的緩衝中儲存發生事件的檔案描述符集合。失敗時返回-1  
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)  

結構體 epoll_event 和 epoll_data

select 函式中通過 fd_set 變數檢視事件發生與否,epoll 通過以下結構體將發生事件的檔案描述符集中到一起:

//epoll將發生事件的檔案描述符集中在一起,放在epoll_event結構體中  
struct epoll_event{  
    __uint32_t events;  
    epoll_data_t data;  
}

typedef union epoll_data{  
    void * ptr;  
    int fd;  
    __uint32_t u32;  
    __uint64_t u64;  
} epoll_data_t;  

宣告足夠大的 epoll_event 結構體陣列後,傳遞給 epoll_wait 函式時,發生變化的檔案描述符資訊將被填入該陣列,無需像 select 一樣對所有檔案描述符進行迴圈。

epoll_event 成員 epoll_events 中可以儲存的常量及所指的事件型別:

基於 epoll 的回聲伺服器端

注意與 select 進行對比。

// echo_epollserv.c

#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#define BUF_SIZE 100
#define EPOLL_SIZE 50
void error_handling(char *message);

int main(int argc, char *argv[]) {
  int serv_sock, clnt_sock;
  struct sockaddr_in serv_adr, clnt_adr;
  socklen_t adr_sz;
  int str_len, i;
  char buf[BUF_SIZE];
  
  struct epoll_event *ep_events;
  struct epoll_event event;
  int epfd, event_cnt;

  if (argc != 2) {
    printf("Usage : %s <port>\n", argv[0]);
    exit(1);
  }

  serv_sock = socket(PF_INET, SOCK_STREAM, 0);
  if (serv_sock == -1) {
    error_handling("socket error");
  }

  memset(&serv_adr, 0, sizeof(serv_adr));
  serv_adr.sin_family = AF_INET;
  serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
  serv_adr.sin_port = htons(atoi(argv[1]));

  if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1) {
    error_handling("bind() error");
  }

  if (listen(serv_sock, 5) == -1) {
    error_handling("listen() error");
  }

  epfd = epoll_create(EPOLL_SIZE);  //建立epoll例程
  ep_events = malloc(sizeof(struct epoll_event) * EPOLL_SIZE);
  
  // 將serv_sock新增到例程空間中
  event.events = EPOLLIN; 
  event.data.fd = serv_sock;
  epoll_ctl(epfd, EPOLL_CTL_ADD, serv_sock, &event);  

  while (1) {
    event_cnt = epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);

    if (event_cnt == -1) {
      puts("epoll_wait() error!");
      break;
    }

    for (i = 0; i < event_cnt; i++) {
      if (ep_events[i].data.fd == serv_sock) {
        adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock, (struct sockaddr *)&clnt_adr, &adr_sz);
        
        event.events = EPOLLIN;
        event.data.fd = clnt_sock;
        epoll_ctl(epfd, EPOLL_CTL_ADD, clnt_sock, &event);  //將請求連線的套接字新增到epoll例程中
        printf("connected client : %d \n", clnt_sock);
      } else {  // read message
        str_len = read(ep_events[i].data.fd, buf, BUF_SIZE);
        if (str_len == 0) {  // close request!
          epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL); // 將斷開連線的套接字從epoll例程中移除
          close(ep_events[i].data.fd);
          printf("closed client %d \n", ep_events[i].data.fd);
        } else {
          write(ep_events[i].data.fd, buf, str_len);  // echo!
        }
      }
    }
  }

  close(serv_sock);
  close(epfd);    // 注意關閉epoll例程
  return 0;
}

void error_handling(char *message) {
  fputs(message, stderr);
  fputs("\n", stderr);
  exit(1);
}

邊緣觸發和條件觸發

邊緣觸發和條件觸發的區別——在於發生事件的時間點:

  • 條件觸發:只要輸入緩衝有資料,就會一直通知該事件。
  • 邊緣觸發:輸入緩衝收到資料時,僅註冊一次事件。

epoll 預設以條件觸發方式工作,select 也是以條件觸發方式工作的。

實現邊緣觸發伺服器的原理:通過 errno 變數驗證錯誤原因;使用 fcntl 函式更改套接字選項,新增非阻塞 O_NONBLOCK 表至,完成非阻塞 I/O。

void setnonblockingmode(int fd){  
	int flag = fcntl(fd, F_GETFL, 0);
	fcntl(fd, F_SETFL, flag|O_NONBLOCK);  
}  

邊緣觸發的優點:可以分離接收資料和處理資料的時間點。

多執行緒伺服器端的實現

pthread_create 和 pthread_join

  1. pthread_create
    功能:生成執行緒
    必須在呼叫此函式前就為 pthread_t 物件分配記憶體空間(malloc)
    thread_handles = malloc(thread_countsizeof(pthread_t));
    技巧:為每一個執行緒賦予唯一的 int 型引數 rank
    pthread_create(&thread_handles[thread], NULL, Hello, (void) thread);
    thread_p 是指標!

  2. pthread_join
    分支合併(join)到主執行緒的直線中
    pthread_join(thread_handles[thread], NULL);
    thread_p 是值!

    類似的函式有 int pthread_detach(pthread_t thread); 呼叫 pthread_detach 不會引起執行緒終止或進入阻塞狀態。

臨界區

  1. 臨界區
    例項:計算 pi 的執行緒函式
    競爭條件
    當多個執行緒都要訪問共享變數或共享檔案這樣的共享資源時,如果其中一個訪問是更新操作,那麼這些訪問就可能會導致某種錯誤
    臨界區
    一個更新共享資源的程式碼段,一次只允許一個執行緒執行該程式碼段
    必須序列執行臨界區的程式碼

  2. 互斥量(mutex)
    特殊型別 pthread_mutex_t
    初始化和銷燬
    pthread_mutex_int
    pthread_mutex_init(&mutex, NULL);
    pthread_mutex_destroy
    pthread_mutex_destroy(&mutex);
    上鎖和解鎖
    pthread_mutex_lock
    pthread_mutex_lock(&mutex);
    pthread_mutex_unlock
    pthread_mutex_unlock(&mutex);
    使用互斥量時,多個程序進入臨界區的順序是隨機的

  3. 訊號量(semaphore)
    訊號量定義:一種特殊型別的 unsigned int 無符號整型變數,可以賦值為 0、1、2……
    優點
    能夠初始化為任何非負值
    訊號量沒有歸屬權,任何執行緒都能夠對鎖上的訊號量進行解鎖
    生產者-消費者同步
    一個消費者執行緒在繼續執行前,需要等待一些條件或資料被生產者執行緒建立
    基本操作
    引入 semaphore 標頭檔案
    和 pthread_t 一樣,init 前需要分配記憶體
    semaphores = malloc(thread_count*sizeof(sem_t));
    初始化和銷燬
    sem_init
    sem_destroy
    P 操作和 V 操作
    sem_wait
    將訊號量減 1
    如果值變成 負數,則阻塞執行 P 操作的執行緒,否則執行緒繼續執行
    sem_post
    將訊號量加 1
    如果值 小於等於零,則喚醒一個等待程序

多執行緒併發伺服器端的實現(聊天程式)

伺服器:可以同時連線多個客戶

#include <arpa/inet.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>

#define BUF_SIZE 100
#define MAX_CLNT 256

void *handle_clnt(void *arg);
void send_msg(char *msg, int len);
void error_handling(char *message);

int clnt_cnt = 0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;

int main(int argc, char *argv[]) {
  int serv_sock;
  int clnt_sock;
  struct sockaddr_in serv_adr;
  struct sockaddr_in clnt_adr;
  int clnt_adr_sz;
  pthread_t t_id;       // 傳遞給執行緒的id

  if (argc != 2) {
    printf("Usage : %s <port>\n", argv[0]);
    exit(1);
  }

  pthread_mutex_init(
      &mutx, NULL);  // 互斥訪問臨界區——clnt_cnt和clnt_socks
  serv_sock = socket(PF_INET, SOCK_STREAM, 0);
  if (serv_sock == -1) {
    error_handling("socket() error");
  }

  memset(&serv_adr, 0, sizeof(serv_adr));
  serv_adr.sin_family = AF_INET;
  serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
  serv_adr.sin_port = htons(atoi(argv[1]));

  if (bind(serv_sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1) {
    error_handling("bind() error");
  }

  if (listen(serv_sock, 5) == -1) {
    error_handling("listen() error");
  }

  while (1) {
    clnt_adr_sz = sizeof(clnt_adr);
    clnt_sock = accept(serv_sock, (struct sockaddr *)&clnt_adr, &clnt_adr_sz);
    // 以下程式碼訪問臨界區,將相關資訊寫入clnt_cnt和clnt_socks。
    pthread_mutex_lock(&mutx);
    clnt_socks[clnt_cnt++] = clnt_sock;
    pthread_mutex_unlock(&mutx);

    pthread_create(&t_id, NULL, handle_clnt, (void *)&clnt_sock);
    pthread_detach(t_id);   // 從記憶體中銷燬完全已終止的執行緒
    printf("Connetcted client IP :%s \n", inet_ntoa(clnt_adr.sin_addr));
  }

  close(serv_sock);

  return 0;
}

void *handle_clnt(void *arg) {
  int clnt_sock = *((int *)arg);  // 傳遞給執行緒的引數
  int str_len = 0;
  int i;
  char msg[BUF_SIZE];

  while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
    send_msg(msg, str_len);

  pthread_mutex_lock(&mutx);  // 訪問臨界區前獲得鎖
  for (i = 0; i < clnt_cnt; i++) {
    if (clnt_sock == clnt_socks[i]) { // 覆蓋刪除
      while (i++ < clnt_cnt - 1) clnt_socks[i] = clnt_socks[i + 1];
      break;
    }
  }

  clnt_cnt--;
  pthread_mutex_unlock(&mutx);
  close(clnt_sock);

  return NULL;
}

// send to all
void send_msg(char *msg, int len) {
  int i;

  pthread_mutex_lock(&mutx);
  for (i = 0; i < clnt_cnt; i++) {
    write(clnt_socks[i], msg, len);
  }

  pthread_mutex_unlock(&mutx);
}

void error_handling(char *message) {
  fputs(message, stderr);
  fputs("\n", stderr);
  exit(1);
}

客戶端:分離輸入和輸出而建立執行緒

#include <arpa/inet.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define BUF_SIZE 100
#define NAME_SIZE 20

void *send_msg(void *arg);
void *recv_msg(void *arg);
void error_handling(char *message);

char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];

int main(int argc, char *argv[]) {
  int sock;
  struct sockaddr_in serv_adr;
  pthread_t snd_thread, rcv_thread;
  void *thread_return;

  if (argc != 4) {
    printf("Usage : %s <IP> <port> <name>\n", argv[0]);
    exit(1);
  }

  sprintf(name, "[%s]", argv[3]);
  sock = socket(PF_INET, SOCK_STREAM, 0);
  if (sock == -1) {
    error_handling("socket() error");
  }

  memset(&serv_adr, 0, sizeof(serv_adr));
  serv_adr.sin_family = AF_INET;
  serv_adr.sin_addr.s_addr = inet_addr(argv[1]);
  serv_adr.sin_port = htons(atoi(argv[2]));

  if (connect(sock, (struct sockaddr *)&serv_adr, sizeof(serv_adr)) == -1) {
    error_handling("connect() error\r\n");
  } else {
    printf("connect to the server!\n");
  }

  pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
  pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
  pthread_join(snd_thread, &thread_return);
  pthread_join(rcv_thread, &thread_return);

  close(sock);
  return 0;
}

void error_handling(char *message) {
  fputs(message, stderr);
  fputs("\n", stderr);
  exit(1);
}

void *send_msg(void *arg) {
  int sock = *((int *)arg);
  char name_msg[NAME_SIZE + BUF_SIZE];

  while (1) {
    fgets(msg, BUF_SIZE, stdin);
    if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n")) {
      close(sock);
      exit(0);
    }
    sprintf(name_msg, "%s %s", name, msg);
    write(sock, name_msg, strlen(name_msg));
  }

  return NULL;
}

void *recv_msg(void *arg) {
  int sock = *((int *)arg);
  char name_msg[NAME_SIZE + BUF_SIZE];
  int str_len;

  while (1) {
    str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
    if (str_len == -1) {
      return (void *)-1;
    }
    name_msg[str_len] = 0;
    fputs(name_msg, stdout);
  }

  return NULL;
}

參考資料

https://github.com/chankeh/net-lenrning-reference