1. 程式人生 > >At the speed of ideas

At the speed of ideas

什麼時候需要建立執行緒池呢?簡單的說,如果一個應用需要頻繁的建立和銷燬執行緒,而任務執行的時間又非常短,這樣執行緒建立和銷燬的帶來的開銷就不容忽視,這時也是執行緒池該出場的機會了。如果執行緒建立和銷燬時間相比任務執行時間可以忽略不計,則沒有必要使用執行緒池了。

   下面是Linux系統下用C語言建立的一個執行緒池。執行緒池會維護一個任務連結串列(每個CThread_worker結構就是一個任務)。

   pool_init()函式預先建立好max_thread_num個執行緒,每個執行緒執thread_routine ()函式。該函式中

  1. while (pool->cur_queue_size == 0)
  2. {
  3.        pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));
  4. }

表示如果任務連結串列中沒有任務,則該執行緒出於阻塞等待狀態。否則從佇列中取出任務並執行。
   
   pool_add_worker()函式向執行緒池的任務連結串列中加入一個任務,加入後通過呼叫pthread_cond_signal (&(pool->queue_ready))喚醒一個出於阻塞狀態的執行緒(如果有的話)。
   
   pool_destroy ()函式用於銷燬執行緒池,執行緒池任務連結串列中的任務不會再被執行,但是正在執行的執行緒會一直把任務執行完後再退出。

