1. 程式人生 > 實用技巧 >從0實現基於Linux socket聊天室-多執行緒伺服器模型-1

從0實現基於Linux socket聊天室-多執行緒伺服器模型-1

前言

Socket在實際系統程式開發當中,應用非常廣泛,也非常重要。實際應用中伺服器經常需要支援多個客戶端連線,實現高併發伺服器模型顯得尤為重要。高併發伺服器從簡單的迴圈伺服器模型處理少量網路併發請求,演進到解決C10K,C10M問題的高併發伺服器模型。

C/S架構

伺服器-客戶機,即Client-Server(C/S)結構。C/S結構通常採取兩層結構。伺服器負責資料的管理,客戶機負責完成與使用者的互動任務。

在C/S結構中,應用程式分為兩部分:伺服器部分和客戶機部分。伺服器部分是多個使用者共享的資訊與功能,執行後臺服務,如控制共享資料庫的操作等;客戶機部分為使用者所專有,負責執行前臺功能,在出錯提示、線上幫助等方面都有強大的功能,並且可以在子程式間自由切換。 如上圖所示:這是基於套接字實現客戶端和伺服器相連的函式呼叫關係,socket API資料比較多,本文不再過多敘述。

pthread執行緒庫:(POSIX)

pthread執行緒庫是Linux下比較常用的一個執行緒庫,關於他的用法和特性大家可以自行搜尋相關文章,下面只簡單介紹他的用法和編譯。

執行緒標識

執行緒有ID, 但不是系統唯一, 而是程序環境中唯一有效. 執行緒的控制代碼是pthread_t型別, 該型別不能作為整數處理, 而是一個結構. 下面介紹兩個函式:

標頭檔案:<pthread.h>
原型:intpthread_equal(pthread_ttid1,pthread_ttid2);
返回值:相等返回非0,不相等返回0.
說明:比較兩個執行緒ID是否相等.

標頭檔案:<pthread.h>
原型:pthread_t
pthread_self();
返回值:返回呼叫執行緒的執行緒ID.

執行緒建立

在執行中建立一個執行緒, 可以為該執行緒分配它需要做的工作(執行緒執行函式), 該執行緒共享程序的資源. 建立執行緒的函式pthread_create()

標頭檔案:<pthread.h>
原型:intpthread_create(pthread_t*restricttidp,constpthread_attr_t*restrictattr,void*(start_rtn)(void),void*restrictarg);
返回值:成功則返回0,否則返回錯誤編號.
引數:
tidp:指向新建立執行緒ID的變數,作為函式的輸出.
attr:用於定製各種不同的執行緒屬性,NULL
為預設屬性(見下).
start_rtn:函式指標,為執行緒開始執行的函式名.該函式可以返回一個void*型別的返回值,
而這個返回值也可以是其他型別,並由pthread_join()獲取
arg:函式的唯一無型別(void)指標引數,如要傳多個引數,可以用結構封裝.

編譯

因為pthread的庫不是linux系統的庫,所以在進行編譯的時候要加上-lpthread
#gccfilename-lpthread//預設情況下gcc使用c庫,要使用額外的庫要這樣選擇使用的庫

常見的網路伺服器模型

本文結合自己的理解,主要以TCP為例,總結了幾種常見的網路伺服器模型的實現方式,並最終實現一個簡單的命令列聊天室。

單程序迴圈

單線程序迴圈原理就是主程序沒和客戶端通訊,客戶端都要先連線伺服器,伺服器接受一個客戶端連線後從客戶端讀取資料,然後處理並將處理的結果返還給客戶端,然後再接受下一個客戶端的連線請求。

優點 單執行緒迴圈模型優點是簡單、易於實現,沒有同步、加鎖這些麻煩事,也沒有這些開銷。

缺點

  1. 阻塞模型,網路請求序列處理;
  2. 沒有利用多核cpu的優勢,網路請求序列處理;
  3. 無法支援同時多個客戶端連線;
  4. 程式序列操作,伺服器無法實現同時收發資料。

單執行緒IO複用

linux高併發伺服器中常用epoll作為IO複用機制。執行緒將需要處理的socket讀寫事件都註冊到epoll中,當有網路IO發生時,epoll_wait返回,執行緒檢查並處理到來socket上的請求。

優點

  1. 實現簡單, 減少鎖開銷,減少執行緒切換開銷。

