java多執行緒任務佇列模型
此篇文章將從任務佇列的設計;任務排程的方式(序列和並行)。程式碼很簡單,主要是設計的思想。
任務佇列 final class PendingPostQueue { // 含有頭、尾指標的連結串列結構實現佇列 private PendingPost head; private PendingPost tail;
// 入佇列 synchronized void enqueue(PendingPost pendingPost) { if (pendingPost == null) { throw new NullPointerException("null cannot be enqueued"); } if (tail != null) { tail.next = pendingPost; tail = pendingPost; } else if (head == null) { head = tail = pendingPost; } else { throw new IllegalStateException("Head present, but no tail"); } notifyAll(); }
// 出佇列 synchronized PendingPost poll() { PendingPost pendingPost = head; if (head != null) { head = head.next; if (head == null) { tail = null; } } return pendingPost; }
// 等待最大時長; 如果此時有入佇列的操作(notifyAll),直接出佇列 synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException { if (head == null) { wait(maxMillisToWait); } return poll(); } } 上面的程式碼很簡單,基本上一看就能明白;下面主要分析,這樣設計的優點:
使用頭、尾指標的連結串列結構實現佇列;入佇列通過操作尾指標,出佇列通過操作頭指標的方式達到時間複雜度都是O(1). 增加出佇列延遲的功能,方式在空佇列的時候,持續獲取或直接返回空;增加一段時間間隔等待其他執行緒的入佇列的操作(儘可能處理儘量多的任務。) 任務排程:序列執行 序列的任務排程,基本上是單執行緒模型。因為基本上是下一個任務的執行需要等到上一個任務執行完成。 程式碼如下:
// 當前任務排程類(序列) final class BackgroundPoster implements Runnable { // 任務佇列 private final PendingPostQueue queue; // 當前執行緒是否在正在執行 // volatile: 保證單個變數的讀寫操作是執行緒安全(通過cpu實現CAS) private volatile boolean executorRunning;
BackgroundPoster() { queue = new PendingPostQueue(); }
public void enqueue(String id, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(id, event); // 建立任務 synchronized (this) { queue.enqueue(pendingPost); // 入佇列 // 如果當前沒有正在執行的任務,開啟任務 if (!executorRunning) { executorRunning = true; ThreadUtils.getExecutorService().execute(this); } } }
@Override public void run() { try { try { while (true) { // 從任務佇列中獲取任務;設定一分鐘時間間隔,防止在1000分鐘內有新任務入佇列 PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // 雙層檢驗 pendingPost = queue.poll(); if (pendingPost == null) { // 執行標誌置為false executorRunning = false; return; // 如果沒有任務了,將會結束此次迴圈,也就相當於停止了當前執行緒(也正因為此,上面的wait(1000)才很重要) } } } // 執行任務 invokePost(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }
} 上面的程式碼也不難,對照我寫的註釋看起來會很簡單。原理也很簡單:
任務的執行是在一個子執行緒(通過執行緒池開啟的)中 任務的排程是通過操作任務佇列實現的,通過迴圈依次呼叫佇列中的任務。 wait(1000)的作用,最大化使用執行緒資源;防止佇列中剛沒有任務了就停止執行緒(具體分析在註釋中) 任務排程:並行執行 並行排程任務,就需要多執行緒排程了。 具體程式碼實現如下:
class AsyncPoster implements Runnable {
private final PendingPostQueue queue;
AsyncPoster() { queue = new PendingPostQueue(); }
public void enqueue(String id, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(id, event); queue.enqueue(pendingPost); ThreadUtils.getExecutorService().execute(this); }
@Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } invokePost(pendingPost); } } 上面的程式碼更簡單,就是每一個任務開啟一個執行緒去執行。 但是如果仔細檢視程式碼會發現: 這裡根本就沒有必要使用任務佇列,直接開啟執行緒去執行任務不就行了嗎?這裡任務佇列的作用是用來傳遞資料。
任務排程:Android主執行緒排程 我們經常會遇到:回撥在主執行緒中執行。由於主執行緒只有一個,也就相當於上面的序列執行。而Android有自己的Handler訊息機制幫我們封裝好了,下面就基於這個來實現。
final class HandlerPoster extends Handler {
private final PendingPostQueue queue; // 主執行緒執行最大時長(防止阻塞主執行緒) private final int maxMillisInsideHandleMessage; // 正在執行的標誌(同序列執行) private boolean handlerActive; // 引數looper決定了當前任務所執行的執行緒,這裡傳遞Looper.mainLooper()就會將當前任務執行在主執行緒中 HandlerPoster(Looper looper, int maxMillisInsideHandleMessage) { super(looper); this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); }
void enqueue(String id, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(id, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; // 傳送訊息 if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } invokePost(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; // 如果在主執行緒中執行的時間超過最大時間,停止當前操作,重新發送訊息;防止祖冊主執行緒 if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } //重置執行,也就是還處於執行狀態。 rescheduled = true; return; } } } finally { // 執行狀態由rescheduled決定 handlerActive = rescheduled; } } } 程式碼也不難,原理基本和序列排程相同;唯一不同,因為是在主執行緒中,需要對執行緒阻塞的問題進行考慮。 --------------------- 作者:qiaoba_gogo 來源:CSDN 原文:https://blog.csdn.net/u010014658/article/details/77925567