1. 程式人生 > >c++ 網路程式設計(九)TCP/IP LINUX/windows下 多執行緒超詳細教程 以及 多執行緒實現服務端

c++ 網路程式設計(九)TCP/IP LINUX/windows下 多執行緒超詳細教程 以及 多執行緒實現服務端

原文作者:aircraft

原文連結:https://www.cnblogs.com/DOMLX/p/9661012.html

 先講Linux下(windows下在後面可以直接跳到後面看):

一.執行緒基本概念

前面我們講過多程序伺服器,但我們知道它開銷很大,因此我們才引入執行緒,我們可以把它看成是一種輕量級程序。它相比程序有如下幾個優點:

  • 執行緒的建立和上下文切換開銷更小且速度更快。
  • 執行緒間交換資料時無需特殊技術。

程序:在作業系統構成單獨執行流的單位。  執行緒:在程序構成單獨執行流的單位。  它們的包含關係是,作業系統 > 程序 > 執行緒。程序與執行緒具體差異其實是這樣的,每個程序都有獨立的完整記憶體空間,它包括全域性資料區,堆區,棧區,而多程序伺服器之所以開銷大是因為只是為了區分棧區裡的不同函式流執行而把資料區,堆區,棧區記憶體全部複製了一份。而多執行緒就高效多了,它只把棧區分離出來,程序中的資料區,堆區則共享。具體記憶體結構示例圖如下: 這裡寫圖片描述

這裡寫圖片描述

二.建立執行緒

下面的程式,我們可以用它來建立一個執行緒:

#include <pthread.h>
pthread_create (thread, attr, start_routine, arg)

在這裡,pthread_create 建立一個新的執行緒,並讓它可執行。下面是關於引數的說明:

引數 描述
thread 指向執行緒識別符號指標。
attr 一個不透明的屬性物件,可以被用來設定執行緒屬性。您可以指定執行緒屬性物件,也可以使用預設值 NULL。
start_routine 執行緒執行函式起始地址,一旦執行緒被建立就會執行。
arg 執行函式的引數。它必須通過把引用作為指標強制轉換為 void 型別進行傳遞。如果沒有傳遞引數,則使用 NULL。

建立執行緒成功時,函式返回 0,若返回值不為 0 則說明建立執行緒失敗。

終止執行緒

使用下面的程式,我們可以用它來終止一個執行緒:

#include <pthread.h>
pthread_exit (status)

在這裡,pthread_exit 用於顯式地退出一個執行緒。通常情況下,pthread_exit() 函式是線上程完成工作後無需繼續存在時被呼叫。

如果 main() 是在它所建立的執行緒之前結束,並通過 pthread_exit() 退出,那麼其他執行緒將繼續執行。否則,它們將在 main() 結束時自動被終止。

例項

以下簡單的例項程式碼使用 pthread_create() 函式建立了 5 個執行緒,每個執行緒輸出"Hello Runoob!":

複製程式碼

#include <iostream>
// 必須的標頭檔案
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS 5
 
// 執行緒的執行函式
void* say_hello(void* args)
{
    cout << "Hello Runoob!" << endl;
    return 0;
}
 
int main()
{
    // 定義執行緒的 id 變數,多個變數使用陣列
    pthread_t tids[NUM_THREADS];
    for(int i = 0; i < NUM_THREADS; ++i)
    {
        //引數依次是:建立的執行緒id,執行緒引數,呼叫的函式,傳入的函式引數
        int ret = pthread_create(&tids[i], NULL, say_hello, NULL);
        if (ret != 0)
        {
           cout << "pthread_create error: error_code=" << ret << endl;
        }
    }
    //等各個執行緒退出後,程序才結束,否則程序強制結束了,執行緒可能還沒反應過來;
    pthread_exit(NULL);
}

複製程式碼

linux下編譯執行後結果為:

Hello Runoob!

Hello Runoob!

Hello Runoob!

Hello Runoob!

Hello Runoob!

以下簡單的例項程式碼使用 pthread_create() 函式建立了 5 個執行緒,並接收傳入的引數。每個執行緒列印一個 "Hello Runoob!" 訊息,並輸出接收的引數,然後呼叫 pthread_exit() 終止執行緒。

複製程式碼

//檔名:test.cpp
 
#include <iostream>
#include <cstdlib>
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
void *PrintHello(void *threadid)
{  
   // 對傳入的引數進行強制型別轉換,由無型別指標變為整形數指標,然後再讀取
   int tid = *((int*)threadid);
   cout << "Hello Runoob! 執行緒 ID, " << tid << endl;
   pthread_exit(NULL);
}
 
