【FlinkCEP】一、基本瞭解
如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。
為解決此問題,我們需要使用執行緒池。
java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,下面主要來圍繞這個類對執行緒池進行介紹。
執行緒池的建立
7個核心引數+4種丟棄策略
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize: 表示常駐核心執行緒數。如果等於0,則執行完任務之後,沒有任何請求進入時銷燬執行緒池的執行緒;如果大於0,即使本地任務執行完,核心執行緒也不會被銷燬。這個值非常關鍵,設定過大會浪費資源,過小會導致執行緒頻繁的建立或銷燬。
maximumPoolSize:表示執行緒池能夠同時執行的最大執行緒數,這個值必須大於1,如果執行的執行緒數大於此值,則需要藉助第5個引數的幫助,快取在佇列中。如果maximumPoolSize與corePoolSize相等,即是固定大小的執行緒池。
keepAliveTime:表示執行緒池中的執行緒空閒時間,當空閒時間達到keepAliveTime值時,執行緒會被銷燬,直至剩下corePoolSize個執行緒為止,避免浪費記憶體和控制代碼資源。在預設情況下,當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用。但是當ThreadPoolExecutor的allowCoreThreadTimeOut變數值設為true時,核心執行緒超時後也會被收回。
unit:表示時間單位。keepAliveTime 的時間單位通常時TimeUnit.SECONDS.
workQueue:表示快取佇列。當請求的執行緒數大於maximumPoolSize時,執行緒進入BlockingQueue阻塞佇列。
threadFactory:表示執行緒工廠。他用來生產一組相同任務的執行緒。執行緒池的命名是通過給這個factory增加組名字首名來實現的。在虛擬機器棧分析時就可以知道執行緒任務是有哪個執行緒工廠來產生的。
handler:表示執行拒絕策略的物件。當超過第5個引數workQueue的任務快取區上限時,就可以通過該策略處理物件,這是一種簡單的限流保護。
拒絕策略主要包含:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程) ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
由這些引數可知,執行緒池的工作過程如下:
1. 執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面 有任務,執行緒池也不會馬上執行它們。
2. 當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:
a) 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;
b) 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列;
c) 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要 建立非核心執行緒立刻執行這個任務;
d) 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池 會丟擲異常 RejectExecutionException。
3. 當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。
4. 當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,
如果當前運 行的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它 最終會收縮到 corePoolSize 的大小。