TCP/IP網路程式設計 基於Linux程式設計_4 --多執行緒伺服器端的實現
執行緒基本概念
前面我們講過多程序伺服器,但我們知道它開銷很大,因此我們才引入執行緒,我們可以把它看成是一種輕量級程序。它相比程序有如下幾個優點:
- 執行緒的建立和上下文切換開銷更小且速度更快。
- 執行緒間交換資料時無需特殊技術。
程序:在作業系統構成單獨執行流的單位。
執行緒:在程序構成單獨執行流的單位。
它們的包含關係是,作業系統 > 程序 > 執行緒。程序與執行緒具體差異其實是這樣的,每個程序都有獨立的完整記憶體空間,它包括全域性資料區,堆區,棧區,而多程序伺服器之所以開銷大是因為只是為了區分棧區裡的不同函式流執行而把資料區,堆區,棧區記憶體全部複製了一份。而多執行緒就高效多了,它只把棧區分離出來,程序中的資料區,堆區則共享。具體記憶體結構示例圖如下:
執行緒建立及執行
執行緒具有單獨的執行流,因此需要單獨定義執行緒的入口函式,而且還需要請求作業系統在單獨的執行流中執行該函式,完成這個功能的函式如下:
#include <pthread.h>
int pthread_create(
pthread_t * restrict thread,//儲存執行緒ID
const pthread_attr_t * restrict attr,//執行緒屬性,NULL預設屬性
void * (* start_routine)(void *), //執行緒入口函式,函式指標
void * restrict arg //傳遞給入口函式的引數
);
例項程式碼:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
void * thread_main(void *arg);
int main(int argc, char *argv[])
{
pthread_t t_id;
int thread_param = 5;
if (pthread_create(&t_id, NULL, thread_main, (void *)&thread_param) != 0)
{
puts ("pthread_create() error");
return -1;
}
sleep(10);
puts("end of main");
return 0;
}
void * thread_main(void *arg)
{
int i;
int cnt =* ((int *)arg);
for (i = 0; i < cnt; i++)
{
sleep(1);
puts("running thread");
}
return NULL;
}
上面例項是用sleep延遲來控制執行緒的執行的,如果主執行緒不做延遲那麼執行到return 0;時,程序就結束了,相應的執行緒也會銷燬。而明顯用sleep這種方式控制執行緒執行流是不合理的,下面我們來看看一個更好的延遲函式,呼叫該函式的程序(或執行緒)將進入等待狀態,直到第一個引數為ID的執行緒終止為止。而且可以得到執行緒的入口函式返回值。
#include <pthread.h>
int pthread_join(pthread_t thread, void **status);
引數1:執行緒ID
引數2:儲存執行緒入口函式的返回值
例項程式碼:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
void * thread_main(void *arg);
int main(int argc, char *argv[])
{
pthread_t t_id;
int thread_param = 5;
void * thr_ret;
//建立執行緒
if (pthread_create(&t_id, NULL, thread_main, (void *)&thread_param) != 0)
{
puts("pthread_create() error");
return -1;
}
//等待執行緒返回
if (pthread_join(t_id, &thr_ret) != 0)
{
puts("pthread_join() error");
return -1;
}
printf("Thread return message: %s \n", (char *)thr_ret);
free(thr_ret);
return 0;
}
//執行緒入口函式
void * thread_main(void *arg)
{
int i;
int cnt =* ((int *)arg);
char * msg = (char *)malloc(sizeof(char) * 50);
strcpy(msg, "Hello, I am thread ~ \n");
for (i = 0; i < cnt; i++)
{
puts("running thread");
}
return (void *)msg;
}
執行緒存在的問題和臨界區
前面我們知道了怎麼建立執行緒,但我們都是隻建立了一個執行緒,下面我們再來看看這樣一個例項,建立100個執行緒,它們都訪問了同一變數,其中一半對這個變數進行加1操作,一半進行減1操作,按道理其結果會等於0.
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);
long long num = 0; //long long型別是64位整數型,多執行緒共同訪問
int main(int argc, char *argv[])
{
pthread_t thread_id[NUM_THREAD];
int i;
//建立100個執行緒,一半執行thread_inc,一半執行thread_des
for(i = 0; i < NUM_THREAD; i++)
{
if(i %2)
pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
else
pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
}
//等待執行緒返回
for (i = 0; i < NUM_THREAD; i++)
pthread_join(thread_id[i], NULL);
printf("result: %lld \n", num); //+1,-1按道理結果是0
return 0;
}
//執行緒入口函式1
void * thread_inc(void * arg)
{
for (int i = 0; i < 50000000; i++)
num += 1;//臨界區(引起問題的語句就是臨界區位置)
return NULL;
}
//執行緒入口函式2
void * thread_des(void * arg)
{
for (int i = 0; i < 50000000; i++)
num -= 1;//臨界區
return NULL;
}
從執行結果看並不是0,而且每次執行的結果都不同。那這是什麼原因引起的呢? 是因為每個執行緒訪問一個變數是這樣一個過程:先從記憶體取出這個變數值到CPU,然後CPU計算得到改變後的值,最後再將這個改變後的值寫回記憶體。因此,我們可以很容易看出,多個執行緒訪問同一變數,如果某個執行緒還只剛從記憶體取出資料,還沒來得及寫回記憶體,這時其它執行緒又訪問了這個變數,所以這個值就會不正確了。
接下來我們再來講講怎麼解決這個問題:執行緒同步
執行緒同步
執行緒同步用於解決執行緒訪問順序引發的問題,一般是如下兩種情況:
- 同時訪問同一記憶體空間時發生的情況
- 需要指定訪問同一記憶體空間的執行緒執行順序的情況
針對這兩種可能引發的情況,我們分別使用的同步技術是:互斥量和訊號量。
- 互斥量
互斥量技術從字面也可以理解,就是臨界區有執行緒訪問,其它執行緒就得排隊等待,它們的訪問是互斥的,實現方式就是給臨界區加鎖與釋放鎖。
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr); //建立互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);//銷燬互斥量
int pthread_mutex_lock(pthread_mutex_t *mutex);//加鎖
int pthread_mutex_unlock(pthread_mutex_t *mutex);//釋放鎖
簡言之,就是利用lock和unlock函式圍住臨界區的兩端。當某個執行緒呼叫pthread_mutex_lock進入臨界區後,如果沒有呼叫pthread_mutex_unlock釋放鎖退出,那麼其它執行緒就會一直阻塞在臨界區之外,我們把這種情況稱之為死鎖。所以臨界區圍住一定要lock和unlock一一對應。
例項程式碼:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);
long long num = 0;
pthread_mutex_t mutex;
int main(int argc, char *argv[])
{
pthread_t thread_id[NUM_THREAD];
int i;
//互斥量的建立
pthread_mutex_init(&mutex, NULL);
for(i = 0; i < NUM_THREAD; i++)
{
if(i %2)
pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
else
pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
}
for (i = 0; i < NUM_THREAD; i++)
pthread_join(thread_id[i], NULL);
printf("result: %lld \n", num);
pthread_mutex_destroy(&mutex); //互斥量的銷燬
return 0;
}
/*擴充套件臨界區,減少加鎖,釋放鎖呼叫次數,但這樣變數必須加滿到50000000次後其它執行緒才能訪問.
這樣是延長了執行緒的等待時間,但縮短了加鎖,釋放鎖函式呼叫的時間,這裡沒有定論,自己酌情考慮*/
void * thread_inc(void * arg)
{
pthread_mutex_lock(&mutex); //互斥量鎖住
for (int i = 0; i < 1000000; i++)
num += 1;
pthread_mutex_unlock(&mutex); //互斥量釋放鎖
return NULL;
}
/*縮短了執行緒等待時間,但迴圈建立,釋放鎖函式呼叫時間增加*/
void * thread_des(void * arg)
{
for (int i = 0; i < 1000000; i++)
{
pthread_mutex_lock(&mutex);
num -= 1;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
- 訊號量
訊號量與互斥量類似,只是互斥量是用鎖來控制執行緒訪問而訊號量是用二進位制0,1來完成控制執行緒順序。sem_post訊號量加1,sem_wait訊號量減1,當訊號量為0時,sem_wait就會阻斷,因此通過這樣讓訊號量加1減1就能控制執行緒的執行順序了。
註釋:mac上測試訊號量函式返回-1失敗,以後還是Linux上整吧,也許這些介面已經過時了…
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);//建立訊號量
int sem_destroy(sem_t *sem);//銷燬訊號量
int sem_post(sem_t *sem);//訊號量加1
int sem_wait(sem_t *sem);//訊號量減1,為0時阻塞
例項程式碼:執行緒A從使用者輸入得到值後存入全域性變數num,此時執行緒B將取走該值並累加。該過程共進行5次,完成後輸出總和並退出程式。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
void * read(void * arg);
void * accu(void * arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;
int main(int argc, char *argv[])
{
pthread_t id_t1, id_t2;
sem_init(&sem_one, 0, 0);
sem_init(&sem_two, 0, 1);
pthread_create(&id_t1, NULL, read, NULL);
pthread_create(&id_t2, NULL, accu, NULL);
pthread_join(id_t1, NULL);
pthread_join(id_t2, NULL);
sem_destroy(&sem_one);
sem_destroy(&sem_two);
return 0;
}
void * read(void * arg)
{
int i;
for (i = 0; i < 5; i++) {
fputs("Input num: ", stdout);
sem_wait(&sem_two);
scanf("%d", &num);
sem_post(&sem_one);
}
return NULL;
}
void * accu(void * arg)
{
int sum = 0 , i;
for (i = 0; i < 5; i++) {
sem_wait(&sem_one);
sum+= num;
sem_post(&sem_two);
}
printf("Result: %d \n", sum);
return NULL;
}
補充:執行緒的銷燬,執行緒建立後並不是其入口函式返回後就會自動銷燬,需要手動銷燬,不然執行緒建立的記憶體空間將一直存在。一般手動銷燬有如下兩種方式:1,呼叫pthread_join函式,其返回後同時銷燬執行緒 ,是一個阻斷函式,服務端一般不用它銷燬,因為服務端主執行緒不宜阻斷,還要實時監聽客服端連線。2,呼叫pthread_detach函式,不會阻塞,執行緒返回自動銷燬執行緒,不過要注意呼叫它後不能再呼叫pthread_join函式,它與pthread_join主要區別就是一個是阻塞函式,一個不阻塞。
多執行緒併發服務端的實現
使用多執行緒實現了一個簡單的聊天程式,並對臨界區(clnt_cnt,clnt_socks)進行加鎖訪問.
- 服務端:
//
// main.cpp
// hello_server
//
// Created by app05 on 15-10-22.
// Copyright (c) 2015年 app05. All rights reserved.
//臨界區是:clnt_cnt和clnt_socks訪問處
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>
#define BUF_SIZE 100
#define MAX_CLNT 256
void * handle_clnt(void * arg);
void send_msg(char *msg, int len);
void error_handling(char * msg);
int clnt_cnt = 0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;
int main(int argc, char *argv[])
{
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
socklen_t clnt_adr_sz;
pthread_t t_id;
if (argc != 2) {
printf("Usage : %s <port> \n", argv[0]);
exit(1);
}
//建立互斥量
pthread_mutex_init(&mutx, NULL);
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr, 0, sizeof(serv_adr));
serv_adr.sin_family = AF_INET;
serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_adr.sin_port = htons(atoi(argv[1]));
if(bind(serv_sock, (struct sockaddr *) &serv_adr, sizeof(serv_adr)) == -1)
error_handling("bind() error");
if(listen(serv_sock, 5) == -1)
error_handling("listen() error");
while (1)
{
clnt_adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz); //阻斷,監聽客服端連線請求
//臨界區
pthread_mutex_lock(&mutx); //加鎖
clnt_socks[clnt_cnt++] = clnt_sock; //新連線的客服端儲存到clnt_socks數組裡
pthread_mutex_unlock(&mutx); //釋放鎖
//建立執行緒
pthread_create(&t_id, NULL, handle_clnt, (void*) &clnt_sock);
pthread_detach(t_id); //銷燬執行緒,執行緒return後自動呼叫銷燬,不阻斷
printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr));
}
close(serv_sock);
return 0;
}
//執行緒執行
void * handle_clnt(void * arg)
{
int clnt_sock = *((int *)arg);
int str_len = 0, i;
char msg[BUF_SIZE];
while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
send_msg(msg, str_len);
//從陣列中移除當前客服端
pthread_mutex_lock(&mutx);
for (i = 0; i < clnt_cnt; i++)
{
if (clnt_sock == clnt_socks[i])
{
while (i++ < clnt_cnt - 1)
clnt_socks[i] = clnt_socks[i + 1];
break;
}
}
clnt_cnt--;
pthread_mutex_unlock(&mutx);
close(clnt_sock);
return NULL;
}
//向所有連線的客服端傳送訊息
void send_msg(char * msg, int len)
{
int i;
pthread_mutex_lock(&mutx);
for (i = 0; i < clnt_cnt; i++)
write(clnt_socks[i], msg, len);
pthread_mutex_unlock(&mutx);
}
void error_handling(char *message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
- 客服端
//
// main.cpp
// hello_client
//
// Created by app05 on 15-10-22.
// Copyright (c) 2015年 app05. All rights reserved.
//
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>
#define BUF_SIZE 100
#define NAME_SIZE 20
void * send_msg(void * arg);
void * recv_msg(void * arg);
void error_handling(char *message);
char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];
int main(int argc, const char * argv[]) {
int sock;
struct sockaddr_in serv_addr;
pthread_t snd_thread, rcv_thread;
void * thread_return;
if(argc != 4)
{
printf("Usage: %s <IP> <port> \n", argv[0]);
exit(1);
}
sprintf(name, "[%s]", argv[3]); //聊天人名字,配置到編譯器引數裡
sock = socket(PF_INET, SOCK_STREAM, 0);
if(sock == -1)
error_handling("socket() error");
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1)
error_handling("connect() error");
//多執行緒分離輸入和輸出
pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
//阻塞,等待返回
pthread_join(snd_thread, &thread_return);
pthread_join(rcv_thread, &thread_return);
close(sock);
return 0;
}
//傳送訊息
void * send_msg(void * arg)
{
int sock = *((int *)arg);
char name_msg[NAME_SIZE + BUF_SIZE];
while (1) {
fgets(msg, BUF_SIZE, stdin);
if (!strcmp(msg, "q\n") || !strcmp(msg, "Q \n")) {
close(sock);
exit(0);
}
sprintf(name_msg, "%s %s", name, msg);
write(sock, name_msg, strlen(name_msg));
}
return NULL;
}
//接收訊息
void * recv_msg(void * arg)
{
int sock = *((int *)arg);
char name_msg[NAME_SIZE + BUF_SIZE];
int str_len;
while (1) {
str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
if(str_len == -1)
return (void *)-1;
name_msg[str_len] = 0;
fputs(name_msg, stdout);
}
return NULL;
}
void error_handling(char *message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}