int main ()
{
   pthread_t threads[NUM_THREADS];
   int indexes[NUM_THREADS];// 用陣列來儲存i的值
   int rc;
   int i;
   for( i=0; i < NUM_THREADS; i++ ){      
      cout << "main() : 建立執行緒, " << i << endl;
      indexes[i] = i; //先儲存i的值
      // 傳入的時候必須強制轉換為void* 型別,即無型別指標        
      rc = pthread_create(&threads[i], NULL, 
                          PrintHello, (void *)&(indexes[i]));
      if (rc){
         cout << "Error:無法建立執行緒," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

複製程式碼

linux下編譯執行後結果為:

main() : 建立執行緒, 0
main() : 建立執行緒, 1
Hello Runoob! 執行緒 ID, 0
main() : 建立執行緒, Hello Runoob! 執行緒 ID, 21

main() : 建立執行緒, 3
Hello Runoob! 執行緒 ID, 2
main() : 建立執行緒, 4
Hello Runoob! 執行緒 ID, 3

向執行緒傳遞引數

這個例項演示瞭如何通過結構傳遞多個引數。您可以線上程回撥中傳遞任意的資料型別,因為它指向 void,如下面的例項所示:

複製程式碼

#include <iostream>
#include <cstdlib>
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
struct thread_data{
   int  thread_id;
   char *message;
};
 
void *PrintHello(void *threadarg)
{
   struct thread_data *my_data;
 
   my_data = (struct thread_data *) threadarg;
 
   cout << "Thread ID : " << my_data->thread_id ;
   cout << " Message : " << my_data->message << endl;
 
   pthread_exit(NULL);
}
 
int main ()
{
   pthread_t threads[NUM_THREADS];
   struct thread_data td[NUM_THREADS];
   int rc;
   int i;
 
   for( i=0; i < NUM_THREADS; i++ ){
      cout <<"main() : creating thread, " << i << endl;
      td[i].thread_id = i;
      td[i].message = (char*)"This is message";
      rc = pthread_create(&threads[i], NULL,
                          PrintHello, (void *)&td[i]);
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

複製程式碼

linux下編譯執行後結果為:

main() : creating thread, 0
main() : creating thread, 1
Thread ID : 0 Message : This is message
main() : creating thread, Thread ID : 21
 Message : This is message
main() : creating thread, 3
Thread ID : 2 Message : This is message
main() : creating thread, 4
Thread ID : 3 Message : This is message
Thread ID : 4 Message : This is message

連線和分離執行緒

我們可以使用以下兩個函式來連線或分離執行緒:

pthread_join (threadid, status) 
pthread_detach (threadid)

pthread_join() 子程式阻礙呼叫程式,直到指定的 threadid 執行緒終止為止。當建立一個執行緒時,它的某個屬性會定義它是否是可連線的(joinable)或可分離的(detached)。只有建立時定義為可連線的執行緒才可以被連線。如果執行緒建立時被定義為可分離的,則它永遠也不能被連線。

用途:有的人沒有在main 函式最後呼叫 pthread_exit(NULL); 函式等待,而是選擇sleep,這裡就可以用pthread_join()代替sleep的不可控制,,而有時候執行緒結束的時候你想做某一些事情需要知道執行緒是否結束了,也可以呼叫這個函式。

這個例項演示瞭如何使用 pthread_join() 函式來等待執行緒的完成。

複製程式碼

#include <iostream>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
void *wait(void *t)
{
   int i;
   long tid;
 
   tid = (long)t;
 
   sleep(1);
   cout << "Sleeping in thread " << endl;
   cout << "Thread with id : " << tid << "  ...exiting " << endl;
   pthread_exit(NULL);
}
 
int main ()
{
   int rc;
   int i;
   pthread_t threads[NUM_THREADS];
   pthread_attr_t attr;
   void *status;
 
   // 初始化並設定執行緒為可連線的(joinable)
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
   for( i=0; i < NUM_THREADS; i++ ){
      cout << "main() : creating thread, " << i << endl;
      rc = pthread_create(&threads[i], NULL, wait, (void *)&i );
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
 
   // 刪除屬性,並等待其他執行緒
   pthread_attr_destroy(&attr);
   for( i=0; i < NUM_THREADS; i++ ){
      rc = pthread_join(threads[i], &status);
      if (rc){
         cout << "Error:unable to join," << rc << endl;
         exit(-1);
      }
      cout << "Main: completed thread id :" << i ;
      cout << "  exiting with status :" << status << endl;
   }
 
   cout << "Main: program exiting." << endl;
   pthread_exit(NULL);
}

複製程式碼

linux下編譯執行結果:

複製程式碼

main() : creating thread, 0
main() : creating thread, 1
main() : creating thread, 2
main() : creating thread, 3
main() : creating thread, 4
Sleeping in thread 
Thread with id : 4  ...exiting 
Sleeping in thread 
Thread with id : 3  ...exiting 
Sleeping in thread 
Thread with id : 2  ...exiting 
Sleeping in thread 
Thread with id : 1  ...exiting 
Sleeping in thread 
Thread with id : 0  ...exiting 
Main: completed thread id :0  exiting with status :0
Main: completed thread id :1  exiting with status :0
Main: completed thread id :2  exiting with status :0
Main: completed thread id :3  exiting with status :0
Main: completed thread id :4  exiting with status :0
Main: program exiting.

複製程式碼

二.執行緒執行中存在的問題

執行緒存在的問題和臨界區

前面我們知道了怎麼建立執行緒,下面我們再來看看這樣一個例項,建立100個執行緒,它們都訪問了同一變數,其中一半對這個變數進行加1操作,一半進行減1操作,按道理其結果會等於0.

複製程式碼

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);
long long num = 0;   //long long型別是64位整數型,多執行緒共同訪問

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    //建立100個執行緒,一半執行thread_inc,一半執行thread_des
    for(i = 0; i < NUM_THREAD; i++)
    {
        if(i %2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    //等待執行緒返回
    for (i = 0; i < NUM_THREAD; i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);  //+1,-1按道理結果是0
    return 0;
}

//執行緒入口函式1
void * thread_inc(void * arg)
{
    for (int i = 0; i < 50000000; i++)
        num += 1;//臨界區(引起問題的語句就是臨界區位置)
    return NULL;
}

//執行緒入口函式2
void * thread_des(void * arg)
{
    for (int i = 0; i < 50000000; i++)
        num -= 1;//臨界區
    return NULL;
}

複製程式碼

從執行結果看並不是0,而且每次執行的結果都不同。那這是什麼原因引起的呢? 是因為每個執行緒訪問一個變數是這樣一個過程:先從記憶體取出這個變數值到CPU,然後CPU計算得到改變後的值,最後再將這個改變後的值寫回記憶體。因此,我們可以很容易看出,多個執行緒訪問同一變數,如果某個執行緒還只剛從記憶體取出資料,還沒來得及寫回記憶體,這時其它執行緒又訪問了這個變數,所以這個值就會不正確了。

為什麼會出現這種情況呢,來舉個例子:

如上圖所示:兩個執行緒都要將某一個共同訪問的變數加1,

就像上面說的這個運算過程是:執行緒1先拿到值然後經過cpu的運算在賦值回去,然後執行緒2在取值運算放回,上圖實現的是最理想的情況,假如這時候執行緒一拿到了值99,同時執行緒二沒間隔的也拿了99,這時候就要出問題了。執行緒一運算後賦值100回去,然後執行緒二運算後又賦值100回去,,,注意了哈,這裡兩個執行緒都是為了Num++服務,他們這樣搞事情不就代表一個做了無用功嗎?(我胖虎要是還拿的動刀還不打死你!!!)

這些看完應該就理解了為什麼需要執行緒同步!!!!以及執行緒同步的重要性了吧!!

接下來我們再來講講怎麼解決這個問題:執行緒同步

執行緒同步

執行緒同步用於解決執行緒訪問順序引發的問題,一般是如下兩種情況:

  1. 同時訪問同一記憶體空間時發生的情況
  2. 需要指定訪問同一記憶體空間的執行緒執行順序的情況

針對這兩種可能引發的情況,我們分別使用的同步技術是:互斥量和訊號量。

    • 互斥量  互斥量技術從字面也可以理解,就是臨界區有執行緒訪問,其它執行緒就得排隊等待,它們的訪問是互斥的,實現方式就是給臨界區加鎖與釋放鎖。

複製程式碼

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);  //建立互斥量

int pthread_mutex_destroy(pthread_mutex_t *mutex);//銷燬互斥量

int pthread_mutex_lock(pthread_mutex_t *mutex);//加鎖

int pthread_mutex_unlock(pthread_mutex_t *mutex);//釋放鎖

複製程式碼

簡言之,就是利用lock和unlock函式圍住臨界區的兩端。當某個執行緒呼叫pthread_mutex_lock進入臨界區後,如果沒有呼叫pthread_mutex_unlock釋放鎖退出,那麼其它執行緒就會一直阻塞在臨界區之外,我們把這種情況稱之為死鎖。所以臨界區圍住一定要lock和unlock一一對應。

接下來看一下程式碼示例:

複製程式碼

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);

long long num = 0;
pthread_mutex_t mutex;

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    //互斥量的建立
    pthread_mutex_init(&mutex, NULL);

    for(i = 0; i < NUM_THREAD; i++)
    {
        if(i %2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    for (i = 0; i < NUM_THREAD; i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);
    pthread_mutex_destroy(&mutex);   //互斥量的銷燬
    return 0;
}

/*擴充套件臨界區,減少加鎖,釋放鎖呼叫次數,但這樣變數必須加滿到50000000次後其它執行緒才能訪問.
 這樣是延長了執行緒的等待時間,但縮短了加鎖,釋放鎖函式呼叫的時間,這裡沒有定論,自己酌情考慮*/
void * thread_inc(void * arg)
{
    pthread_mutex_lock(&mutex);  //互斥量鎖住
    for (int i = 0; i < 1000000; i++)
        num += 1;
    pthread_mutex_unlock(&mutex); //互斥量釋放鎖
    return NULL;
}

/*縮短了執行緒等待時間,但迴圈建立,釋放鎖函式呼叫時間增加*/
void * thread_des(void * arg)
{
    for (int i = 0; i < 1000000; i++)
    {
        pthread_mutex_lock(&mutex);
        num -= 1;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

複製程式碼

編譯執行可以得到結果為:0

訊號量  訊號量與互斥量類似,只是互斥量是用鎖來控制執行緒訪問而訊號量是用二進位制0,1來完成控制執行緒順序。sem_post訊號量加1,sem_wait訊號量減1,當訊號量為0時,sem_wait就會阻斷,因此通過這樣讓訊號量加1減1就能控制執行緒的執行順序了。  註釋:mac上測試訊號量函式返回-1失敗,以後還是Linux上整吧,也許這些介面已經過時了…

複製程式碼

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);//建立訊號量
int sem_destroy(sem_t *sem);//銷燬訊號量
int sem_post(sem_t *sem);//訊號量加1
int sem_wait(sem_t *sem);//訊號量減1,為0時阻塞

複製程式碼

例項程式碼:執行緒A從使用者輸入得到值後存入全域性變數num,此時執行緒B將取走該值並累加。該過程共進行5次,完成後輸出總和並退出程式。

複製程式碼

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

void * read(void * arg);
void * accu(void * arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;


int main(int argc, char *argv[])
{
    pthread_t id_t1, id_t2;
    sem_init(&sem_one, 0, 0);
    sem_init(&sem_two, 0, 1);

    pthread_create(&id_t1, NULL, read, NULL);
    pthread_create(&id_t2, NULL, accu, NULL);

    pthread_join(id_t1, NULL);
    pthread_join(id_t2, NULL);

    sem_destroy(&sem_one);
    sem_destroy(&sem_two);

    return 0;
}

void * read(void * arg)
{
    int i;
    for (i = 0; i < 5; i++) {
        fputs("Input num: ", stdout);
        sem_wait(&sem_two);
        scanf("%d", &num);
        sem_post(&sem_one);
    }
    return NULL;
}

void * accu(void * arg)
{
    int sum = 0 , i;
    for (i = 0; i < 5; i++) {
        sem_wait(&sem_one);
        sum+= num;
        sem_post(&sem_two);
    }
    printf("Result: %d \n", sum);
    return NULL;
}

複製程式碼

補充:執行緒的銷燬,執行緒建立後並不是其入口函式返回後就會自動銷燬,需要手動銷燬,不然執行緒建立的記憶體空間將一直存在。一般手動銷燬有如下兩種方式:1,呼叫pthread_join函式,其返回後同時銷燬執行緒 ,是一個阻斷函式,服務端一般不用它銷燬,因為服務端主執行緒不宜阻斷,還要實時監聽客服端連線。2,呼叫pthread_detach函式,不會阻塞,執行緒返回自動銷燬執行緒,不過要注意呼叫它後不能再呼叫pthread_join函式,它與pthread_join主要區別就是一個是阻塞函式,一個不阻塞。

四.多執行緒併發服務端的實現

使用多執行緒實現了一個簡單的聊天程式,並對臨界區(clnt_cnt,clnt_socks)進行加鎖訪問.

  • 服務端:

複製程式碼

//
//  main.cpp
//  hello_server
//
//  Created by app05 on 15-10-22.
//  Copyright (c) 2015年 app05. All rights reserved.
//臨界區是:clnt_cnt和clnt_socks訪問處

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define MAX_CLNT 256

void * handle_clnt(void * arg);
void send_msg(char *msg, int len);
void error_handling(char * msg);
int clnt_cnt = 0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;

int main(int argc, char *argv[])
{
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    socklen_t clnt_adr_sz;
    pthread_t t_id;
    if (argc != 2) {
        printf("Usage : %s <port> \n", argv[0]);
        exit(1);
    }

    //建立互斥量
    pthread_mutex_init(&mutx, NULL);
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);

    memset(&serv_adr, 0, sizeof(serv_adr));
    serv_adr.sin_family = AF_INET;
    serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_adr.sin_port = htons(atoi(argv[1]));

    if(bind(serv_sock, (struct sockaddr *) &serv_adr, sizeof(serv_adr)) == -1)
        error_handling("bind() error");
    if(listen(serv_sock, 5) == -1)
        error_handling("listen() error");

    while (1)
    {
        clnt_adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz); //阻斷,監聽客服端連線請求

        //臨界區
        pthread_mutex_lock(&mutx); //加鎖
        clnt_socks[clnt_cnt++] = clnt_sock; //新連線的客服端儲存到clnt_socks數組裡
        pthread_mutex_unlock(&mutx); //釋放鎖

        //建立執行緒
        pthread_create(&t_id, NULL, handle_clnt, (void*) &clnt_sock);
        pthread_detach(t_id); //銷燬執行緒,執行緒return後自動呼叫銷燬,不阻斷

        printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr));
    }

    close(serv_sock);
    return 0;
}

//執行緒執行
void * handle_clnt(void * arg)
{
    int clnt_sock = *((int *)arg);
    int str_len = 0, i;
    char msg[BUF_SIZE];

    while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
        send_msg(msg, str_len);

    //從陣列中移除當前客服端
    pthread_mutex_lock(&mutx);
    for (i = 0; i < clnt_cnt; i++)
    {
        if (clnt_sock == clnt_socks[i])
        {
            while (i++ < clnt_cnt - 1)
                clnt_socks[i] = clnt_socks[i + 1];
            break;
        }
    }
    clnt_cnt--;
    pthread_mutex_unlock(&mutx);
    close(clnt_sock);
    return NULL;
}

//向所有連線的客服端傳送訊息
void send_msg(char * msg, int len)
{
    int i;
    pthread_mutex_lock(&mutx);
    for (i = 0; i < clnt_cnt; i++)
        write(clnt_socks[i], msg, len);
    pthread_mutex_unlock(&mutx);
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

複製程式碼

客戶端:

複製程式碼

//
//  main.cpp
//  hello_client
//
//  Created by app05 on 15-10-22.
//  Copyright (c) 2015年 app05. All rights reserved.
//
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define NAME_SIZE 20

void * send_msg(void * arg);
void * recv_msg(void * arg);
void error_handling(char *message);

char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];

int main(int argc, const char * argv[]) {
    int sock;
    struct sockaddr_in serv_addr;
    pthread_t snd_thread, rcv_thread;
    void * thread_return;

    if(argc != 4)
    {
        printf("Usage: %s <IP> <port> \n", argv[0]);
        exit(1);
    }

    sprintf(name, "[%s]", argv[3]); //聊天人名字,配置到編譯器引數裡
    sock = socket(PF_INET, SOCK_STREAM, 0);
    if(sock == -1)
        error_handling("socket() error");

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
    serv_addr.sin_port = htons(atoi(argv[2]));

    if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1)
        error_handling("connect() error");

    //多執行緒分離輸入和輸出
    pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
    pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
    //阻塞,等待返回
    pthread_join(snd_thread, &thread_return);
    pthread_join(rcv_thread, &thread_return);

    close(sock);
    return 0;
}

//傳送訊息
void * send_msg(void * arg)
{
    int sock = *((int *)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    while (1) {
        fgets(msg, BUF_SIZE, stdin);
        if (!strcmp(msg, "q\n") || !strcmp(msg, "Q \n")) {
            close(sock);
            exit(0);
        }
        sprintf(name_msg, "%s  %s", name, msg);
        write(sock, name_msg, strlen(name_msg));
    }
    return NULL;
}

//接收訊息
void * recv_msg(void * arg)
{
    int sock = *((int *)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    int str_len;
    while (1) {
        str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
        if(str_len == -1)
            return (void *)-1;
        name_msg[str_len] = 0;
        fputs(name_msg, stdout);
    }
    return NULL;
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

複製程式碼

windows下:

一.執行緒概述

理解Windows核心物件

執行緒是系統核心物件之一。在學習執行緒之前,應先了解一下核心物件。核心物件是系統核心分配的一個記憶體塊,該記憶體塊描述的是一個數據結構,其成員負責維護物件的各種資訊。核心物件的資料只能由系統核心來訪問,應用程式無法在記憶體中找到這些資料結構並直接改變他們的內容。

常用的系統核心物件有事件物件、檔案物件、作業物件、互斥物件、管道物件、程序物件和執行緒物件等。不同型別的核心物件,其資料結構各有不同。

理解程序和執行緒

程序被認為是一個正在執行的程式的例項,它也屬於系統核心物件。可以將程序簡單的理解為一個容器,它只是提供空間,執行程式的程式碼是由執行緒來實現的。執行緒存在於程序中,它負責執行程序地址空間中的程式碼。當一個程序建立時,系統會自動為其建立一個執行緒,該執行緒被稱為主執行緒。在主執行緒中使用者可以通過程式碼建立其他執行緒,當程序中的主執行緒結束時,程序也就結束了。

執行緒的建立

Windows下,建立執行緒有多種方式,以下將逐一介紹。注意它們的區別。

使用CreateThread函式建立執行緒

Windows API函式。該函式在主執行緒的基礎上建立一個新執行緒。微軟在Windows API中提供了建立新的執行緒的函式CreateThread。

HANDLECreateThread(
LPSECURITY_ATTRIBUTES lpThreadAttributes,//執行緒安全屬性
DWORD dwStackSize,//堆疊大小
LPTHREAD_START_ROUTINE lpStartAddress,//執行緒函式
LPVOID lpParameter,//執行緒引數
DWORD dwCreationFlags,//執行緒建立屬性
LPDWORD lpThreadId//執行緒ID
);
示例程式碼:

複製程式碼

#include "stdafx.h"
#include<iostream>
#include<Windows.h>
using namespace std;

DWORD WINAPI Fun1Proc(LPVOID lpParameter)
{
    cout << "thread function Fun1Proc!\n";

    return 0;
}

int main()
{
    HANDLE hThread1 = CreateThread(NULL, 0, Fun1Proc, NULL, 0, NULL);
    CloseHandle(hThread1);

    Sleep(1000);
    cout << "main end!\n";
    system("pause");
    return 0;
}

複製程式碼

結果圖:

使用_beginthreadex函式建立執行緒

除了使用CreateThread API函式建立執行緒外,還可以用C++語言提供的_beginthreadex函式來建立執行緒。

uintptr_t _beginthreadex( // NATIVE CODE  
   void *security,  //執行緒安全屬性
   unsigned stack_size,  //執行緒的棧大小
   unsigned ( *start_address )( void * ),//執行緒函式  
   void *arglist,  //傳遞到執行緒函式中的引數
   unsigned initflag,  //執行緒初始化標記
   unsigned *thrdaddr   //執行緒ID
); 
 示例程式碼:

複製程式碼

#include "stdafx.h"
#include<iostream>
#include<Windows.h>
#include<process.h>
using namespace std;

unsigned int _stdcall ThreadProc(LPVOID lpParameter)
{
    cout << "thread function ThreadProc!\n";
    return 0;
}

int main()
{
    _beginthreadex(NULL, 0, ThreadProc, 0, 0, NULL);

    Sleep(1000);
    cout << "main end!\n";
    system("pause");
    return 0;
}

複製程式碼

二.執行緒同步

為什麼要進行執行緒同步?

  在程式中使用多執行緒時,一般很少有多個執行緒能在其生命期內進行完全獨立的操作。更多的情況是一些執行緒進行某些處理操作,而其他的執行緒必須對其處理結果進行了解。正常情況下對這種處理結果的瞭解應當在其處理任務完成後進行。    如果不採取適當的措施,其他執行緒往往會線上程處理任務結束前就去訪問處理結果,這就很有可能得到有關處理結果的錯誤瞭解。例如,多個執行緒同時訪問同一個全域性變數,如果都是讀取操作,則不會出現問題。如果一個執行緒負責改變此變數的值,而其他執行緒負責同時讀取變數內容,則不能保證讀取到的資料是經過寫執行緒修改後的。    為了確保讀執行緒讀取到的是經過修改的變數,就必須在向變數寫入資料時禁止其他執行緒對其的任何訪問,直至賦值過程結束後再解除對其他執行緒的訪問限制。這種保證執行緒能瞭解其他執行緒任務處理結束後的處理結果而採取的保護措施即為執行緒同步。

程式碼示例:  兩個執行緒同時對一個全域性變數進行加操作,演示了多執行緒資源訪問衝突的情況。

複製程式碼

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1;
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
    }
 
    return 0;
}
 
int main()
{
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

複製程式碼

可以看到有時兩個執行緒計算的值相同,這就跟上面Linux下建立一百個執行緒將數字加減為0沒成功一樣的道理,都是訪問記憶體的時候衝突了。

為什麼會出現這種情況呢,來舉個例子:

如上圖所示:兩個執行緒都要將某一個共同訪問的變數加1,

就像上面說的這個運算過程是:執行緒1先拿到值然後經過cpu的運算在賦值回去,然後執行緒2在取值運算放回,上圖實現的是最理想的情況,假如這時候執行緒一拿到了值99,同時執行緒二沒間隔的也拿了99,這時候就要出問題了。執行緒一運算後賦值100回去,然後執行緒二運算後又賦值100回去,,,注意了哈,這裡兩個執行緒都是為了Num++服務,他們這樣搞事情不就代表一個做了無用功嗎?(我胖虎要是還拿的動刀還不打死你!!!)

這些看完應該就理解了為什麼需要執行緒同步!!!!以及執行緒同步的重要性了吧!!

關於執行緒同步

執行緒之間通訊的兩個基本問題是互斥和同步。

  • 執行緒同步是指執行緒之間所具有的一種制約關係,一個執行緒的執行依賴另一個執行緒的訊息,當它沒有得到另一個執行緒的訊息時應等待,直到訊息到達時才被喚醒。
  • 執行緒互斥是指對於共享的作業系統資源(指的是廣義的”資源”,而不是Windows的.res檔案,譬如全域性變數就是一種共享資源),在各執行緒訪問時的排它性。當有若干個執行緒都要使用某一共享資源時,任何時刻最多隻允許一個執行緒去使用,其它要使用該資源的執行緒必須等待,直到佔用資源者釋放該資源。

執行緒互斥是一種特殊的執行緒同步。實際上,互斥和同步對應著執行緒間通訊發生的兩種情況:

  • 當有多個執行緒訪問共享資源而不使資源被破壞時;
  • 當一個執行緒需要將某個任務已經完成的情況通知另外一個或多個執行緒時。

從大的方面講,執行緒的同步可分使用者模式的執行緒同步和核心物件的執行緒同步兩大類。

  • 使用者模式中執行緒的同步方法主要有原子訪問和臨界區等方法。其特點是同步速度特別快,適合於對執行緒執行速度有嚴格要求的場合。
  • 核心物件的執行緒同步則主要由事件、等待定時器、訊號量以及訊號燈等核心物件構成。由於這種同步機制使用了核心物件,使用時必須將執行緒從使用者模式切換到核心模式,而這種轉換一般要耗費近千個CPU週期,因此同步速度較慢,但在適用性上卻要遠優於使用者模式的執行緒同步方式。

在WIN32中,同步機制主要有以下幾種:  (1)事件(Event);  (2)訊號量(semaphore);  (3)互斥量(mutex);  (4)臨界區(Critical section)。

Win32中的四種同步方式

臨界區

臨界區(Critical Section)是一段獨佔對某些共享資源訪問的程式碼,在任意時刻只允許一個執行緒對共享資源進行訪問。如果有多個執行緒試圖同時訪問臨界區,那麼在有一個執行緒進入後其他所有試圖訪問此臨界區的執行緒將被掛起,並一直持續到進入臨界區的執行緒離開。臨界區在被釋放後,其他執行緒可以繼續搶佔,並以此達到用原子方式操作共享資源的目的。

臨界區在使用時以CRITICAL_SECTION結構物件保護共享資源,並分別用EnterCriticalSection()和LeaveCriticalSection()函式去標識和釋放一個臨界區。所用到的CRITICAL_SECTION結構物件必須經過InitializeCriticalSection()的初始化後才能使用,而且必須確保所有執行緒中的任何試圖訪問此共享資源的程式碼都處在此臨界區的保護之下。否則臨界區將不會起到應有的作用,共享資源依然有被破壞的可能。

程式碼示例:

複製程式碼

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定義全域性變數
CRITICAL_SECTION Critical;      //定義臨界區控制代碼
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        EnterCriticalSection(&Critical);
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        LeaveCriticalSection(&Critical);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        EnterCriticalSection(&Critical);
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        LeaveCriticalSection(&Critical);
    }
 
    return 0;
}
 
int main()
{
    InitializeCriticalSection(&Critical);   //初始化臨界區物件
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

複製程式碼

問題解決!!!

事件

事件(Event)是WIN32提供的最靈活的執行緒間同步方式,事件可以處於激發狀態(signaled or true)或未激發狀態(unsignal or false)。根據狀態變遷方式的不同,事件可分為兩類:  (1)手動設定:這種物件只可能用程式手動設定,在需要該事件或者事件發生時,採用SetEvent及ResetEvent來進行設定。  (2)自動恢復:一旦事件發生並被處理後,自動恢復到沒有事件狀態,不需要再次設定。

使用”事件”機制應注意以下事項:  (1)如果跨程序訪問事件,必須對事件命名,在對事件命名的時候,要注意不要與系統名稱空間中的其它全域性命名物件衝突;  (2)事件是否要自動恢復;  (3)事件的初始狀態設定。

由於event物件屬於核心物件,故程序B可以呼叫OpenEvent函式通過物件的名字獲得程序A中event物件的控制代碼,然後將這個控制代碼用於ResetEvent、SetEvent和WaitForMultipleObjects等函式中。此法可以實現一個程序的執行緒控制另一程序中執行緒的執行,例如:

HANDLE hEvent=OpenEvent(EVENT_ALL_ACCESS,true,"MyEvent"); 
ResetEvent(hEvent);

示例程式碼:

複製程式碼

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定義全域性變數
HANDLE hEvent;  //定義事件控制代碼
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hEvent, INFINITE);  //等待物件為有訊號狀態
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        SetEvent(hEvent);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hEvent, INFINITE);  //等待物件為有訊號狀態
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        SetEvent(hEvent);
    }
 
    return 0;
}
 