下面貼出完整程式碼
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <unistd.h>
  4. #include <sys/types.h>
  5. #include <pthread.h>
  6. #include <assert.h>
  7. /*
  8. *執行緒池裡所有執行和等待的任務都是一個CThread_worker
  9. *由於所有任務都在連結串列裡,所以是一個連結串列結構
  10. */
  11. typedef struct worker
  12. {
  13.     /*回撥函式,任務執行時會呼叫此函式,注意也可宣告成其它形式*/
  14.     void *(*process) (void *arg);
  15.     void *arg;/*回撥函式的引數*/
  16.     struct worker *next;
  17. } CThread_worker;
  18. /*執行緒池結構*/
  19. typedef struct
  20. {
  21.      pthread_mutex_t queue_lock;
  22.      pthread_cond_t queue_ready;
  23.     /*連結串列結構,執行緒池中所有等待任務*/
  24.      CThread_worker *queue_head;
  25.     /*是否銷燬執行緒池*/
  26.     int shutdown;
  27.      pthread_t *threadid;
  28.     /*執行緒池中允許的活動執行緒數目*/
  29.     int max_thread_num;
  30.     /*當前等待佇列的任務數目*/
  31.     int cur_queue_size;
  32. } CThread_pool;
  33. int pool_add_worker (void *(*process) (void *arg), void *arg);
  34. void *thread_routine (void *arg);
  35. static CThread_pool *pool = NULL;
  36. void
  37. pool_init (int max_thread_num)
  38. {
  39.      pool = (CThread_pool *) malloc (sizeof (CThread_pool));
  40.      pthread_mutex_init (&(pool->queue_lock), NULL);
  41.      pthread_cond_init (&(pool->queue_ready), NULL);
  42.      pool->queue_head = NULL;
  43.      pool->max_thread_num = max_thread_num;
  44.      pool->cur_queue_size = 0;
  45.      pool->shutdown = 0;
  46.      pool->threadid =
  47.          (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
  48.     int i = 0;
  49.     for (i = 0; i < max_thread_num; i++)
  50.      {
  51.          pthread_create (&(pool->threadid[i]), NULL, thread_routine,
  52.                  NULL);
  53.      }
  54. }
  55. /*向執行緒池中加入任務*/
  56. int
  57. pool_add_worker (void *(*process) (void *arg), void *arg)
  58. {
  59.     /*構造一個新任務*/
  60.      CThread_worker *newworker =
  61.          (CThread_worker *) malloc (sizeof (CThread_worker));
  62.      newworker->process = process;
  63.      newworker->arg = arg;
  64.      newworker->next = NULL;/*別忘置空*/
  65.      pthread_mutex_lock (&(pool->queue_lock));
  66.     /*將任務加入到等待佇列中*/
  67.      CThread_worker *member = pool->queue_head;
  68.     if (member != NULL)
  69.      {
  70.         while (member->next != NULL)
  71.              member = member->next;
  72.          member->next = newworker;
  73.      }
  74.     else
  75.      {
  76.          pool->queue_head = newworker;
  77.      }
  78.      assert (pool->queue_head != NULL);
  79.      pool->cur_queue_size++;
  80.      pthread_mutex_unlock (&(pool->queue_lock));
  81.     /*好了,等待佇列中有任務了,喚醒一個等待執行緒;
  82.      注意如果所有執行緒都在忙碌,這句沒有任何作用*/
  83.      pthread_cond_signal (&(pool->queue_ready));
  84.     return 0;
  85. }
  86. /*銷燬執行緒池,等待佇列中的任務不會再被執行,但是正在執行的執行緒會一直
  87. 把任務執行完後再退出*/
  88. int
  89. pool_destroy ()
  90. {
  91.     if (pool->shutdown)
  92.         return -1;/*防止兩次呼叫*/
  93.      pool->shutdown = 1;
  94.     /*喚醒所有等待執行緒,執行緒池要銷燬了*/
  95.      pthread_cond_broadcast (&(pool->queue_ready));
  96.     /*阻塞等待執行緒退出,否則就成殭屍了*/
  97.     int i;
  98.     for (i = 0; i < pool->max_thread_num; i++)
  99.          pthread_join (pool->threadid[i], NULL);
  100.      free (pool->threadid);
  101.     /*銷燬等待佇列*/
  102.      CThread_worker *head = NULL;
  103.     while (pool->queue_head != NULL)
  104.      {
  105.          head = pool->queue_head;
  106.          pool->queue_head = pool->queue_head->next;
  107.          free (head);
  108.      }
  109.     /*條件變數和互斥量也別忘了銷燬*/
  110.      pthread_mutex_destroy(&(pool->queue_lock));
  111.      pthread_cond_destroy(&(pool->queue_ready));
  112.      free (pool);
  113.     /*銷燬後指標置空是個好習慣*/
  114.      pool=NULL;
  115.     return 0;
  116. }
  117. void *
  118. thread_routine (void *arg)
  119. {
  120.      printf ("starting thread 0x%x\n", pthread_self ());
  121.     while (1)
  122.      {
  123.          pthread_mutex_lock (&(pool->queue_lock));
  124.         /*如果等待佇列為0並且不銷燬執行緒池,則處於阻塞狀態; 注意
  125.          pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒後會加鎖*/
  126.         while (pool->cur_queue_size == 0 && !pool->shutdown)
  127.          {
  128.              printf ("thread 0x%x is waiting\n", pthread_self ());
  129.              pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
  130.          }
  131.         /*執行緒池要銷燬了*/
  132.         if (pool->shutdown)
  133.          {
  134.             /*遇到break,continue,return等跳轉語句,千萬不要忘記先解鎖*/
  135.              pthread_mutex_unlock (&(pool->queue_lock));
  136.              printf ("thread 0x%x will exit\n", pthread_self ());
  137.              pthread_exit (NULL);
  138.          }
  139.          printf ("thread 0x%x is starting to work\n", pthread_self ());
  140.         /*assert是除錯的好幫手*/
  141.          assert (pool->cur_queue_size != 0);
  142.          assert (pool->queue_head != NULL);
  143.         /*等待佇列長度減去1,並取出連結串列中的頭元素*/
  144.          pool->cur_queue_size--;
  145.          CThread_worker *worker = pool->queue_head;
  146.          pool->queue_head = worker->next;
  147.          pthread_mutex_unlock (&(pool->queue_lock));
  148.         /*呼叫回撥函式,執行任務*/
  149.          (*(worker->process)) (worker->arg);
  150.          free (worker);
  151.          worker = NULL;
  152.      }
  153.     /*這一句應該是不可達的*/
  154.      pthread_exit (NULL);
  155. }
    下面是測試程式碼
  1. void *
  2. myprocess (void *arg)
  3. {
  4.      printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);
  5.      sleep (1);/*休息一秒,延長任務的執行時間*/
  6.     return NULL;
  7. }
  8. int
  9. main (int argc, char **argv)
  10. {
  11.      pool_init (3);/*執行緒池中最多三個活動執行緒*/
  12.     /*連續向池中投入10個任務*/
  13.     int *workingnum = (int *) malloc (sizeof (int) * 10);
  14.     int i;
  15.     for (i = 0; i < 10; i++)
  16.      {
  17.          workingnum[i] = i;
  18.          pool_add_worker (myprocess, &workingnum[i]);
  19.      }
  20.     /*等待所有任務完成*/
  21.      sleep (5);
  22.     /*銷燬執行緒池*/
  23.      pool_destroy ();
  24.      free (workingnum);
  25.     return 0;
  26. }
將上述所有程式碼放入threadpool.c檔案中,
在Linux輸入編譯命令
$ gcc -o threadpool threadpool.c -lpthread

以下是執行結果
starting thread 0xb7df6b90
thread 0xb7df6b90 is waiting
starting thread 0xb75f5b90
thread 0xb75f5b90 is waiting
starting thread 0xb6df4b90
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 0
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 1
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 2
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 3
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 4
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 5
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 6
thread 0xb75f5b90 is starting to work
threadid is 0xb75f5b90, working on task 7
thread 0xb6df4b90 is starting to work
threadid is 0xb6df4b90, working on task 8
thread 0xb7df6b90 is starting to work
threadid is 0xb7df6b90, working on task 9
thread 0xb75f5b90 is waiting
thread 0xb6df4b90 is waiting
thread 0xb7df6b90 is waiting
thread 0xb75f5b90 will exit
thread 0xb6df4b90 will exit
thread 0xb7df6b90 will exit