1. 程式人生 > >7.執行緒池中的阻塞佇列無限大是否合適

7.執行緒池中的阻塞佇列無限大是否合適

      在設定執行緒池佇列長度時,如果長度設定的不合理就無法發揮出多執行緒的威力。設定執行緒池的佇列長度取決於使用場景;比如全程非同步的系統,佇列可以設定為0,corePoolSize設定為cpu核數。研究tomcat、Dubbo等業界成熟的產品是如何設定執行緒佇列,分析如何合理設定執行緒池佇列長度。

1.JDK執行緒池策略

      先增加執行緒至corePoolSize,之後放入佇列,如果佇列滿則增加執行緒至maximumPoolSize。其實我們不難發現如果佇列長度設定無限長度,那麼執行緒池個數將只會增加到corePoolSize,如果corePoolSize個數設定又過小,這樣就會無法發揮出多執行緒的威力。

2.Tomcat執行緒池策略

Tomcat的執行緒池佇列是無限長度的,但是執行緒池會一直建立到maximumPoolSize,然後才把請求放入等待佇列中

tomcat 任務佇列org.apache.tomcat.util.threads.TaskQueue其繼承與LinkedBlockingQueue,覆寫offer方法。 

@Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
         //執行緒個數小於MaximumPoolSize會建立新的執行緒。
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

3.Dubbo執行緒池策略

        Dubbo 提供3種執行緒池模型即:FixedThreadPool、CachedThreadPool(客戶端預設的)、LimitedThreadPool(服務端預設的),從原始碼可以看出,其預設的佇列長度都是0,當佇列長度為0 ,其使用是無緩衝的佇列SynchronousQueue,當執行執行緒超過maximumPoolSize則拒絕請求。 

/**
     * 此執行緒池啟動時即建立固定大小的執行緒數,不做任何伸縮,來源於:<code>Executors.newFixedThreadPool()</code>
     *
     * @author william.liangf
     * @see java.util.concurrent.Executors#newFixedThreadPool(int)
     */
    public class FixedThreadPool implements ThreadPool {

        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            //預設佇列長度為0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }

    }

    /**
     * 此執行緒池一直增長,直到上限,增長後不收縮。
     *
     * @author <a href="mailto:[email protected]">kimi</a>
     */
    public class LimitedThreadPool implements ThreadPool {

        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
                 //預設佇列長度為0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }

    }

    /**
     * 此執行緒池可伸縮,執行緒空閒一分鐘後回收,新請求重新建立執行緒,來源於:<code>Executors.newCachedThreadPool()</code>
     *
     * @author william.liangf
     * @see java.util.concurrent.Executors#newCachedThreadPool()
     */
    public class CachedThreadPool implements ThreadPool {

        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
                 //預設佇列長度為0
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
            return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }

    }

總結

       執行緒池的任務佇列本來起緩衝作用,但是如果設定的不合理會導致執行緒池無法擴容至max,這樣無法發揮多執行緒的能力,導致一些服務響應變慢。

       佇列長度要看具體使用場景,取決服務端處理能力以及客戶端能容忍的超時時間等

       建議採用tomcat的處理方式,core與max一致,先擴容到max再放佇列,不過佇列長度要根據使用場景設定一個上限值,如果響應時間要求較高的系統可以設定為0。

相關推薦

7.執行阻塞佇列無限大是否合適

      在設定執行緒池佇列長度時,如果長度設定的不合理就無法發揮出多執行緒的威力。設定執行緒池的佇列長度取決於使用場景;比如全程非同步的系統,佇列可以設定為0,corePoolSize設定為cpu核數。研究tomcat、Dubbo等業界成熟的產品是如何設定執行緒佇列,分析

執行阻塞佇列

1. ArrayBlockingQueue 存 add: 拋異常 if (offer(e)) return true; else throw new IllegalStateException("Queue full"); put : 阻塞 whil

跟我學Java多執行——執行阻塞佇列

前言     上一篇文章中我們將ThreadPoolExecutor進行了深入的學習和介紹,實際上我們在專案中應用的時候很少有直接應用ThreadPoolExecutor來建立執行緒池的,在jdk的api中有這麼一句話“但是,強烈建議程式設計師使用較為方便的 Execu

執行阻塞佇列

ThreadPoolExecutor介紹 ThreadPoolExecutor的完整構造方法的簽名如下 ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, Time