int main()
{
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
    hEvent = CreateEvent(NULL, FALSE, TRUE, "event");
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

複製程式碼

執行結果都一樣就不來顯示出來了。

訊號量

訊號量是維護0到指定最大值之間的同步物件。訊號量狀態在其計數大於0時是有訊號的,而其計數是0時是無訊號的。訊號量物件在控制上可以支援有限數量共享資源的訪問。

訊號量的特點和用途可用下列幾句話定義:  (1)如果當前資源的數量大於0,則訊號量有效;  (2)如果當前資源數量是0,則訊號量無效;  (3)系統決不允許當前資源的數量為負值;  (4)當前資源數量決不能大於最大資源數量。

建立訊號量

函式原型為:

 HANDLE CreateSemaphore (
   PSECURITY_ATTRIBUTE psa, //訊號量的安全屬性
   LONG lInitialCount, //開始時可供使用的資源數
   LONG lMaximumCount, //最大資源數
   PCTSTR pszName);     //訊號量的名稱

釋放訊號量

通過呼叫ReleaseSemaphore函式,執行緒就能夠對信標的當前資源數量進行遞增,該函式原型為:

BOOL WINAPI ReleaseSemaphore(
   HANDLE hSemaphore,   //要增加的訊號量控制代碼
   LONG lReleaseCount, //訊號量的當前資源數增加lReleaseCount
   LPLONG lpPreviousCount  //增加前的數值返回
   );

開啟訊號量 

和其他核心物件一樣,訊號量也可以通過名字跨程序訪問,開啟訊號量的API為:

 HANDLE OpenSemaphore (
   DWORD fdwAccess,      //access
   BOOL bInherithandle,  //如果允許子程序繼承控制代碼,則設為TRUE
   PCTSTR pszName  //指定要開啟的物件的名字
  );

程式碼示例:

複製程式碼

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定義全域性變數
HANDLE hSemaphore;  //定義訊號量控制代碼
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    long count;
    while (number < 100)
    {
        WaitForSingleObject(hSemaphore, INFINITE);  //等待訊號量為有訊號狀態
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseSemaphore(hSemaphore, 1, &count);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    long count;
    while (number < 100)
    {
        WaitForSingleObject(hSemaphore, INFINITE);  //等待訊號量為有訊號狀態
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseSemaphore(hSemaphore, 1, &count);
    }
 
    return 0;
}
 
int main()
{
    hSemaphore = CreateSemaphore(NULL, 1, 100, "sema");
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

複製程式碼

結果一樣。

互斥量

採用互斥物件機制。 只有擁有互斥物件的執行緒才有訪問公共資源的許可權,因為互斥物件只有一個,所以能保證公共資源不會同時被多個執行緒訪問。互斥不僅能實現同一應用程式的公共資源安全共享,還能實現不同應用程式的公共資源安全共享。

程式碼示例:

複製程式碼

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定義全域性變數
HANDLE hMutex;  //定義互斥物件控制代碼
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hMutex, INFINITE);
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseMutex(hMutex);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hMutex, INFINITE);
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseMutex(hMutex);
    }
 
    return 0;
}
 
