1. 程式人生 > >使用c語言實現執行緒池以及執行緒池原理

使用c語言實現執行緒池以及執行緒池原理

執行緒池介紹

  • 執行緒池允許一個執行緒可以多次複用,且每次複用的執行緒內部的訊息處理可以不相同,將建立與銷燬的開銷省去而不必來一個請求開一個執行緒;簡單來說就是有一堆已經建立好的執行緒(最大數目一定),初始時他們都處於空閒狀態,當有新的任務進來,從執行緒池中取出一個空閒的執行緒處理任務,然後當任務處理完成之後,該執行緒被重新放回到執行緒池中,供其他的任務使用,當執行緒池中的執行緒都在處理任務時,就沒有空閒執行緒供使用,此時,若有新的任務產生,只能等待執行緒池中有執行緒結束任務空閒才能執行,下面是執行緒池的工作原理圖:

應用場景

執行緒池是一種多執行緒處理形式,大多用於高併發伺服器上,它能合理有效的利用高併發伺服器上的執行緒資源;當我們的通訊範圍擴大到廣域網或大型區域網通訊中時,我們將面臨大量訊息頻繁請求伺服器;在這種情況下,建立與銷燬執行緒都已經成為一種奢侈的開銷,特別對於嵌入式伺服器來說更應保證記憶體資源的合理利用;

執行緒池結構

一:執行緒池結構體

typedef struct task
{
    void *(*run)(void *args);  //函式指標,需要執行的任務
    void *arg;              //引數
    struct task *next;      //任務佇列中下一個任務
}task_t;


//下面是執行緒池結構體
typedef struct threadpool
{
    condition_t ready;    //狀態量
    task_t *first;       //任務佇列中第一個任務
    task_t *last;        //任務佇列中最後一個任務
int counter; //執行緒池中已有執行緒數 int idle; //執行緒池中空閒執行緒數 int max_threads; //執行緒池最大執行緒數 int quit; //是否退出標誌 }threadpool_t;

狀態量:狀態量就是互斥鎖與條件變數,在多執行緒環境中,執行緒池就相當於臨界資源,所以就需要條件變數和互斥鎖來保證每一個執行緒訪問臨界資源的安全高效。
first/last指標:就是執行緒池中維護的任務佇列,在使用執行緒池時,只需要向佇列中新增任務就行,執行緒池中的空閒執行緒會”主動“執行任務。
max_threads:

執行緒池最大執行緒數,如果向任務佇列中新增新任務,此時,執行緒池中並沒有空閒執行緒來執行任務,就需要建立新的執行緒執行,但是,如果此時已有執行緒數已經達到執行緒池的最大執行緒數,那麼就會失敗返回。

具體程式碼實現:

用條件變數和互斥鎖封裝的狀態,用來保證執行緒池的安全

condition.h

#ifndef _CONDITION_H_                                                                                                                                          
#define _CONDITION_H_

#include <pthread.h>

//封裝一個互斥量和條件變數作為狀態
typedef struct condition
{
    pthread_mutex_t pmutex;
    pthread_cond_t pcond;
}condition_t;

//對狀態的操作函式
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t* cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);

#endif

condition.c

#include "condition.h"                                                                                                                                                                                                                        

//初始化
int condition_init(condition_t *cond)
{
    int status;
    if((status = pthread_mutex_init(&cond->pmutex, NULL)))
        return status;

    if((status = pthread_cond_init(&cond->pcond, NULL)))
        return status;

    return 0;
}

//加鎖
int condition_lock(condition_t *cond)
{
    return pthread_mutex_lock(&cond->pmutex);
}

//解鎖
int condition_unlock(condition_t *cond)
{
    return pthread_mutex_unlock(&cond->pmutex);
}

//等待
int condition_wait(condition_t *cond)
{
    return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}

//固定時間等待
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
    return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}

//喚醒一個睡眠執行緒
int condition_signal(condition_t* cond)
{
    return pthread_cond_signal(&cond->pcond);
}

//喚醒所有睡眠執行緒
int condition_broadcast(condition_t *cond)
{
    return pthread_cond_broadcast(&cond->pcond);
}

//銷燬互斥鎖和條件變數
int condition_destroy(condition_t *cond)
{
    int status;
    if((status = pthread_mutex_destroy(&cond->pmutex)))
        return status;

    if((status = pthread_cond_destroy(&cond->pcond)))
        return status;

    return 0;
}

執行緒池threadpool.h

#ifndef _THREAD_POOL_H_                                                                                                                                                                                                                       
#define _THREAD_POOL_H_

//需要引入狀態的標頭檔案--條件變數和互斥鎖
#include "condition.h"

//封裝執行緒池中的物件需要執行的任務物件
typedef struct task
{
    void *(*run)(void *args);  //函式指標,需要執行的任務
    void *arg;              //引數
    struct task *next;      //任務佇列中下一個任務
}task_t;


