1. 程式人生 > >程序池和執行緒池

程序池和執行緒池

用new、malloc申請記憶體時,由於每次申請的大小不同,最後可能導致會有許多記憶體碎片無法使用,造成記憶體浪費和不好管理的問題。記憶體池則是在真正使用記憶體之前,先申請分配一定數量的、大小相等的記憶體塊留做備用。當有新的記憶體需求時,就從記憶體池中分出一部分記憶體塊,若記憶體塊不夠則繼續申請新的記憶體。這樣做有效的避免了記憶體碎片,提高了記憶體使用效率。

而程序池(執行緒池與程序池的概念基本一致)就引用了記憶體池的思想,由伺服器預先建立一組程序,程序池中所有子程序都執行相同的程式碼,擁有相同的屬性,如:優先順序、PGID(程序組ID)。

當有新的任務來到時,主程序將通過某種方式選擇程序池中的某一個子程序來為之服務。相比於動態建立子程序,選擇一個已經存在的子程序的代價顯得小得多。至於主程序選擇哪個子程序來為新任務服務,則有兩種方法:

主程序使用某種演算法來主動選擇子程序。最簡單、最常用的演算法是隨機演算法和 Round Robin (輪流演算法)。

  1. 主程序和所有子程序通過一個共享的工作佇列來同步,子程序都睡眠在該工作佇列上。當有新的任務到來時,主程序將任務新增到工作佇列中。這將喚醒正在等待任務的子程序,不過只有一個子程序將獲得新任務的“接管權”,它可以從工作佇列中取出任務並執行之,而其他子程序將繼續睡眠在工作佇列上。
  2. 當選擇好子程序後,主程序還需要使用某種通知機制來告訴目標子程序有新任務需要處理,並傳遞必要的資料。最簡單的方式是,在父程序和子程序之間預先建立好一條管道,然後通過管道來實現所有的程序間通訊。在父執行緒和子執行緒之間傳遞資料就要簡單得多,因為我們可以把這些資料定義為全域性,那麼它們本身就是被所有執行緒共享的。

在使用程序池處理多客戶任務時,首先考慮的一個問題是:監聽socket和連線socket是否都由主程序來統一管理。併發模型,其中半同步/半反應堆模式是由主程序統一管理這兩種socket的。而高效的半同步/半非同步和領導者/追隨者模式,則是由主程序管理所有監聽socket,而各個子程序分別管理屬於自己的連線socket的。對於前一種情況,主程序接受新的連線以得到連線socket,然後它需要將該socket傳遞給子程序(對於執行緒池而言,父執行緒將socket傳遞給子執行緒是很簡單的。因為他們可以很容易地共享該socket。但對於程序池而言,必須通過管道傳輸)。後一種情況的靈活性更大一些,因為子程序可以自己呼叫accept來接受新的連線,這樣該父程序就無須向子程序傳遞socket。而只需要簡單地通知一聲:“我檢測到新的連線,你來接受它。

常連線,即一個客戶的多次請求可以複用一個TCP連線。那麼,在設計程序池時還需要考慮:一個客戶連線上的所有任務是否始終由一個子程序來處理。如果說客戶任務是無狀態的,那麼我們可以考慮使用不同的程序為該客戶不同請求服務。

但如果客戶任務是存在上下文關係的,則最好一直用同一個程序來為之服務,否則實現起來比較麻煩,因為我們不得不在各個子程序傳遞上下文資料,我們採用epoll的EPOLLONESHOT事件,這一事件能夠確保一個客戶連線在整個生命週期中僅被一個執行緒處理。

這裡寫圖片描述
執行緒池程式碼:

#include <stdio.h>  
#include <stdlib.h>  
#include <unistd.h>  
#include <sys/types.h>  
#include <pthread.h>  
#include <assert.h>  