int main()
{
    hMutex = CreateMutex(NULL, false, "mutex");     //建立互斥物件
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

複製程式碼

結果一樣的。

三.多執行緒+IOPC實現服務端

服務端程式碼:

複製程式碼

#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdlib.h>
#include <process.h>
#include <winsock2.h>
#include <windows.h>
 
#pragma comment(lib,"ws2_32.lib");//載入ws2_32.dll
 
#define BUF_SIZE 100
#define READ    3
#define    WRITE    5
 
typedef struct    // socket info
{
    SOCKET hClntSock;
    SOCKADDR_IN clntAdr;
} PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
 
typedef struct    // buffer info
{
    OVERLAPPED overlapped;
    WSABUF wsaBuf;
    char buffer[BUF_SIZE];
    int rwMode;    // READ or WRITE 讀寫模式
} PER_IO_DATA, *LPPER_IO_DATA;
 
unsigned int  WINAPI EchoThreadMain(LPVOID CompletionPortIO);
void ErrorHandling(char *message);
SOCKET ALLCLIENT[100];
int clientcount = 0;
HANDLE hMutex;//互斥量
 
int main(int argc, char* argv[])
{
 
    hMutex = CreateMutex(NULL, FALSE, NULL);//建立互斥量
 
    WSADATA    wsaData;
    HANDLE hComPort;
    SYSTEM_INFO sysInfo;
    LPPER_IO_DATA ioInfo;
    LPPER_HANDLE_DATA handleInfo;
 
    SOCKET hServSock;
    SOCKADDR_IN servAdr;
    int  i;
    DWORD recvBytes = 0,flags = 0;
 
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        ErrorHandling("WSAStartup() error!");
 
    hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//建立CP物件
    GetSystemInfo(&sysInfo);//獲取當前系統的資訊
 
    for (i = 0; i < sysInfo.dwNumberOfProcessors; i++)
        _beginthreadex(NULL, 0, EchoThreadMain, (LPVOID)hComPort, 0, NULL);//建立=CPU個數的執行緒數
 
    hServSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);//不是非阻塞套接字,但是重疊IO套接字。
    memset(&servAdr, 0, sizeof(servAdr));
    servAdr.sin_family = AF_INET;
    servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
    servAdr.sin_port = htons(1234);
 
    bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr));
    listen(hServSock, 5);
 
    while (1)
    {
        SOCKET hClntSock;
        SOCKADDR_IN clntAdr;
        int addrLen = sizeof(clntAdr);
 
        hClntSock = accept(hServSock, (SOCKADDR*)&clntAdr, &addrLen);
 
        handleInfo = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));//和重疊IO一樣
        handleInfo->hClntSock = hClntSock;//儲存客戶端套接字
 
        WaitForSingleObject(hMutex, INFINITE);//執行緒同步
 
        ALLCLIENT[clientcount++] = hClntSock;//存入套接字佇列
 
        ReleaseMutex(hMutex);
 
        memcpy(&(handleInfo->clntAdr), &clntAdr, addrLen);
 
        CreateIoCompletionPort((HANDLE)hClntSock, hComPort, (DWORD)handleInfo, 0);//連線套接字和CP物件
                                                                                //已完成資訊將寫入CP物件
        ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//儲存接收到的資訊
        memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
        ioInfo->wsaBuf.len = BUF_SIZE;
        ioInfo->wsaBuf.buf = ioInfo->buffer;//和重疊IO一樣
        ioInfo->rwMode = READ;//讀寫模式
 
        WSARecv(handleInfo->hClntSock, &(ioInfo->wsaBuf),//非阻塞模式
            1, &recvBytes, &flags, &(ioInfo->overlapped), NULL);
    }
    CloseHandle(hMutex);//銷燬互斥量
    return 0;
}
 