//下面是執行緒池結構體
typedef struct threadpool
{
    condition_t ready;    //狀態量
    task_t *first;       //任務佇列中第一個任務
    task_t *last;        //任務佇列中最後一個任務
    int counter;         //執行緒池中已有執行緒數
    int idle;            //執行緒池中空閒執行緒數
    int max_threads;     //執行緒池最大執行緒數
    int quit;            //是否退出標誌 1/0
}threadpool_t;


//執行緒池初始化
void threadpool_init(threadpool_t *pool, int idle_threads, int max_threads);

//往執行緒池中加入任務
int threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);

//摧毀執行緒池
void threadpool_destroy(threadpool_t *pool);

#endif

threadpool.c

#include "threadpool.h"                                                                                                                                                                                                                       
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <time.h>

//執行緒池中建立的執行緒執行
void *thread_routine(void *arg)
{
    struct timespec abstime;//時間結構體
    int timeout;
    printf("thread %d is starting\n", (int)pthread_self());
    threadpool_t *pool = (threadpool_t *)arg;

    //死迴圈使執行緒池中空閒的執行緒可以複用
    while(1)
    {   
        timeout = 0;

        //訪問執行緒池之前需要加鎖
        condition_lock(&pool->ready);

        //空閒
        pool->idle++;

        //任務佇列沒有任務到來並且沒有收到執行緒池銷燬通知, 執行緒阻塞等待(進入這裡面都是空閒執行緒,等待被喚醒)
        while(pool->first == NULL && !pool->quit)
        {   
            printf("thread %d is waiting\n", (int)pthread_self());

            //獲取當前時間,並加上等待時間, 從而設定程序的超時睡眠時間
            clock_gettime(CLOCK_REALTIME, &abstime);  
            abstime.tv_sec += 2;
            int status;
            status = condition_timedwait(&pool->ready, &abstime);  //該函式會解鎖,允許其他執行緒訪問,當被喚醒時,加鎖
            if(status == ETIMEDOUT)
            {   
                printf("thread %d wait timed out\n", (int)pthread_self());
                timeout = 1;
                break;
            }   
        }   

        pool->idle--;
        if(pool->first != NULL)
        {   
            //取出任務佇列最前的任務,移除任務,並執行任務
            task_t *t = pool->first;
            pool->first = t->next;
            //由於任務執行需要消耗時間,先解鎖讓其他執行緒訪問執行緒池
            condition_unlock(&pool->ready);
            //執行任務
            t->run(t->arg);
            //執行完任務釋放記憶體
            free(t);
            //重新加鎖
            condition_lock(&pool->ready);
        }   

        //退出執行緒池--銷燬當前執行緒
        if(pool->quit && pool->first == NULL)
        {
            pool->counter--;//當前工作的執行緒數-1

            //若執行緒池中沒有執行緒,喚醒等待執行緒(主執行緒--銷燬執行緒池的執行緒)全部任務已經完成
            if(pool->counter == 0)
            {
                condition_signal(&pool->ready);
            }
            condition_unlock(&pool->ready);
            break;
        }
        //超時,說明執行緒沒有任務可以執行, 跳出銷燬執行緒
        if(timeout == 1)
        {
            pool->counter--;//當前工作的執行緒數-1
            condition_unlock(&pool->ready);
            break;
        }

        condition_unlock(&pool->ready);
    }

    printf("thread %d is exiting\n", (int)pthread_self());
    return NULL;

}


//執行緒池初始化
void threadpool_init(threadpool_t *pool, int idle_threads, int max_threads)
{

    condition_init(&pool->ready);
    pool->first = NULL;
    pool->last =NULL;
    pool->counter =0;
    pool->idle =0;
    pool->max_threads = max_threads;
    pool->quit =0;

    //建立空閒執行緒
   int i = 0;
   for(; i < idle_threads; i++)
   {
       pthread_t tid;
        pthread_create(&tid, NULL, thread_routine, pool);
        pool->counter++;//已有執行緒數+1
   }

}
//增加一個任務到執行緒池
int threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
    //產生一個新的任務
    task_t *newtask = (task_t *)malloc(sizeof(task_t));
    newtask->run = run;
    newtask->arg = arg;
    newtask->next=NULL;//新加的任務放在佇列尾端

    //執行緒池的狀態被多個執行緒共享,操作前需要加鎖
    condition_lock(&pool->ready);

    if(pool->first == NULL)//第一個任務加入
    {
        pool->first = newtask;
    }        
    else    
    {
        pool->last->next = newtask;
    }
    pool->last = newtask;  //佇列尾指向新加入的執行緒

    //執行緒池中有執行緒空閒,喚醒處於等待狀態的執行緒(因為在等待期間會釋放互斥鎖)
    if(pool->idle > 0)
    {
        condition_signal(&pool->ready);
    }
    //當前執行緒池中執行緒個數沒有達到設定的最大值,建立一個新的執行緒
    else if(pool->counter < pool->max_threads)
    {
        pthread_t tid;
        pthread_create(&tid, NULL, thread_routine, pool);
        pool->counter++;
    }
    else
    {
        condition_unlock(&pool->ready);
        return -1;
    }
    //結束,訪問
    condition_unlock(&pool->ready);
    return 0;
}