缺點

  1. 只能使用單核cpu,handle時間過長會導致整個服務掛死;
  2. 當有客戶端數量超過一定數量後,效能會顯著下降;
  3. 只適用高IO、低計算,handle處理時間短的場景。
在這裡插入圖片描述

多執行緒/多程序

多執行緒、多程序模型主要特點是每個網路請求由一個程序/執行緒處理,執行緒內部使用阻塞式系統呼叫,線上程的職能劃分上,可以由一個單獨的執行緒處理accept連線,其餘執行緒處理具體的網路請求(收包,處理,發包);還可以多個程序單獨listen、accept網路連線。

優點:

1、實現相對簡單; 2、利用到CPU多核資源。

缺點:

1、執行緒內部還是阻塞的,舉個極端的例子,如果一個執行緒在handle的業務邏輯中sleep了,這個執行緒也就掛住了。

多執行緒/多程序IO複用

多執行緒、多程序IO服用模型,每個子程序都監聽服務,並且都使用epoll機制來處理程序的網路請求,子程序 accept() 後將建立已連線描述符,然後通過已連線描述符來與客戶端通訊。該機制適用於高併發的場景。

優點:

  1. 支撐較高併發。

缺點:

  1. 非同步程式設計不直觀、容易出錯
在這裡插入圖片描述

多執行緒劃分IO角色

多執行緒劃分IO角色主要功能有:一個accept thread處理新連線建立;一個IO thread pool處理網路IO;一個handle thread pool處理業務邏輯。使用場景如:電銷應用,thrift TThreadedSelectorServer。

優點:

  1. 按不同功能劃分執行緒,各執行緒處理固定功能,效率更高
  2. 可以根據業務特點配置執行緒數量來效能調優

缺點:

  1. 執行緒間通訊需要引入鎖開銷
  2. 邏輯較複雜,實現難度大
在這裡插入圖片描述

小結

上面介紹了常見的網路伺服器模型,還有AIO、協程,甚至還有其他的變型,在這裡不再討論。重要的是理解每種場景中所面臨的問題和每種模型的特點,設計出符合應用場景的方案才是好方案。

多執行緒併發伺服器模型

下面我們主要討論多執行緒併發伺服器模型。

程式碼結構

併發伺服器程式碼結構如下:

thread_func()
{
while(1){
recv(...);
process(...);
send(...);
}
close(...);
}
main(
socket(...);
bind(...);
listen(...);
while(1){
accept(...);
pthread_create();
}
}

由上可以看出,伺服器分為兩部分:主執行緒、子執行緒。

主執行緒

main函式即主執行緒,它的主要任務如下:

  1. socket()建立監聽套字;
  2. bind()繫結埠號和地址;
  3. listen()開啟監聽;
  4. accept()等待客戶端的連線,
  5. 當有客戶端連線時,accept()會建立一個新的套接字new_fd;
  6. 主執行緒會建立子執行緒,並將new_fd傳遞給子執行緒。

子執行緒

  1. 子執行緒函式為thread_func(),他通過new_fd處理和客戶端所有的通訊任務。

客戶端連線伺服器詳細步驟

下面我們分步驟來看客戶端連線伺服器的分步說明。

1. 客戶端連線伺服器

  1. 伺服器建立起監聽套接字listen_fd,並初始化;
  2. 客戶端建立套接字fd1;
  3. 客戶端client1通過套接字fd1連線伺服器的listen_fd;
在這裡插入圖片描述

2. 主執行緒建立子執行緒thread1

  1. server收到client1的連線請求後,accpet函式會返回一個新的套接字newfd1;
  2. 後面server與client1的通訊就依賴newfd1,監聽套接字listen_fd會繼續監聽其他客戶端的連線;
  3. 主執行緒通過pthead_create()建立一個子執行緒thread1,並把newfd1傳遞給thread1;
  4. server與client1的通訊就分別依賴newfd1、fd1。
  5. client1為了能夠實時收到server傳送的資訊,同時還要能夠從鍵盤上讀取資料,這兩個操作都是阻塞的,沒有資料的時候程序會休眠,所以必須建立子執行緒read_thread;
  6. client1的主線負責從鍵盤上讀取資料併發送給,子執行緒read_thread負責從server接受資訊。
在這裡插入圖片描述

3. client2連線伺服器

  1. 客戶端client2建立套接字fd2;
  2. 通過connect函式連線server的listen_fd;