unsigned int WINAPI EchoThreadMain(LPVOID pComPort)//執行緒的執行
{
    HANDLE hComPort = (HANDLE)pComPort;
    SOCKET sock;
    DWORD bytesTrans;
    LPPER_HANDLE_DATA handleInfo;
    LPPER_IO_DATA ioInfo;
    DWORD flags = 0;
 
    while (1)//大迴圈
    {
        GetQueuedCompletionStatus(hComPort, &bytesTrans,//確認“已完成”的I/O!!
            (LPDWORD)&handleInfo, (LPOVERLAPPED*)&ioInfo, INFINITE);//INFINITE使用時,程式將阻塞,直到已完成的I/O資訊寫入CP物件
        sock = handleInfo->hClntSock;//客戶端套接字
 
        if (ioInfo->rwMode == READ)//讀寫模式(此時緩衝區有資料)
        {
            puts("message received!");
            if (bytesTrans == 0)    // 連線結束
            {
                WaitForSingleObject(hMutex, INFINITE);//執行緒同步
 
                closesocket(sock);
                int i = 0;
                while (ALLCLIENT[i] == sock){ i++; }
                ALLCLIENT[i] = 0;//斷開置0
 
                ReleaseMutex(hMutex);
 
                free(handleInfo); free(ioInfo);
                continue;
            }
            int i = 0;
 
            for (; i < clientcount;i++)
            {
                if (ALLCLIENT[i] != 0)//判斷是否為已連線的套接字
                {
                    if (ALLCLIENT[i] != sock)
                    {
                        LPPER_IO_DATA newioInfo;
                        newioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//動態分配記憶體
                        memset(&(newioInfo->overlapped), 0, sizeof(OVERLAPPED));
                        strcpy(newioInfo->buffer, ioInfo->buffer);//重新構建新的記憶體,防止多次釋放free
                        newioInfo->wsaBuf.buf = newioInfo->buffer;
                        newioInfo->wsaBuf.len = bytesTrans;
                        newioInfo->rwMode = WRITE;
 
                        WSASend(ALLCLIENT[i], &(newioInfo->wsaBuf),//回聲
                            1, NULL, 0, &(newioInfo->overlapped), NULL);
                    }
                    else
                    {
                        memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
                        ioInfo->wsaBuf.len = bytesTrans;
                        ioInfo->rwMode = WRITE;
                        WSASend(ALLCLIENT[i], &(ioInfo->wsaBuf),//回聲
                            1, NULL, 0, &(ioInfo->overlapped), NULL);
                    }
                }
            }
            ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//動態分配記憶體
            memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
            ioInfo->wsaBuf.len = BUF_SIZE;
            ioInfo->wsaBuf.buf = ioInfo->buffer;
            ioInfo->rwMode = READ;
            WSARecv(sock, &(ioInfo->wsaBuf),//再非阻塞式接收
                1, NULL, &flags, &(ioInfo->overlapped), NULL);
        }
        else
        {
            puts("message sent!");
            free(ioInfo);
        }
    }
    return 0;
}
 
