RabbitMQ客戶端原始碼分析(五)之ConsumerWorkSerivce與WorkPool
阿新 • • 發佈:2018-12-16
RabbitMQ-java-client版本
com.rabbitmq:amqp-client:4.3.0
RabbitMQ
版本宣告: 3.6.15
WorkPool
-
WorkPool可以認為是一個任務池,儲存
client
(在這裡實際型別其實就是Channel
)與具體處理任務的關係 -
成員變數,
SetQueue
和VariableLinkedBlockingQueue
在下面詳細說明//預設最大佇列長度 private static final int MAX_QUEUE_LENGTH = 1000; /**就緒的clients集合,SetQueue本質是一個LinkedList+Set*/
-
registerKey(K key)
: Channel所對應的任務佇列長度取決於是否限制佇列長度,如果限制佇列長度最大MAX_QUEUE_LENGTH(1000)
,不限制就是Integer.MAX_VALUE
public void registerKey(K key) { synchronized (this) { if
-
nextWorkBlock(Collection<W> to, int size)
:返回下一個準備就緒的Channel,並從該Channel對應的任務佇列裡取出size個任務放在傳入的引數Collection中。public K nextWorkBlock(Collection<W> to, int size) { synchronized (this) { //從就緒佇列中取出一個Channel K nextKey = readyToInProgress(); if (nextKey != null) { //獲取Channel對應的任務佇列 VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey); //連續從queue取中size個元素,並將元素儲存到to集合中 drainTo(queue, to, size); } return nextKey; } } private K readyToInProgress() { //從SetQueue的佇列頭獲取一個Channel,並從就緒集合轉移到處理集合 K key = this.ready.poll(); if (key != null) { this.inProgress.add(key); } return key; } private int drainTo(VariableLinkedBlockingQueue<W> deList, Collection<W> c, int maxElements) { int n = 0; while (n < maxElements) { W first = deList.poll(); if (first == null) break; c.add(first); ++n; } return n; }
-
addWorkItem(K key, W item)
: 為特定的Channel新增新的任務,如果Channel處於休眠狀態(不是就緒狀態,不是處理狀態,不是註冊狀態),就將Channel標記為準備就緒狀態public boolean addWorkItem(K key, W item) { VariableLinkedBlockingQueue<W> queue; synchronized (this) { queue = this.pool.get(key); } // The put operation may block. We need to make sure we are not holding the lock while that happens. if (queue != null) { try { queue.put(item); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } synchronized (this) { //如果Channel處於休眠狀態,轉換為準備就緒狀態 if (isDormant(key)) { dormantToReady(key); return true; } } } return false; } private boolean isInProgress(K key){ return this.inProgress.contains(key); } private boolean isReady(K key){ return this.ready.contains(key); } private boolean isRegistered(K key) { return this.pool.containsKey(key); } /** * 是否休眠狀態 * @param key * @return */ private boolean isDormant(K key){ return !isInProgress(key) && !isReady(key) && isRegistered(key); } //狀態流轉方法,全部假設key已經註冊過 private void inProgressToReady(K key){ this.inProgress.remove(key); this.ready.addIfNotPresent(key); } private void inProgressToDormant(K key){ this.inProgress.remove(key); } private void dormantToReady(K key){ this.ready.addIfNotPresent(key); }
-
finishWorkBlock(K key)
: 設定客戶端不是處理狀態(inProgress
)。忽略未知客戶端(並返回false)。public boolean finishWorkBlock(K key) { synchronized (this) { if (!this.isRegistered(key)) return false; if (!this.inProgress.contains(key)) { throw new IllegalStateException("Client " + key + " not in progress"); } if (moreWorkItems(key)) { inProgressToReady(key); return true; } else { inProgressToDormant(key); return false; } } } private boolean moreWorkItems(K key) { VariableLinkedBlockingQueue<W> leList = this.pool.get(key); return leList != null && !leList.isEmpty(); }
VariableLinkedBlockingQueue
- 這是一個
LinkedBlockingQueue
類的克隆,增加了一個setCapacity(int)
方法,允許在使用的過程中更改容量
SetQueue
-
SetQueue是一個Set佇列,即佇列中的元素只能出現一次,本質上佇列還是通過
LinkedList
實現,只不過同時通過HashSet判斷是否已經有元素。如果有則不再新增元素到佇列中。 -
addIfNotPresent(T item)
: 如果不存在就新增到佇列尾部public boolean addIfNotPresent(T item) { if (this.members.contains(item)) { return false; } this.members.add(item); //新增到linkedList佇列的尾部 this.queue.offer(item); return true; }
-
poll()
:獲取並移除佇列頭部元素public T poll() { T item = this.queue.poll(); if (item != null) { this.members.remove(item); } return item; }
ConsumerWorkService
-
ConsumerWorkService主要是用於處理Channel的任務,大部分方法都是委託
WorkPool
來進行處理 -
成員變數與建構函式
private static final int MAX_RUNNABLE_BLOCK_SIZE = 16; //預設執行緒數 對於IO密集的程式來說2倍是不是有點少?? private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2; private final ExecutorService executor; private final boolean privateExecutor; //Channel與具體任務的關係 private final WorkPool<Channel, Runnable> workPool; private final int shutdownTimeout; public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) { //如果沒有自定義執行緒就使用預設的執行緒池 this.privateExecutor = (executor == null); this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory) : executor; this.workPool = new WorkPool<Channel, Runnable>(); this.shutdownTimeout = shutdownTimeout; }
-
addWork
:給特定的Channel新增任務public void addWork(Channel channel, Runnable runnable) { if (this.workPool.addWorkItem(channel, runnable)) { this.executor.execute(new WorkPoolRunnable()); } } private final class WorkPoolRunnable implements Runnable { @Override public void run() { //一個執行緒每次執行16個任務 int size = MAX_RUNNABLE_BLOCK_SIZE; List<Runnable> block = new ArrayList<Runnable>(size); try { Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); if (key == null) return; // nothing ready to run try { for (Runnable runnable : block) { runnable.run(); } } finally { //任務執行完成後清理,如果還有任務在佇列裡,則繼續使用執行緒池執行(遞迴) if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); } } } catch (RuntimeException e) { Thread.currentThread().interrupt(); } } }
ConsumerDispatcher
- Consumer分發器,每個
Channel
都有一個分發器,分發通知事件傳送給內部管理的WorkPool
和ConsumerWorkService