4. 主執行緒建立子執行緒thread2

  1. server收到client2的連線請求後,accpet函式會返回一個新的套接字newfd2;
  2. 後面server與client2的通訊就依賴newfd2,監聽套接字listen_fd會繼續監聽其他客戶端的連線;
  3. 主執行緒通過pthead_create()建立一個子執行緒thread2,並把newfd2傳遞給thread2;
  4. server與client1的通訊就分別依賴newfd2、fd2。
  5. 同樣client2為了能夠實時收到server傳送的資訊,同時還要能夠從鍵盤上讀取資料必須建立子執行緒read_thread;
  6. client1的主線負責從鍵盤上讀取資料併發送給,子執行緒read_thread負責從server接受資訊。
在這裡插入圖片描述

由上圖可見,每一個客戶端連線server後,server都要建立一個專門的thread負責和該客戶端的通訊;每一個客戶端和server都有一對固定的fd組合用於連線。

例項

好了,理論講完了,根據一口君的慣例,也繼承祖師爺的教誨:talk is cheap,show you my code.不上程式碼,只寫理論的文章都是在耍流氓。

本例的主要功能描述如下:

  1. 實現多個客戶端可以同時連線伺服器;
  2. 客戶端可以實現獨立的收發資料;
  3. 客戶端傳送資料給伺服器後,伺服器會將資料原封不動返回給客戶端。

伺服器端

/*********************************************
伺服器程式TCPServer.c
公眾號:一口Linux
*********************************************/

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<errno.h>
#include<string.h>
#include<pthread.h>
#include<stdlib.h>

#defineRECVBUFSIZE2048
void*rec_func(void*arg)
{
intsockfd,new_fd,nbytes;
charbuffer[RECVBUFSIZE];
inti;
new_fd=*((int*)arg);
free(arg);

while(1)
{
if((nbytes=recv(new_fd,buffer,RECVBUFSIZE,0))==-1)
{
fprintf(stderr,"ReadError:%s\n",strerror(errno));
exit(1);
}
if(nbytes==-1)
{//客戶端出錯了返回值-1
close(new_fd);
break;
}
if(nbytes==0)
{//客戶端主動斷開連線,返回值是0
close(new_fd);
break;
}
buffer[nbytes]='\0';
printf("Ihavereceived:%s\n",buffer);


if(send(new_fd,buffer,strlen(buffer),0)==-1)
{
fprintf(stderr,"WriteError:%s\n",strerror(errno));
exit(1);
}

}

}

intmain(intargc,char*argv[])
{
charbuffer[RECVBUFSIZE];
intsockfd,new_fd,nbytes;
structsockaddr_inserver_addr;
structsockaddr_inclient_addr;
intsin_size,portnumber;
charhello[]="Hello!Socketcommunicationworld!\n";
pthread_ttid;
int*pconnsocke=NULL;
intret,i;

if(argc!=2)
{
fprintf(stderr,"Usage:%sportnumber\a\n",argv[0]);
exit(1);
}
/*埠號不對,退出*/
if((portnumber=atoi(argv[1]))<0)
{
fprintf(stderr,"Usage:%sportnumber\a\n",argv[0]);
exit(1);
}

/*伺服器端開始建立socket描述符sockfd用於監聽*/
if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1)
{
fprintf(stderr,"Socketerror:%s\n\a",strerror(errno));
exit(1);
}

/*伺服器端填充sockaddr結構*/
bzero(&server_addr,sizeof(structsockaddr_in));
server_addr.sin_family=AF_INET;
/*自動填充主機IP*/
server_addr.sin_addr.s_addr=htonl(INADDR_ANY);//自動獲取網絡卡地址
server_addr.sin_port=htons(portnumber);

/*捆綁sockfd描述符*/
if(bind(sockfd,(structsockaddr*)(&server_addr),sizeof(structsockaddr))==-1)
{
fprintf(stderr,"Binderror:%s\n\a",strerror(errno));
exit(1);
}

/*監聽sockfd描述符*/
if(listen(sockfd,10)==-1)
{
fprintf(stderr,"Listenerror:%s\n\a",strerror(errno));
exit(1);
}