void ErrorHandling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

複製程式碼

客戶端:

複製程式碼

#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <windows.h>
#include <process.h> 
#define BUF_SIZE 1000
#define NAME_SIZE 20
 
#pragma comment(lib, "ws2_32.lib")  //載入 ws2_32.dll  
 
unsigned WINAPI SendMsg(void * arg);//傳送資訊函式
unsigned WINAPI RecvMsg(void * arg);//接受資訊函式
void ErrorHandling(char * msg);//錯誤返回函式
 
int haveread = 0;
char NAME[50];//[名字]
char ANAME[50];
char msg[BUF_SIZE];//資訊
 
int main(int argc, char *argv[])
{
 
    printf("請輸入網名:");
    scanf("%s", NAME);
    WSADATA wsaData;
    SOCKET hSock;
    SOCKADDR_IN servAdr;
    HANDLE hSndThread, hRcvThread;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        ErrorHandling("WSAStartup() error!");
 
    hSock = socket(PF_INET, SOCK_STREAM, 0);
    memset(&servAdr, 0, sizeof(servAdr));
    servAdr.sin_family = AF_INET;
    servAdr.sin_addr.s_addr = inet_addr("127.0.0.1");
    servAdr.sin_port = htons(1234);
 
    if (connect(hSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
        ErrorHandling("connect() error");
 
    int resultsend;
    puts("Welcome to joining our chatting room!\n");
    sprintf(ANAME, "[%s]", NAME);
 
    hSndThread =
        (HANDLE)_beginthreadex(NULL, 0, SendMsg, (void*)&hSock, 0, NULL);//寫執行緒
    hRcvThread =
        (HANDLE)_beginthreadex(NULL, 0, RecvMsg, (void*)&hSock, 0, NULL);//讀執行緒
 
    WaitForSingleObject(hSndThread, INFINITE);//等待執行緒結束
    WaitForSingleObject(hRcvThread, INFINITE);
    closesocket(hSock);
    WSACleanup();
    system("pause");
    return 0;
}
 
unsigned WINAPI SendMsg(void * arg)   // send thread main
{
    SOCKET sock = *((SOCKET*)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    char padd[2];
    fgets(padd, 2, stdin);//多餘的'\n'
    printf("\n send message:");
    while (1)
    {
        {
            fgets(msg, BUF_SIZE, stdin);
            if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))
            {
                closesocket(sock);
                exit(0);
            }
            sprintf(name_msg, "[%s] %s", NAME, msg);
            char numofmsg = strlen(name_msg) + '0';
            char newmsg[100]; newmsg[0] = numofmsg; newmsg[1] = 0;//第一個字元表示訊息的長度
            strcat(newmsg, name_msg);
            int result = send(sock, newmsg, strlen(newmsg), 0);
            if (result == -1)return -1;//傳送錯誤
        }
    }
    return NULL;
}
 
unsigned WINAPI RecvMsg(void * arg)  // read thread main
{
    SOCKET sock = *((SOCKET*)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    int str_len = 0;
    while (1)
    {
        {
            char lyfstr[1000] = { 0 };
            int totalnum = 0;
            str_len = recv(sock, name_msg, 1, 0);//讀取第一個字元!獲取訊息的長度
            if (str_len == -1)//讀取錯誤
            {
                printf("return -1\n");
                return -1;
            }
            if (str_len == 0)//讀取結束
            {
                printf("return 0\n");
                return 0;//讀取結束
            }
            totalnum = name_msg[0] - '0';
            int count = 0;
 
            do
            {
                str_len = recv(sock, name_msg, 1, 0);
 
                name_msg[str_len] = 0;
 
                if (str_len == -1)//讀取錯誤
                {
                    printf("return -1\n");
                    return -1;
                }
                if (str_len == 0)
                {
                    printf("return 0\n");
                    return 0;//讀取結束
                }
                strcat(lyfstr, name_msg);
                count = str_len + count;
 
            } while (count < totalnum);
 
            lyfstr[count] = '\0';
            printf("\n");
            strcat(lyfstr, "\n");
            fputs(lyfstr, stdout);
            printf(" send message:");
            fflush(stdout);
            memset(name_msg, 0, sizeof(char));
        }
    }
    return NULL;
}
 
void ErrorHandling(char * msg)
{
    fputs(msg, stderr);
    fputc('\n', stderr);
    exit(1);
}

複製程式碼

 最後說一句啦。本網路程式設計入門系列部落格是連載學習的,有興趣的可以看我部落格其他篇。。。。

參考部落格:https://blog.csdn.net/kaida1234/article/details/79465713

參考部落格:http://www.runoob.com/cplusplus/cpp-multithreading.html

參考部落格:https://blog.csdn.net/u010223072/article/details/49335867

參考部落格:https://blog.csdn.net/wxf2012301351/article/details/73504281

參考書籍:《TCP/IP網路程式設計 ---尹聖雨》