//執行緒池銷燬
void threadpool_destroy(threadpool_t *pool)
{
    //如果已經呼叫銷燬,直接返回
    if(pool->quit)
    {
        return;
    }
    //加鎖
    condition_lock(&pool->ready);                                                                                                                                                                                                             
    //設定銷燬標記為1
    pool->quit = 1;
    //執行緒池中執行緒個數大於0
  if(pool->counter > 0)
    {
        //對於等待的執行緒,傳送訊號喚醒
        if(pool->idle > 0)
        {
            condition_broadcast(&pool->ready);
        }
        //正在執行任務的執行緒,等待他們結束任務
        while(pool->counter)
        {
            condition_wait(&pool->ready);
        }
    }
    condition_unlock(&pool->ready);
    condition_destroy(&pool->ready);
}                    

測試程式碼test.c

#include <stdio.h>                                                                                                                                                                                                                          
  #include "threadpool.h"
  #include <unistd.h>
  #include <stdlib.h>
  #include <stdio.h>

  void* mytask(void *arg)
  {
      printf("thread %d is working on task %d\n", (int)pthread_self(),(int)arg);
      sleep(1);
      return NULL;

  }
  //測試程式碼
  int main(void)
  {
      threadpool_t pool;
      //初始化執行緒池,建立2個空閒執行緒,最多5個執行緒
      threadpool_init(&pool, 2, 5); 
      int i;
      //建立十個任務
      for(i=0; i < 10; i++)
      {   
       threadpool_add_task(&pool, mytask, (void*)i);

      }   
      threadpool_destroy(&pool);
      return 0;
  }
仔細研究下測試程式碼的執行流程

這裡寫圖片描述

相關推薦

C語言實現printf函式,即引數可變函式原理

我們在C語言程式設計中會遇到一些引數個數可變的函式,例如printf() 這個函式,它的定義是這樣的: int printf( const char* format, ...); 它除了有一個引數format固定以外,後面跟的引數的個數和型別是 可變的,例如我們可以有以下不

使用c語言實現執行以及執行原理

執行緒池介紹 執行緒池允許一個執行緒可以多次複用,且每次複用的執行緒內部的訊息處理可以不相同,將建立與銷燬的開銷省去而不必來一個請求開一個執行緒;簡單來說就是有一堆已經建立好的執行緒(最大數目一定),初始時他們都處於空閒狀態,當有新的任務進來,從執行緒池中取

執行原理C語言實現執行

備註:該執行緒池原始碼參考自傳直播客培訓視訊配套資料; 原始碼:https://pan.baidu.com/s/1zWuoE3q0KT5TUjmPKTb1lw 密碼:pp42 引言:執行緒池是一種多執行緒處理形式,大多用於高併發伺服器上,它能合理有效的利用高

C語言實現串列埠通訊知識點整理(四)】關於執行和程序

轉載:https://www.cnblogs.com/fuchongjundream/p/3829508.html 因為在外部檔案中呼叫結構體沒有用extern修飾,導致獲取不到正確的值,一直糾結線上程上。現在大概總結執行緒和程序的特點: 概念 1、程序(process) 狹義定義:

C語言實現串列埠通訊知識點整理(一)】執行、開啟串列埠、設定波特率、設定校驗位、互斥鎖等實現基本的通訊

  部分程式碼借鑑地址:https://blog.csdn.net/wangqingchuan92/article/details/73497354/ 謝謝! 1.建立執行緒線上程內進行串列埠之間的收發 void CREAT_pthread(void) { pthr

c++11 實現半同步半非同步執行

感受: 隨著深入學習,現代c++給我帶來越來越多的驚喜… c++真的變強大了。 半同步半非同步執行緒池: 其實很好理解,分為三層 同步層:通過IO複用或者其他多執行緒多程序等不斷的將待處理事件新增到佇列中,這個過程是同步進行的

C語言實現的多執行定時器

[toc](c語言製作定時器庫) *** ## 1. 大致功能介紹 - 實現任務列表,定時器會間隔一段時間遍歷列表發現要執行的任務 - 任務列表中的所有任務並行執行 - 每個任務都可以有自己的定時器,並且可以選擇是否要重複執行 - 定義方便的任務函式實現介面 - 定時器可以由使用者自定義何時啟動和停止 - 提

C語言實現大數相加(思路+程式碼+執行結果)