/* 執行緒池裡所有執行和等待的任務都是一個CThread_worker
 * 由於所有任務都在連結串列裡,所以是一個連結串列結構 
*/  
typedef struct worker  
{
    /*回撥函式,任務執行時會呼叫此函式,注意也可宣告成其它形式*/  
    void *(*process) (void *arg);  
    void *arg;/*回撥函式的引數*/  
    struct worker *next;                
} CThread_worker;  
/*執行緒池結構*/  
typedef struct  
{
    pthread_mutex_t queue_lock;  
    pthread_cond_t queue_ready;  
    /*連結串列結構,執行緒池中所有等待任務*/  
    CThread_worker *queue_head;  
    /*是否銷燬執行緒池*/  
    int shutdown;  
    pthread_t *threadid;  
    /*執行緒池中允許的活動執行緒數目*/  
    int max_thread_num;  
    /*當前等待佇列的任務數目*/  
    int cur_queue_size;  
} CThread_pool;  
int pool_add_worker (void *(*process) (void *arg), void *arg);  
void *thread_routine (void *arg);  
//share resource  
static CThread_pool *pool = NULL;  
void  
pool_init (int max_thread_num)  
{
    pool = (CThread_pool *) malloc (sizeof (CThread_pool));          
    pthread_mutex_init (&(pool->queue_lock), NULL);  
    pthread_cond_init (&(pool->queue_ready), NULL);  
    pool->queue_head = NULL;                      
    pool->max_thread_num = max_thread_num;  
    pool->cur_queue_size = 0;  
    pool->shutdown = 0;  
    pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));  
    int i = 0;  
    for (i = 0; i < max_thread_num; i++)  
    {
        pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL);
    }  
}  
/*向執行緒池中加入任務*/  
int pool_add_worker (void *(*process) (void *arg), void *arg)  
{
    /*構造一個新任務*/  
    CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));  
    newworker->process = process;
    newworker->arg = arg;  
    newworker->next = NULL;/*別忘置空*/
    pthread_mutex_lock (&(pool->queue_lock));  
    /*將任務加入到等待佇列中*/  
    CThread_worker *member = pool->queue_head;  
    if (member != NULL)  
    {
        while (member->next != NULL)  
            member = member->next;  
        member->next = newworker;  
    }  
    else  
    {
        pool->queue_head = newworker;  
    }  
    assert (pool->queue_head != NULL);  
    pool->cur_queue_size++;  
    pthread_mutex_unlock (&(pool->queue_lock));  
    /*好了,等待佇列中有任務了,喚醒一個等待執行緒; 
     * 注意如果所有執行緒都在忙碌,這句沒有任何作用*/  
    pthread_cond_signal (&(pool->queue_ready));  
    return 0;  
}
/*銷燬執行緒池,等待佇列中的任務不會再被執行,但是正在執行的執行緒會一直 
 * 把任務執行完後再退出*/  
int pool_destroy ()  
{
    if (pool->shutdown)  
        return -1;/*防止兩次呼叫*/  
    pool->shutdown = 1; 
    /*喚醒所有等待執行緒,執行緒池要銷燬了*/  
    pthread_cond_broadcast (&(pool->queue_ready));  
    /*阻塞等待執行緒退出,否則就成殭屍了*/  
    int i;  
    for (i = 0; i < pool->max_thread_num; i++)  
        pthread_join (pool->threadid[i], NULL);  
    free (pool->threadid);  
    /*銷燬等待佇列*/  
    CThread_worker *head = NULL;  
    while (pool->queue_head != NULL)  
    {
        head = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free (head);
    }  
    /*條件變數和互斥量也別忘了銷燬*/  
    pthread_mutex_destroy(&(pool->queue_lock));  
    pthread_cond_destroy(&(pool->queue_ready));  
    free (pool);  
    /*銷燬後指標置空是個好習慣*/  
    pool=NULL;  
    return 0;  
}  
void *thread_routine (void *arg)  
{
    printf ("starting thread 0x%x\n", pthread_self ());  
    while (1)  
    {
        pthread_mutex_lock (&(pool->queue_lock));  
        /*如果等待佇列為0並且不銷燬執行緒池,則處於阻塞狀態; 注意 
         * pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒後會加鎖*/
        while (pool->cur_queue_size == 0 && !pool->shutdown)  
        {
            printf ("thread 0x%x is waiting\n", pthread_self ());  
            pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
        }                                                           
        /*執行緒池要銷燬了*/  
        if (pool->shutdown)  
        {
            /*遇到break,continue,return等跳轉語句,千萬不要忘記先解鎖*/  
            pthread_mutex_unlock (&(pool->queue_lock));  
            printf ("thread 0x%x will exit\n", pthread_self ());  
            pthread_exit (NULL);  
        }  
        printf ("thread 0x%x is starting to work\n", pthread_self ());  
        /*assert是除錯的好幫手*/  
        assert (pool->cur_queue_size != 0);  
        assert (pool->queue_head != NULL);  
        /*等待佇列長度減去1,並取出連結串列中的頭元素*/  
        pool->cur_queue_size--; 
        CThread_worker *worker = pool->queue_head;  
        pool->queue_head = worker->next;  
        pthread_mutex_unlock (&(pool->queue_lock));
        /*呼叫回撥函式,執行任務*/  
        (*(worker->process)) (worker->arg);  
        free (worker);  
        worker = NULL;  
    }  
    /*這一句應該是不可達的*/  
    pthread_exit (NULL);  
}  
//    下面是測試程式碼  
void *myprocess (void *arg)  
{
    printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);  
    sleep (1);/*休息一秒,延長任務的執行時間*/  
    return NULL;  
}  
int main (int argc, char **argv)  
{
    pool_init (3);/*執行緒池中最多三個活動執行緒*/  
    /*連續向池中投入10個任務*/
    int *workingnum = (int *) malloc (sizeof (int) * 10);
    int i;  
    for (i = 0; i < 10; i++)  
    {
        workingnum[i] = i;
        pool_add_worker (myprocess, &workingnum[i]);  
    }  
    /*等待所有任務完成*/  
    sleep (5);  
    /*銷燬執行緒池*/  
    pool_destroy ();  
    free (workingnum);  
    return 0;  
}

