多執行緒高併發程式設計(12) -- 阻塞演算法實現ArrayBlockingQueue原始碼分析
阿新 • • 發佈:2020-06-25
一.前言
前文探究了非阻塞演算法的實現ConcurrentLinkedQueue安全佇列,也說明了阻塞演算法實現的兩種方式,使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現,今天來探究下ArrayBlockingQueue。
ArrayBlockingQueue是一個阻塞佇列,底層使用陣列結構實現,按照先進先出(FIFO)的原則對元素進行排序。
ArrayBlockingQueue是一個執行緒安全的集合,通過ReentrantLock鎖來實現,在併發情況下可以保證資料的一致性。
此外,ArrayBlockingQueue的容量是有限的,陣列的大小在初始化時就固定了,不會隨著佇列元素的增加而出現擴容的情況,也就是說ArrayBlockingQueue是一個“有界快取區”。
從下圖可以看出,ArrayBlockingQueue是使用一個數組儲存元素的,當向佇列插入元素時,首先會插入到陣列下標索引為6的位置,再有新元素進來時插入到索引為7的位置,依次類推,如果滿了就不會再插入。
當元素出隊時,先移除索引為2的元素3,與入隊一樣,依次類推,移除索引3、4、5...上的元素。這也形成了“先進先出”。
二.原始碼解析
-
構造方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //佇列實現:陣列 final Object[] items; //當讀取元素時陣列的下標(下一個被取出元素的索引) int takeIndex; //新增元素時陣列的下標 (下一個被新增元素的索引) int putIndex; //佇列中元素個數: int count; //可重入鎖: final ReentrantLock lock; //入隊操作時是否讓執行緒等待 private final Condition notEmpty; //出隊操作時是否讓執行緒等待 private final Condition notFull; /** * 初始化佇列容量構造:由於公平鎖會降低佇列的效能,因而使用非公平鎖(預設)。 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } //帶初始容量大小和公平鎖佇列(公平鎖通過ReentrantLock實現): public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } }
-
在多執行緒中,預設不保證執行緒公平的訪問佇列;
-
在ArrayBlockingQueue中為了保證資料的安全,使用了ReentrantLock鎖。由於鎖的引入,導致了執行緒之間的競爭。當有一個執行緒獲取到鎖時,其餘執行緒處於等待狀態。當鎖被釋放時,所有等待執行緒為奪鎖而競爭;
-
鎖有公平鎖和非公平鎖:
-
公平鎖:等待的執行緒在獲取鎖而競爭時,按照等待的先後順序FIFO進行獲取操作;公平鎖可以應用在比如併發下的日誌輸出佇列中,保證了日誌輸出的順序完整性;
-
優點:等待鎖的執行緒不會餓死,和非公平鎖相比,在獲得鎖和保證鎖分配的均衡性差異較小;
-
缺點:使用公平鎖的程式在多執行緒訪問時表現為很低的吞吐量(即速度很慢),等待佇列中除第一個執行緒以外的所有執行緒都會阻塞,CPU喚醒阻塞執行緒的開銷比非公平鎖的大;公平鎖不能保證執行緒排程的公平性,因此,使用公平鎖的眾多執行緒中的一員可能獲得多倍的成功機會,這種情況發生在其他活動執行緒沒有被處理並且目前並未持有鎖時【ReentrantLock原始碼對公平鎖的定義】;
Note however, that fairness of locks does not guarantee fairness of thread scheduling. Thus, one of many threads using a fair lock may obtain it multiple times in succession while other active threads are not progressing and not currently holding the lock.
-
上面這句話有重入鎖的概念,一個執行緒可以在已經獲取鎖的情況下再次進入獲取到鎖,不需要競爭;同時,如果一個執行緒獲取到了鎖,然後釋放,在其他執行緒來獲取之前再次是可以獲取到鎖的。
A: Request Lock -> Release Lock -> Request Lock Again (Succeeds) B: Request Lock (Denied)... ----------------------- Time --------------------------------->
-
-
-
非公平鎖:在獲取鎖時,無論是先等待還是後等待的執行緒,均有可能獲取到鎖。即根據搶佔機制,是隨機獲取鎖的,和公平鎖不一樣的是先來的不一定能獲取到鎖,有可能一直拿不到鎖,這樣會造成“飢餓”現象;
-
優點:非公平鎖效能高於公平鎖效能。首先,在恢復一個被掛起的執行緒與該執行緒真正執行之間存在著嚴重的延遲,而且,非公平鎖更能充分的利用CPU的時間片,儘量減少CPU空閒的狀態時間;即可以減少喚起執行緒的開銷,整體的吞吐效率高,因為執行緒有機率不阻塞直接獲取到鎖,CPU不必喚醒其他所有執行緒;
-
缺點:處於等待佇列中的執行緒可能會餓死或者等很久才會獲得鎖;
-
-
產生“飢餓”的原因:
-
高優先順序吞噬所有低優先順序的CPU時間片,優先順序越高,就會獲得越高的CPU執行機會; ---> 使用預設的優先順序;
-
執行緒被永久阻塞在一個等待進入同步塊synchronized的狀態(長時間執行) ,同時synchronized並不保障等待執行緒的順序(鎖釋放後,隨機競爭,由OS排程),這會存在一個可能是某個執行緒總是搶鎖搶不到導致一直等待狀態 ---> 避免持有鎖的執行緒長時間執行、使用顯示lock來代替synchronized;
synchronized(obj) { while (true) { // .... infinite loop }
-
等待的執行緒永遠不被喚醒:如果多個執行緒處在wait方法執行上,而對其呼叫notify方法不會保證哪一個執行緒會獲得喚醒,喚醒是無序的,跟VM/OS排程有關,甚至底層是隨機選取一個或是佇列中的第一個,任何執行緒都有可能處於繼續等待的狀態,因此存在這樣一個風險,即一個等待執行緒從來得不到喚醒,因為其他等待執行緒總是能被獲得喚醒 ---> 使用顯示lock來代替synchronized;
-
-
比如ReentrantLock:
-
在公平鎖中,如果有另一個執行緒持有鎖或者有其他執行緒在等待佇列中等待這個鎖,那麼新發出的請求的執行緒將被放入到佇列中;
-
非公平鎖中, 根據搶佔機制,擁有鎖的執行緒在釋放鎖資源的時候, 新發出請求的執行緒可以和等待佇列中的第一個執行緒競爭鎖資源, 新執行緒競爭失敗才放入佇列中,但是已經進入等待佇列的執行緒, 依然是按照先進先出的順序獲取鎖資源;
-
-
-
-
入隊:有阻塞式和非阻塞式
-
阻塞式:當佇列中的元素已滿時,則會將此執行緒停止,讓其處於等待狀態,直到佇列中有空餘位置產生
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//獲取鎖 try { //佇列中元素 == 陣列長度(佇列滿了),則執行緒等待 while (count == items.length) notFull.await(); enqueue(e);//元素加入佇列 } finally { lock.unlock();//釋放鎖 } }
-
lockInterruptibly:
-
如果當前執行緒未被中斷,則獲取鎖。
-
如果該鎖沒有被另一個執行緒保持,則獲取該鎖並立即返回,將鎖的保持計數設定為 1。
-
如果當前執行緒已經保持此鎖,則將保持計數加 1,並且該方法立即返回。
-
如果鎖被另一個執行緒保持,則出於執行緒排程目的,禁用當前執行緒,並且在發生以下兩種情況之一以前,該執行緒將一直處於休眠狀態:1)鎖由當前執行緒獲得;2)其他某個執行緒中斷當前執行緒
-
-
-
非阻塞式:當佇列中的元素已滿時,並不會阻塞此執行緒的操作,而是讓其返回又或者是丟擲異常
public boolean add(E e) { return super.add(e);// AbstractQueue.add } public boolean add(E e) { if (offer(e))//呼叫實現介面 return true; else throw new IllegalStateException("Queue full"); } public boolean offer(E e) { checkNotNull(e);//檢測是否有空指標異常 final ReentrantLock lock = this.lock;//獲得鎖物件 lock.lock();//加鎖 try { //如果佇列滿了,返回false if (count == items.length) return false; else { //元素加入佇列 enqueue(e); return true; } } finally { lock.unlock();//釋放鎖 } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; //獲得陣列 final Object[] items = this.items; //槽位填充元素 items[putIndex] = x; //獲得下一個被新增元素的索引,如果值等於陣列長度,表示到達尾部了,需要從頭開始填充 if (++putIndex == items.length) putIndex = 0; count++;//數量+1 notEmpty.signal();//喚醒出隊上的等待執行緒,表示有元素可以消費了 }
-
enqueue中++putIndex == items.length,putIndex=0:這是因為當前佇列執行元素出隊時總是從佇列頭部獲取,而新增元素的索引從佇列尾部獲取所以當佇列索引(從0開始)與陣列長度相等時,下次我們就需要從陣列頭部開始添加了
-
-
阻塞式和非阻塞式的結合:offer(E e, long timeout, TimeUnit unit),向佇列尾部新增元素,可以設定執行緒等待時間,如果超過指定時間佇列還是滿的,則返回false;
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e);//檢測是否為空 long nanos = unit.toNanos(timeout);//轉換成超時時間閥值 final ReentrantLock lock = this.lock; lock.lockInterruptibly();//加鎖 try { //佇列是否滿了的判斷 while (count == items.length) { if (nanos <= 0)//等待超時結束返回false return false; nanos = notFull.awaitNanos(nanos);//佇列滿了,等待出隊有空位填充 } enqueue(e);//加入佇列中 return true; } finally { lock.unlock();//釋放鎖 } }
-
-
出隊:同樣有阻塞式和非阻塞式
-
阻塞式:當佇列中的元素已空時,則會將此執行緒停止,讓其處於等待狀態,直到佇列中有元素插入
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //佇列為空,進行等待 while (count == 0) notEmpty.await(); return dequeue();//返回出隊元素 } finally { lock.unlock(); } }
-
非阻塞式:當佇列中的元素已滿時,並不會阻塞此執行緒的操作,而是讓其返回null或元素【裡面的迭代器比較複雜,留待下文探究】
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //佇列為空,返回null,否則返回元素 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items;//獲得佇列 @SuppressWarnings("unchecked") E x = (E) items[takeIndex];//獲得出隊元素 items[takeIndex] = null;//出隊槽位元素置為null //下一個被取出元素的索引+1,如果值等於長度,表示後面沒有元素了,需要從頭開始取出 if (++takeIndex == items.length) takeIndex = 0; count--;//數量-1 if (itrs != null)//迭代器不為空 itrs.elementDequeued();//同時更新迭代器中的元素資料 notFull.signal();//喚醒入隊執行緒 return x;//返回出隊元素 }
-
阻塞式和非阻塞式的結合:poll(long timeout, TimeUnit unit),出隊獲取元素,可以設定執行緒等待時間,如果超過指定時間佇列還是空的,則返回null;
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout);//轉換成超時時間閥值 final ReentrantLock lock = this.lock; lock.lockInterruptibly();//加鎖 try { while (count == 0) {//佇列空了,等待 if (nanos <= 0)//超時了返回null return null; nanos = notEmpty.awaitNanos(nanos);//等待入隊填充元素 } return dequeue();//返回出隊元素 } finally { lock.unlock();//釋放鎖 } }
-
-
移除元素remove:
public boolean remove(Object o) { //要移除的元素為空返回false if (o == null) return false; //獲得佇列陣列 final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock();//加鎖 try { //佇列有元素 if (count > 0) { final int putIndex = this.putIndex;//獲得下一個被新增元素的索引 int i = takeIndex;//下一個被取出元素的索引 do { if (o.equals(items[i])) {//從takeIndex下標開始,找到要被刪除的元素 removeAt(i);//移除 return true; } if (++i == items.length)//下一個被取出元素的索引+1並判斷是否等於佇列長度,如果是,表示需要從頭開始遍歷 i = 0; } while (i != putIndex);//繼續查詢,直到找到最後一個元素 } return false; } finally { lock.unlock();//解鎖 } } /** * 根據下標移除元素,那麼會分成兩種情況一個是移除的是隊首元素,一個是移除的是非隊首元素,移除隊首元素,就相當於出隊操作, * 移除非隊首元素那麼中間就有空位了,後面元素需要依次補上,然後如果是隊尾元素,那麼putIndex也就是插入操作的下標也就需要跟著移動。 */ void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items;//獲得佇列 if (removeIndex == takeIndex) {//移除的是隊首元素 // removing front item; just advance items[takeIndex] = null;//隊首置為null if (++takeIndex == items.length)//下一個被取出元素的索引+1並判斷是否等於佇列長度 takeIndex = 0; count--;//數量-1 if (itrs != null)//迭代器不為空 itrs.elementDequeued();//更新迭代器元素 } else {//移除的不是隊首元素,而是中間元素 // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex;//下一個被新增元素的索引 for (int i = removeIndex;;) {//對佇列進行遍歷,因為是佇列中間的值被移除了,所有後面的元素都要挨個遷移 int next = i + 1;//獲取移除元素的下一個座標 if (next == items.length)//判斷是否等於佇列長度 next = 0; if (next != putIndex) {//獲取移除元素的下一個座標!=下一個被新增元素的索引,表示移除元素的索引後面有值 items[i] = items[next];//當前要移除的元素置為後面的元素,即對後面的元素往前遷移,覆蓋要移除的元素 i = next;//下一個遷移的索引 } else {//移除的元素是最後一個,後面沒有值了 items[i] = null;//移除元素,直接置為null this.putIndex = i;//更新下一個被新增元素的索引 break;//結束 } } count--;//數量-1 if (itrs != null)//迭代器不為空 itrs.removedAt(removeIndex);//更新迭代器元素 } notFull.signal();//喚醒入隊執行緒,可以新增元素了 }
-
清空元素clear:用於清空ArrayBlockingQueue,並且會釋放所有等待notFull條件的執行緒(存放元素的執行緒)
public void clear() { final Object[] items = this.items;//獲得佇列 final ReentrantLock lock = this.lock; lock.lock(); try { int k = count;//獲取元素數量 if (k > 0) {//有元素,表示佇列不為空 final int putIndex = this.putIndex;//下一個被新增元素的索引 int i = takeIndex;//下一個被取出元素的索引 do { items[i] = null;//對每個有元素的槽位置為null if (++i == items.length) i = 0; } while (i != putIndex);//從有元素的第一個槽位開始遍歷,直到槽位元素為null takeIndex = putIndex;//更新取出和新增的索引 count = 0;//數量更新為0 if (itrs != null)//迭代器不為空 itrs.queueIsEmpty();//更新迭代器為空 //若有等待notFull條件的執行緒,則逐一喚醒 for (; k > 0 && lock.hasWaiters(notFull); k--) notFull.signal();//喚醒入隊執行緒,可以新增元素了 } } finally { lock.unlock(); } }
-
offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)裡面有awaitNanos,下面探討該功能實現:對當前執行緒或等待的入/出隊執行緒進行掛起,如果有入/出隊操作進行了喚醒出/入隊操作,則acquireQueued自旋獲取到鎖,然後出/入隊中的ReentrantLock是重入鎖,可以重入獲取到鎖進行出/入隊操作
AbstractQueuedSynchronizer: //進行超時控制 public final long awaitNanos(long nanosTimeout) throws InterruptedException { //如果當前執行緒中斷了丟擲中斷異常 if (Thread.interrupted()) throw new InterruptedException(); //當前執行緒加入到Condition佇列中 Node node = addConditionWaiter(); //鎖釋放是否成功:釋放當前執行緒的lock,從AQS的佇列中移出 int savedState = fullyRelease(node); //到達等待時間點 final long deadline = System.nanoTime() + nanosTimeout; //中斷標識 int interruptMode = 0; //當前節點是否在同步佇列中,否表示不在,進入掛起判斷操作,如果已經在Sync佇列中,則退出迴圈 //那什麼時候會把當前執行緒又加入到Sync佇列中呢?當然是呼叫signal方法的時候,因為這裡需要喚醒之前呼叫await方法的執行緒,喚醒之後進行下面的獲取鎖等操作 while (!isOnSyncQueue(node)) { //如果超時了,將執行緒掛起,然後停止遍歷 if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } //如果等待時間間隔超過了1000,繼續掛起 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); //執行緒中斷了停止遍歷 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; //獲得剩餘的等待時間間隔 nanosTimeout = deadline - System.nanoTime(); } //結束掛起,acquireQueued自旋對當前執行緒的隊列出隊進行獲取鎖並返回執行緒是否中斷 //如果執行緒被中斷,並且中斷的方式不是丟擲異常,則設定中斷後續的處理方式設定為REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;//中斷標識更新為退出等待時重新中斷 if (node.nextWaiter != null)//當前節點後面還有節點,多併發操作了 unlinkCancelledWaiters();//從頭到尾遍歷Condition佇列,移除被cancel的節點 //如果執行緒已經被中斷,則根據之前獲取的interruptMode的值來判斷是繼續中斷還是丟擲異常 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime();//返回剩餘等待時間 }
-
drainTo可以一次性獲取佇列中所有的元素,它減少了鎖定佇列的次數,使用得當在某些場景下對效能有不錯的提升
//最多從此佇列中移除給定數量的可用元素,並將這些元素新增到給定collection中 public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c);//檢查是否為空 if (c == this)//如果集合型別相同丟擲引數異常 throw new IllegalArgumentException(); if (maxElements <= 0)//如果給定移除數量小於0,返回0,表示不做移除操作 return 0; final Object[] items = this.items;//獲得佇列 final ReentrantLock lock = this.lock; lock.lock();//加鎖 try { int n = Math.min(maxElements, count);//獲得元素的最小數量 int take = takeIndex;//下一個被取出元素的索引 int i = 0; try { while (i < n) {//遍歷移除和新增 @SuppressWarnings("unchecked") E x = (E) items[take];//獲得移除元素 c.add(x);//元素新增到直到集合中 items[take] = null;//元素原先佇列位置置為null if (++take == items.length)//如果取出索引到達尾部,從頭開始遍歷取出 take = 0; i++;//移除的數量+1,如果達到了移除的最小數量,結束遍歷 } return n;//返回一共移除並添加了多少個元素 } finally { // Restore invariants even if c.add() threw if (i > 0) {//如果有移除操作 count -= i;//佇列元素數量-i takeIndex = take;//重置下一個被取出元素的索引 if (itrs != null) {//迭代器不為空 if (count == 0)//佇列空了 itrs.queueIsEmpty();//迭代器清空 else if (i > take)//說明take中間變成0了,通知itr itrs.takeIndexWrapped(); } //喚醒在因為佇列滿而等待的入隊執行緒,最多喚醒i個,避免執行緒被喚醒了因為佇列又滿了而阻塞 for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); } } } finally { lock.unlock(); } }
三.Logback 框架中非同步日誌列印中ArrayBlockingQueue的使用
-
在高併發並且響應時間要求比較小的系統中同步打日誌已經滿足不了需求了,這是因為打日誌本身是需要同步寫磁碟的,會造成 響應時間 增加,如下圖同步日誌列印模型為:
-
非同步模型是業務執行緒把要列印的日誌任務寫入一個佇列後直接返回,然後使用一個執行緒專門負責從佇列中獲取日誌任務寫入磁碟,其模型具體如下圖:
-
如圖可知其實 logback 的非同步日誌模型是一個多生產者單消費者模型,通過使用佇列把同步日誌列印轉換為了非同步,業務執行緒呼叫非同步 appender 只需要把日誌任務放入日誌佇列,日誌執行緒則負責使用同步的 appender 進行具體的日誌列印到磁碟;
-
-
接下來看看非同步日誌列印具體實現,要把同步日誌列印改為非同步需要修改 logback 的 xml 配置檔案:
<appender name="PROJECT" class="ch.qos.logback.core.FileAppender"> <file>project.log</file> <encoding>UTF-8</encoding> <append>true</append> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- daily rollover --> <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern> <!-- keep 7 days' worth of history --> <maxHistory>7</maxHistory> </rollingPolicy> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern> <![CDATA[%n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer}, ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n %-5level %logger{35} - %m%n]]> </pattern> </layout> </appender> <appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender"> <discardingThreshold>0</discardingThreshold> <queueSize>1024</queueSize> <neverBlock>true</neverBlock> <appender-ref ref="PROJECT" /> </appender> <logger name="PROJECT_LOGGER" additivity="false"> <level value="WARN" /> <appender-ref ref="asyncProject" /> </logger>
-
從上面可知 AsyncAppender 是實現非同步日誌的關鍵,下面探究它的原理:
-
如上圖可知 AsyncAppender 繼承自 AsyncAppenderBase,其中後者具體實現了非同步日誌模型的主要功能,前者只是重寫了其中的一些方法。另外從類圖可知 logback 中的非同步日誌佇列是一個阻塞佇列, 後面會知道其實是一個有界阻塞佇列 ArrayBlockingQueue, 其中 queueSize 是有界佇列的元素個數預設為 256;
-
worker則是工作執行緒,也就是非同步列印日誌的消費者執行緒,aai則是一個appender的裝飾器,裡邊存放的同步日誌的appender,其中appenderCount記錄aai裡邊附加的同步appender的個數(這個和配置檔案相對應,一個非同步的appender對應一個同步的appender),neverBlock用來指示當同步佇列已滿時是否阻塞列印日誌執行緒(如果配置neverBlock=true,當佇列滿了之後,後面阻塞的執行緒想要輸出的訊息就直接被丟棄,從而執行緒不會阻塞),discardingThreshold是一個閾值,當日志佇列裡邊的空閒元素個數小於該值時,新來的某些級別的日誌就會直接被丟棄。
-
-
接下來看下何時建立的日誌佇列以及何時啟動的消費執行緒,這需要看下 AsyncAppenderBase 的 start 方法,該方法是在解析完畢配置 AsyncAppenderBase 的 xml 的節點元素後被呼叫 :
public void start() { if (isStarted()) return; if (appenderCount == 0) { addError("No attached appenders found."); return; } if (queueSize < 1) { addError("Invalid queue size [" + queueSize + "]"); return; } // 建立一個ArrayBlockingQueue阻塞佇列,queueSize預設為256,建立阻塞佇列的原因是:防止生產者過多,造成佇列中元素過多,產生OOM異常 blockingQueue = new ArrayBlockingQueue<E>(queueSize); // 如果discardingThreshold未定義的話,預設為queueSize的1/5 if (discardingThreshold == UNDEFINED) discardingThreshold = queueSize / 5; addInfo("Setting discardingThreshold to " + discardingThreshold); // 將工作執行緒設定為守護執行緒,即當jvm停止時,即使佇列中有未處理的元素,也不會在進行處理 worker.setDaemon(true); // 為執行緒設定name便於除錯 worker.setName("AsyncAppender-Worker-" + getName()); // make sure this instance is marked as "started" before staring the worker Thread // 啟動執行緒 super.start(); worker.start(); }
-
logback 使用的佇列是有界佇列 ArrayBlockingQueue,之所以使用有界佇列是考慮到記憶體溢位問題,在高併發下寫日誌的 qps 會很高如果設定為無界佇列佇列本身會佔用很大記憶體,很可能會造成 記憶體溢位。
-
這裡消費日誌佇列的 worker 執行緒被設定為了守護執行緒,意味著當主執行緒執行結束並且當前沒有使用者執行緒時候該 worker 執行緒會隨著 JVM 的退出而終止,而不管日誌佇列裡面是否還有日誌任務未被處理。另外這裡設定了執行緒的名稱是個很好的習慣,因為這在查詢問題的時候很有幫助,根據執行緒名字就可以定位到是哪個執行緒。
-
-
既然是有界佇列那麼肯定需要考慮如果佇列滿了,該如何處置,是丟棄老的日誌任務,還是阻塞日誌列印執行緒直到佇列有空餘元素那?下面看append 方法:
protected void append(E eventObject) { // 判斷佇列中的元素數量是否小於discardingThreshold,如果小於的話,並且日誌等級小於info的話,則直接丟棄這些日誌任務 if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) { return; } preprocess(eventObject); // 日誌入隊 put(eventObject); } private boolean isQueueBelowDiscardingThreshold() { return (blockingQueue.remainingCapacity() < discardingThreshold); } // 子類重寫的方法 判斷日誌等級 protected boolean isDiscardable(ILoggingEvent event) { Level level = event.getLevel(); return level.toInt() <= Level.INFO_INT; }
-
日誌入隊put:從下面可知如果 neverBlock 設定為 false(預設為 false)則會呼叫阻塞佇列的 put 方法,而 put 是阻塞的,也就是說如果當前佇列滿了,如果再企圖呼叫 put 方法向佇列放入一個元素則呼叫執行緒會被阻塞直到佇列有空餘空間。這裡有必要提下其中blockingQueue.put(eventObject)當日志佇列滿了的時候 put 方法會呼叫 await() 方法阻塞當前執行緒,如果其它執行緒中斷了該執行緒,那麼該執行緒會丟擲 InterruptedException 異常,那麼當前的日誌任務就會被丟棄了。如果 neverBlock 設定為了 true 則會呼叫阻塞佇列的 offer 方法,而該方法是非阻塞的,如果當前佇列滿了,則會直接返回,也就是丟棄當前日誌任務。
private void put(E eventObject) { // 判斷是否阻塞(預設為false),則會呼叫阻塞佇列的put方法 if (neverBlock) { blockingQueue.offer(eventObject); } else { putUninterruptibly(eventObject); } } // 可中斷的阻塞put方法 private void putUninterruptibly(E eventObject) { boolean interrupted = false; try { while (true) { try { blockingQueue.put(eventObject); break; } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } }
-
-
最後看下 addAppender 方法,可以看出,一個非同步的appender只能繫結一個同步appender,這個appender會被放入AppenderAttachableImpl的appenderList列表裡邊
public void addAppender(Appender<E> newAppender) { if (appenderCount == 0) { appenderCount++; addInfo("Attaching appender named [" + newAppender.getName() + "] to AsyncAppender."); aai.addAppender(newAppender); } else { addWarn("One and only one appender may be attached to AsyncAppender."); addWarn("Ignoring additional appender named [" + newAppender.getName() + "]"); } }
-
通過上面我們已經分析完了日誌生產執行緒放入日誌任務到日誌佇列的實現,下面一起來看下消費執行緒是如何從佇列裡面消費日誌任務並寫入磁碟的,由於消費執行緒是一個執行緒,那就從 worker 的 run 方法看起(消費者,將日誌寫入磁碟的執行緒方法):
class Worker extends Thread { public void run() { AsyncAppenderBase<E> parent = AsyncAppenderBase.this; AppenderAttachableImpl<E> aai = parent.aai; // loop while the parent is started 一直迴圈知道執行緒被中斷 while (parent.isStarted()) { try {// 從阻塞佇列中獲取元素,交由給同步的appender將日誌列印到磁碟 E e = parent.blockingQueue.take(); aai.appendLoopOnAppenders(e); } catch (InterruptedException ie) { break; } } addInfo("Worker thread will flush remaining events before exiting. "); //執行到這裡說明該執行緒被中斷,則把佇列裡邊的剩餘日誌任務重新整理到磁碟 for (E e : parent.blockingQueue) { aai.appendLoopOnAppenders(e); parent.blockingQueue.remove(e); } aai.detachAndStopAllAppenders(); } }
-
try邏輯中從日誌佇列使用 take 方法獲取一個日誌任務,如果當前佇列為空則當前執行緒會阻塞到 take 方法直到佇列不為空才返回,獲取到日誌任務後會呼叫 AppenderAttachableImpl 的 aai.appendLoopOnAppenders 方法,該方法會迴圈呼叫通過 addAppender 注入的同步日誌 appener 具體實現日誌列印到磁碟的任務。
-
四.參考:
- 公平鎖的使用場景:https://stackoverflow.com/questions/26455578/when-to-use-fairness-mode-in-java-concurrency
- 公平鎖和非公平鎖的區別的提問:https://segmentfault.com/q/1010000006439146
- 公平鎖不能保證執行緒排程的公平性:https://stackoverflow.com/questions/60903107/understanding-fair-reentrantlock-in-java
- logback非同步日誌列印中的ArrayBlockingQueue的使用:https://my.oschina.net/u/4410397/blog/3428573/print