原始碼剖析ThreadPoolExecutor執行阻塞佇列

  本文章對ThreadPoolExecutor執行緒池的底層原始碼進行分析,執行緒池如何起到了執行緒複用、又是如何進行維護我們的執行緒任務的呢?我們直接進入正題:   首先我們看一下ThreadPoolExecutor類的原始碼 1 public ThreadPoolExecutor(int coreP

執行常用的阻塞佇列簡述

一、ArrayBlockingQueue 基於陣列的阻塞佇列,有界佇列,按照先進先出(FIFO)的形式,初始化是必須指定capacity.看一下原始碼: /**第一種構造方法,指定初始容量*/ public ArrayBlockingQueue(int capacity) {

關於執行阻塞佇列BlockingQueue

       接上篇文章https://blog.csdn.net/GoSaint/article/details/84345210        對於BlockingQueue阻塞佇列而言,常用在多執行緒生產者

執行執行設定超時退出監控

前言 在寫多執行緒程式時,大多數情況下會先excutor建立執行緒池,然後再建立執行緒,但是對一些讀資料庫或者其他IO操作,容易堵住執行緒,此時就需要給執行緒設定超時時間,幹掉超時的執行緒再重新拉起一個執行緒來,但是java執行緒建立並沒有預留超時引數,研究了一下網上也沒找到

執行原理--任務佇列BlockingQueue

文章目錄 執行緒池原理--任務佇列BlockingQueue 類繼承體系 介面抽象方法 實現類 ArrayBlockingQueue SynchronousQueue LinkedBlockin

Java併發程式設計:4種執行和緩衝佇列BlockingQueue

一. 執行緒池簡介 1. 執行緒池的概念:           執行緒池就是首先建立一些執行緒,它們的集合稱為執行緒池。使用執行緒池可以很好地提高效能,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個任務傳給執行緒池,執行緒池就會啟動一

簡單的執行技術,詮釋佇列的架構邏輯

    大家是不是對執行緒池的技術還是不夠明白啦?相信看完後大家會有不一樣的體驗!!!      對於服務端的程式,進場面對的是客戶端的請求,那麼如果客戶端傳來的內容比較單一,內容比較小,需要服務端快速的處理並返回結果;如果把每

java執行以程式碼的順序執行,主要是記錄一下繼承執行的內容

1.這個是自定義的執行緒池類,直接上程式碼 package org.jimmy.threadtest20181121; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecut

定製在排程執行執行的任務

Java 9併發程式設計指南 目錄 定製在排程執行緒池中執行的任務 準備工作 實現過程 工作原理 擴充套件學習 更多關注 排程執行緒池是Executor框架基本執行緒池的擴充套件,排程在一段時間後執行任務。ScheduledTh

執行使用ThreadLocal方案

人工手打,翻譯自:https://moelholm.com/2017/07/24/spring-4-3-using-a-taskdecorator-to-copy-mdc-data-to-async-threads 本來想自己寫一篇關於執行緒池threadlocal的,偶然看到這篇文章覺得挺好的,便直接翻譯了

OKHttp 3.10原始碼解析(一):執行和任務佇列

OKhttp是Android端最火熱的網路請求框架之一,它以高效的優點贏得了廣大開發者的喜愛,下面是OKhttp的主要特點: 1.支援HTTPS/HTTP2/WebSocket 2.內部維護執行緒池佇列,提高併發訪問的效率 3.內部維護連線池,支援多路複用,減少連線建立開銷 4.

Java執行submit()和execute之間的區別?

一: submit()方法,可以提供Future < T > 型別的返回值。 executor()方法,無返回值。 execute無返回值 public void execute(Runnable command) { if (command == null)

執行的柵欄

多執行緒中有三個類,分別是CountDownLatch,CyclicBarrier,Semaphore。代表著執行緒中的柵欄。共享鎖。 CountDownLatch 在一組執行緒中,一個執行緒等待其他執行緒。我把它理解為門栓。 檢視該類的資料結構圖如下圖一 ​ 圖一 有一個靜態的內部類,Sync繼承自A

Java執行的核心執行是如何被重複利用的

在Java開發中,經常需要建立執行緒去執行一些任務,實現起來也非常方便,但如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。此時,我們很自然會想到使用執行緒池來解決這個問題。 使用執行緒池的好處

C# 執行取消執行的三種方式

三種方式都使用CancellationToken,只是使用方式不同,有類似於採用全域性標誌位的方式 第一種 檢測IsCancellationRequested方式 static void AsyncOperation1(CancellationToken t

如何等待java執行所有任務完成

一、等待執行緒池所有執行緒完成: 有時候我們需要等待java thread pool中所有任務完成後再做某些操作,如想要等待所有任務完成,僅需呼叫threadPool.awaitTermination