這裡寫圖片描述

相關推薦

程序執行實現高併發伺服器

思想:   1.  建立與伺服器相同cpu個數的程序個數來監聽(accept)客戶端響應,並在每個程序中先建立好一個執行緒池   2.  有客戶端訪問時,解除其中一個程序的accpet阻塞,進入到執行緒中來執行接收資料工作( recv() ),用執行緒防止recv阻塞,執行緒呼叫recv方法   3. 

CIL鎖,GIL與執行的區別,程序執行,同步與非同步

一.GIL鎖 什麼是GIL? 全域性直譯器鎖,是加在直譯器上的互斥鎖 GC是python自帶的記憶體管理機制,GC的工作原理:python中的記憶體管理使用的是應用計數,每個數會被加上一個整型的計數器,表示這個資料被引用的次數,當這個整數變為0時則表示該資料已經沒有人使用,成為了垃圾資料,當記憶體佔用達到

程序執行對比

程序池是主程式結束就自動結束,map是自帶close方法和join方法 (執行(程序或執行緒)結束才會繼續向後執行) 執行緒池是主程序結束不影響執行緒池   程序池: 程序的建立和銷燬是很有消耗的,影響程式碼執行效率   map:非同步提交任務,並且傳參需要可迭代型別的資料,自帶c

程序執行

用new、malloc申請記憶體時,由於每次申請的大小不同,最後可能導致會有許多記憶體碎片無法使用,造成記憶體浪費和不好管理的問題。記憶體池則是在真正使用記憶體之前,先申請分配一定數量的、大小相等的記憶體塊留做備用。當有新的記憶體需求時,就從記憶體池中分出一部分

資料庫連線執行

1、資料庫連線池 資料庫連線池負責分配、管理和釋放資料庫連線,它允許應用程式重複使用一個現有的資料庫連線,而不是再重新建立一個;釋放空閒時間超過最大空閒時間的資料庫連線來避免因為沒有釋放資料庫連線而引起的資料庫連線遺漏。這項技術能明顯提高對資料庫操作的效能。  2、好處 1

python3下multiprocessing、threadinggevent效能對比----暨程序執行協程效能對比

        目前計算機程式一般會遇到兩類I/O:硬碟I/O和網路I/O。我就針對網路I/O的場景分析下python3下程序、執行緒、協程效率的對比。程序採用multiprocessing.Pool程序池,執行緒是自己封裝的程序池,協程採用gevent的庫。用python

記憶體程序執行的比較分析

池的概念 由於伺服器的硬體資源“充裕”,那麼提高伺服器效能的一個很直接的方法就是以空間換時間,即“浪費”伺服器的硬體資源,以換取其執行效率。這就是池的概念。池是一組資源的集合,這組資源在伺服器啟動之初就完全被建立並初始化,這稱為靜態資源分配。當伺服器進入正式執行階段,即開始

10-多執行、多程序執行程式設計