while(1)
{
/*伺服器阻塞,直到客戶程式建立連線*/
sin_size=sizeof(structsockaddr_in);
if((new_fd=accept(sockfd,(structsockaddr*)&client_addr,&sin_size))==-1)
{
fprintf(stderr,"Accepterror:%s\n\a",strerror(errno));
exit(1);
}

pconnsocke=(int*)malloc(sizeof(int));
*pconnsocke=new_fd;

ret=pthread_create(&tid,NULL,rec_func,(void*)pconnsocke);
if(ret<0)
{
perror("pthread_createerr");
return-1;
}
}
//close(sockfd);
exit(0);
}

客戶端

/*********************************************
伺服器程式TCPServer.c
公眾號:一口Linux
*********************************************/

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<errno.h>
#include<string.h>
#include<pthread.h>
#include<stdlib.h>
#defineRECVBUFSIZE1024

void*func(void*arg)
{
intsockfd,new_fd,nbytes;
charbuffer[RECVBUFSIZE];

new_fd=*((int*)arg);
free(arg);

while(1)
{
if((nbytes=recv(new_fd,buffer,RECVBUFSIZE,0))==-1)
{
fprintf(stderr,"ReadError:%s\n",strerror(errno));
exit(1);
}
buffer[nbytes]='\0';
printf("Ihavereceived:%s\n",buffer);
}

}

intmain(intargc,char*argv[])
{
intsockfd;
charbuffer[RECVBUFSIZE];
structsockaddr_inserver_addr;
structhostent*host;
intportnumber,nbytes;
pthread_ttid;
int*pconnsocke=NULL;
intret;

//檢測引數個數
if(argc!=3)
{
fprintf(stderr,"Usage:%shostnameportnumber\a\n",argv[0]);
exit(1);
}
//argv2存放的是埠號,讀取該埠,轉換成整型變數
if((portnumber=atoi(argv[2]))<0)
{
fprintf(stderr,"Usage:%shostnameportnumber\a\n",argv[0]);
exit(1);
}
//建立一個套接子
if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1)
{
fprintf(stderr,"SocketError:%s\a\n",strerror(errno));
exit(1);
}

//填充結構體,ip和port必須是伺服器的
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(portnumber);
server_addr.sin_addr.s_addr=inet_addr(argv[1]);//argv【1】是serverip地址

/*¿Í»§³ÌÐò·¢ÆðÁ¬œÓÇëÇó*/
if(connect(sockfd,(structsockaddr*)(&server_addr),sizeof(structsockaddr))==-1)
{
fprintf(stderr,"ConnectError:%s\a\n",strerror(errno));
exit(1);
}

//建立執行緒
pconnsocke=(int*)malloc(sizeof(int));
*pconnsocke=sockfd;

ret=pthread_create(&tid,NULL,func,(void*)pconnsocke);
if(ret<0)
{
perror("pthread_createerr");
return-1;
}
while(1)
{
#if1
printf("inputmsg:");
scanf("%s",buffer);
if(send(sockfd,buffer,strlen(buffer),0)==-1)
{
fprintf(stderr,"WriteError:%s\n",strerror(errno));
exit(1);
}
#endif
}
close(sockfd);
exit(0);
}

編譯 編譯執行緒,需要用到pthread庫,編譯命令如下:

  1. gcc s.c -o s -lpthread
  2. gcc cli.c -o c -lpthread 先本機測試
  3. 開啟一個終端 ./s 8888
  4. 再開一個終端 ./cl 127.0.0.1 8888,輸入一個字串"qqqqqqq"
  5. 再開一個終端 ./cl 127.0.0.1 8888,輸入一個字串"yikoulinux"

有讀者可能會注意到,server建立子執行緒的時候用的是以下程式碼:

pconnsocke=(int*)malloc(sizeof(int));
*pconnsocke=new_fd;

ret=pthread_create(&tid,NULL,rec_func,(void*)pconnsocke);
if(ret<0)
{
perror("pthread_createerr");
return-1;
}

為什麼必須要malloc一塊記憶體專門存放這個新的套接字呢? 這個是一個很隱蔽,很多新手都容易犯的錯誤。下一章,我會專門給大家講解。

本系列文章預計會更新4-5篇。最終目的是寫出一個帶登入註冊公聊私聊等功能的聊天室。喜歡的話請收藏關注。

圖片參考網路文章:https://cloud.tencent.com/developer/article/1376352

獲取更多關於Linux的資料,請關注公眾號「一口Linux」,回覆"進群",帶你加入大咖雲集的技術討論群。