大數相加 思路: 1.先將字串倒序並轉換為數字 2.逐位相加,並存入一個數組e[i]中 3.將得到的結果進行進位處理 4.轉換並把陣列e[i]反轉,迴圈輸出結果 #include<iostrea

C語言實現線性表歸併_含原始碼和執行結果_資料結構(C語言版)

採用動態分配順序儲存結構實現 採用單鏈表結構實現 1.採用動態分配實現 #include<stdio.h> #include<stdlib.h> #define LIST_INIT_SIZE 100 #define LISTINCREM

JAVA執行--Executors之什麼是執行,為什麼使用執行以及執行的使用

1. 為什麼需要執行緒池?      多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。              假設一個伺服器完成一項任務所需時間為:T1 建立執行緒時間,T2 線上程中執行任務的時間,T

C語言實現程式跳轉到絕對地址0x100000處執行

嵌入式筆試題:想讓程式跳轉到絕對地址0x100000處執行,該如何做? 網上看到有如下答案: *((void(*)(void))0x100000)(); 經過在VC++6.0和LINUX gcc4.4.3下測試,均不能通過編譯。 VC++6.0報錯:error C2100:

c語言實現漢諾塔(程式執行步驟詳解)

很久沒去接觸c語言了,今天翻了翻c語言的書,偶然間看到了大一時讓我鬱悶了很久的漢諾塔問題,又重新推理了一遍,漢諾塔的實現採用遞迴演算法,涉及到資料結構中的棧的知識。下面是c實現漢諾塔的原始碼。程式只是實現了文字資訊,即用文字描述了圓盤的移動過程,並未真正實現圓盤的移動,該程式

Linux下C語言實現C/S模式程式設計(附原始碼,執行截圖)

由標題可知,這篇部落格主要講如何用C語言實現一個C/S模式的程式。 主要功能:時間回送。 客戶機發出請求,伺服器響應時間,並返回伺服器時間,與客戶機進行同步。 廢話不多說,下面直接貼出原始碼。 程式碼如下: #include <stdio.h> #include

Linux C語言程式設計(十五)——程序、執行與訊號

1、程序 1.1 基本概念 每個程序在核心中都有一個程序控制塊( PCB)來維護程序相關的資訊, Linux核心的程序控制塊是task_struct結構體。 程序ID:統中每個程序有唯一的id,在C語言中用pid_t型別表示,其實就是一個非負整數。 程序狀態:有執行、掛起、

C執行程式設計以及執行函式

執行緒的資料處理   和程序相比,執行緒的最大優點之一是資料的共享性,各個程序共享父程序處沿襲的資料段,可以方便的獲得、修改資料。但這也給多執行緒程式設計帶來了許多問題。我們必須當心有多個不同的程序訪問相同的變數。許多函式是不可重入的,即同時不能執行一個函式的多個拷貝(除非使用不同的資料段)。在函式中宣告的

在STM32上實現NTFS之5:GPT分區表的C語言實現(2)GPT實現以及統一方式讀取磁盤分區

tfs 下載 數據 特殊 dpt 屬性列表 handle 系統分區 成了   上一節實現了主GPT頭的信息提取,這一節繼續提取整個的GPT數據,並且將GPT分區表和MBR分區表兩種格式融合成一個模塊,使主調函數(也可以說是使用者)不需要關心磁盤的分區表類型:它太底層了,確實

資料結構之連結串列C語言實現以及使用場景分析

連結串列是資料結構中比較基礎也是比較重要的型別之一,那麼有了陣列,為什麼我們還需要連結串列呢!或者說設計連結串列這種資料結構的初衷在哪裡? 這是因為,在我們使用陣列的時候,需要預先設定目標群體的個數,也即陣列容量的大小,然而實時情況下我們目標的個數我們是不確定的,因此我們總是要把陣列的容量設定的

關於中值濾波演算法,以及C語言實現

關於中值濾波演算法,以及C語言實現 2017年04月06日 11:45:58 閱讀數:1464 1、什麼是中值濾波? 中值濾波是對一個滑動視窗內的諸畫素灰度值排序,用其中值代替視窗中心象素的原來灰度值,它是一種非線性的影象平滑法,它對脈衝干擾級椒鹽噪聲的抑制效果好,在抑制隨機噪聲的同

軟體素材---linux C語言:linux下獲取可執行檔案的絕對路徑--getcwd函式

      //標頭檔案:#include <unistd.h>     //定義函式:char * getcwd(char * buf, size_t size);    

[領卓教育]用C語言實現ls以及ls-功能

各位程式設計師在自己的虛擬機器裡一定沒少執行過“ls”這個功能吧,這個程式碼就是實現了ls和ls-l功能,話不多說,上程式碼。 實現程式碼 int process_ls(char * path) { DIR * dirp; struct dire