一、多執行緒、多程序和執行緒池程式設計 1.1、Python中的GIL鎖   CPython中,global interpreter lock(簡稱GIL)是一個互斥體,用於保護對Python物件的訪問,從而防止多個執行緒一次執行Python位元組碼(也就是說,GIL鎖每次只能允許一個執行緒工作,無法多個執行

java:記憶體程序執行

記憶體池: 自定義記憶體池的思想通過這個"池"字表露無疑,應用程式可以通過系統的記憶體分配呼叫預先一次性申請適當大小的記憶體作為一個記憶體池,之後應用程式自己對記憶體的分配和釋放則可以通過這個記憶體池來完成。 只有當記憶體池大小需要動態擴充套件時,才需要再呼叫系統的記憶體分配函式,其他時間對

第三十八天 GIL 程序執行

今日內容: 1.GIL 全域性直譯器鎖 2.Cpython直譯器併發效率驗證 3.執行緒互斥鎖和GIL對比 4.程序池與執行緒池 一.全域性直譯器鎖   1.GIL:全域性直譯器鎖     GIL本質就是一把互斥鎖,是夾在直譯器身上的     統一程序內的所有執行緒都需要先搶到GIL鎖,才能執

執行篇---Task(任務)執行不得不說的祕密

整理自部落格園一大佬的文章 :https://www.cnblogs.com/tuyile006/p/7154924.html 和碼農之家一匿名大佬 https://www.e-learn.cn/content/net/1114080 和部落格園大佬 https://www.cnblogs.

Java——多執行基本使用(四) 執行執行的使用,工廠設計模式的使用

1.執行緒組的概述和使用 Java中使用ThreadGroup來表示執行緒組,它可以對一批執行緒進行分類管理,Java允許程式直接對執行緒組進行控制。            &n

詳解Tomcat連線數執行

轉載至http://www.importnew.com/27309.html 前言 在使用tomcat時,經常會遇到連線數、執行緒數之類的配置問題,要真正理解這些概念,必須先了解Tomcat的聯結器(Connector)。 在前面的文章 詳解Tomcat配置檔案server.xml 中寫

什麼是執行執行的工作原理使用執行的好處

一個執行緒池管理了一組工作執行緒,同時它還包括了一個用於放置等待執行任務的任務佇列(阻塞佇列) 預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0.當任務提交給執行緒池之後的處理策略如下: 1:如果此時執行緒池中的數量小於corePoolSize(核心池的大小),即

10.執行執行的區別,執行有哪些,什麼情況下使用

一:執行緒和執行緒池的區別 (1)new Thread 的弊端       a. 每次new Thread時,新建物件效能差。       b. 執行緒缺乏統一管理,可能無限制新建執行緒,相互之間競爭,可能佔用過多系統資源導致宕機或oom。       c. 缺乏更多功能

《Android開發藝術探索》學習筆記之Android的執行執行

一、概述 1、主執行緒與子執行緒 主執行緒 又叫UI執行緒 主要作用是執行四大元件以及處理它們和使用者的互動,主要用來處理和介面相關的事情 子執行緒 執行耗時任務,比如網路請求、I/O操作等

Okhttp——佇列執行

這是很簡單的,但是假如面試官問你,你知道咋回答不? 對於同步: 開始的時候放到同步佇列,結束的時候從同步佇列中取出; 對於非同步: 開始的時候判斷非同步佇列元素數是否大於64?再遍歷一遍,看看和這個AsyncCall相同的host有沒有超過5個? 如果沒有,扔進非同

《Android 開發藝術探索》 第11章 --- android 執行執行

如果程序中沒有四大元件,其優先順序將會降低,intentservice 是service封裝了handerthread ,這是intentservice的優點 執行緒是作業系統的最小排程單元,是系統的一種受限制的系統資源,建立和銷燬執行緒都將有對應的開銷,所以使用執行緒池來避免這種開銷 Andr

高效能伺服器程式設計——程序執行

前言   一般來說,伺服器的硬體資源相對充裕,很多時候都用空間換時間的方法來提高伺服器的效能,不惜浪費大量的空間資源來換取伺服器的執行效率。 池的概念   具體做法:提前申請大量的資源,以備不時之需以及重複使用。這就是池的概念  

Java 使用new Thread執行的區別

本文轉至:https://www.cnblogs.com/cnmenglang/p/6273761.html , 孟凡柱的專欄 的部落格,在此謝謝博主! 1.new Thread的弊端執行一個非同步任務你還只是如下new Thread嗎 new Thread(new Runnable() {