多執行緒和併發庫相關
LinkedBlockingQueue阻塞佇列
LinkedBlockingQueue類圖
LinkedBlockingQueue中也有兩個Node分別用來存放首尾節點,並且裡面有個初始值為0的原子變數count用來記錄佇列元素個數,另外裡面有兩個ReentrantLock的獨佔鎖,分別用來控制元素入隊和出隊加鎖,其中takeLock用來控制同時只有一個執行緒可以從佇列獲取元素,其他執行緒必須等待,putLock控制同時只能有一個執行緒可以獲取鎖去新增元素,其他執行緒必須等待。另外notEmpty和notFull用來實現入隊和出隊的同步。 另外由於出入隊是兩個非公平獨佔鎖,所以可以同時又一個執行緒入隊和一個執行緒出隊,其實這個是個生產者-消費者模型。
/**通過take取出進行加鎖、取出 */ private final ReentrantLock takeLock = new ReentrantLock(); /**等待中的佇列等待取出 */ private final Condition notEmpty = takeLock.newCondition(); /*通過put放置進行加鎖、放置*/ private final ReentrantLock putLock = new ReentrantLock(); /**等待中的佇列等待放置 */ private final Condition notFull = putLock.newCondition(); /*記錄集合中的個數(計數器) */ private final AtomicInteger count = new AtomicInteger(0); 佇列的容量: //佇列初始容量,Integer最大值 public static final int MAX_VALUE = 0x7fffffff; public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化首尾節點 last = head = new Node(null); }
如圖預設佇列容量為0x7fffffff;使用者也可以自己指定容量。
LinkedBlockingQueue方法
ps:下面介紹LinkedBlockingQueue用到很多Lock物件。詳細可以查詢Lock物件的介紹
帶時間的Offer操作-生產者
在ArrayBlockingQueue中已經簡單介紹了Offer()方法,LinkedBlocking的Offer方法類似,在此就不過多去介紹。這次我們從介紹下帶時間的Offer方法
帶時間的poll操作-消費者public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { //空元素拋空指標異常 if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //獲取可被中斷鎖,只有一個執行緒克獲取 putLock.lockInterruptibly(); try { //如果佇列滿則進入迴圈 while (count.get() == capacity) { //nanos<=0直接返回 if (nanos <= 0) return false; //否者呼叫await進行等待,超時則返回<=0(1) nanos = notFull.awaitNanos(nanos); } //await在超時時間內返回則新增元素(2) enqueue(new Node(e)); c = count.getAndIncrement(); //佇列不滿則啟用其他等待入隊執行緒(3) if (c + 1 < capacity) notFull.signal(); } finally { //釋放鎖 putLock.unlock(); } //c==0說明佇列裡面有一個元素,這時候喚醒出隊執行緒(4) if (c == 0) signalNotEmpty(); return true; } private void enqueue(Node node) { last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
獲取並移除隊首元素,在指定的時間內去輪詢佇列看有沒有首元素有則返回,否者超時後返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//出隊執行緒獲取獨佔鎖
takeLock.lockInterruptibly();
try {
//迴圈直到佇列不為空
while (count.get() == 0) {
//超時直接返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//出隊,計數器減一
x = dequeue();
c = count.getAndDecrement();
//如果出隊前佇列不為空則傳送訊號,啟用其他阻塞的出隊執行緒
if (c > 1)
notEmpty.signal();
} finally {
//釋放鎖
takeLock.unlock();
}
//當前佇列容量為最大值-1則啟用入隊執行緒。
if (c == capacity)
signalNotFull();
return x;
}
首先獲取獨佔鎖,然後進入迴圈噹噹前佇列有元素才會退出迴圈,或者超時了,直接返回null。
超時前退出迴圈後,就從佇列移除元素,然後計數器減去一,如果減去1前佇列元素大於1則說明當前移除後佇列還有元素,那麼就發訊號啟用其他可能阻塞到當前條件訊號的執行緒。
最後如果減去1前佇列元素個數=最大值,那麼移除一個後會騰出一個空間來,這時候可以啟用可能存在的入隊阻塞執行緒。
put操作-生產者
與帶超時時間的poll類似不同在於put時候如果當前佇列滿了它會一直等待其他執行緒呼叫notFull.signal才會被喚醒。
take操作-消費者
與帶超時時間的poll類似不同在於take時候如果當前佇列空了它會一直等待其他執行緒呼叫notEmpty.signal()才會被喚醒。
size操作-消費者
當前佇列元素個數,如程式碼直接使用原子變數count獲取。
public int size() {
return count.get();
}
peek操作
獲取但是不移除當前佇列的頭元素,沒有則返回null。
public E peek() {
//佇列空,則返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
remove操作刪除佇列裡面的一個元素,有則刪除返回true,沒有則返回false,在刪除操作時候由於要遍歷佇列所以加了雙重鎖,也就是在刪除過程中不允許入隊也不允許出隊操作。
public boolean remove(Object o) {
if (o == null) return false;
//雙重加鎖
fullyLock();
try {
//遍歷佇列找則刪除返回true
for (Node trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
//找不到返回false
return false;
} finally {
//解鎖
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
void unlink(Node p, Node trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//如果當前佇列滿,刪除後,也不忘記最快的喚醒等待的執行緒
if (count.getAndDecrement() == capacity)
notFull.signal();
}
開源框架的使用tomcat中任務佇列TaskQueue。
可知TaskQueue繼承了LinkedBlockingQueue並且泛化型別固定了為Runnalbe.重寫了offer,poll,take方法。
tomcat中有個執行緒池ThreadPoolExecutor,在NIOEndPoint中當acceptor執行緒接受到請求後,會把任務放入佇列,然後poller 執行緒從佇列裡面獲取任務,然後就把任務放入執行緒池執行。這個ThreadPoolExecutor中的的一個引數就是TaskQueue。
先看看ThreadPoolExecutor的引數如果是普通LinkedBlockingQueue是怎麼樣的執行邏輯: 當呼叫執行緒池方法execute() 方法新增一個任務時:
如果當前執行的執行緒數量小於corePoolSize,則建立新執行緒執行該任務
如果當前執行的執行緒數量大於或等於corePoolSize,則將這個任務放入阻塞佇列。
如果當前佇列滿了,並且當前執行的執行緒數量小於maximumPoolSize,則建立新執行緒執行該任務;
如果當前佇列滿了,並且當前執行的執行緒數量大於或等於maximumPoolSize,那麼執行緒池將會丟擲RejectedExecutionException異常。
如果執行緒執行完了當前任務,那麼會去佇列裡面獲取一個任務來執行,如果任務執行完了,並且當前執行緒數大於corePoolSize,那麼會根據執行緒空閒時間keepAliveTime回收一些執行緒保持執行緒池corePoolSize個執行緒。
首先看下執行緒池中exectue新增任務時候的邏輯:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//當前工作執行緒個數小於core個數則開新執行緒執行(1)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//放入佇列(2)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果佇列滿了則開新執行緒,但是個數要不超過最大值,超過則返回false
//然後執行reject handler(3)
else if (!addWorker(command, false))
reject(command);
}
可知噹噹前工作執行緒個數為corePoolSize後,如果在來任務會把任務新增到佇列,佇列滿了或者入隊失敗了則開啟新執行緒。
然後看看TaskQueue中重寫的offer方法的邏輯:
public boolean offer(Runnable o) {
//如果parent為null則直接呼叫父類方法
if (parent==null) return super.offer(o);
//如果當前執行緒池中執行緒個數達到最大,則無條件呼叫父類方法
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//如果當前提交的任務小於當前執行緒池執行緒數,說明執行緒用不完,沒必要重新開執行緒
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//如果當前執行緒池執行緒個數>core個數但是小於最大個數,則開新執行緒代替放入佇列
if (parent.getPoolSize()
//到了這裡,無條件呼叫父類
return super.offer(o);
}
可知parent.getPoolSize()
LinkedBlockingQueue安全分析總結
仔細思考下阻塞佇列是如何實現併發安全的維護佇列連結串列的,先分析下簡單的情況就是當佇列裡面有多個元素時候,由於同時只有一個執行緒(通過獨佔鎖putLock實現)入隊元素並且是操作last節點(,而同時只有一個出隊執行緒(通過獨佔鎖takeLock實現)操作head節點,所以不存在併發安全問題。
考慮當佇列為空的時候佇列狀態為:
這時候假如一個執行緒呼叫了take方法,由於佇列為空,所以count.get()==0所以當前執行緒會呼叫notEmpty.await()把自己掛起,並且放入notEmpty的條件佇列,並且釋放當前條件變數關聯的通過takeLock.lockInterruptibly()獲取的獨佔鎖。由於釋放了鎖,所以這時候其他執行緒呼叫take時候就會通過takeLock.lockInterruptibly()獲取獨佔鎖,然後同樣阻塞到notEmpty.await(),同樣會被放入notEmpty的條件佇列,也就說在佇列為空的情況下可能會有多個執行緒因為呼叫take被放入了notEmpty的條件佇列。
這時候如果有一個執行緒呼叫了put方法,那麼就會呼叫enqueue操作,該操作會在last節點後面新增新元素並且設定last為新節點。然後count.getAndIncrement()先獲取當前佇列元個數為0儲存到c,然後自增count為1,由於c==0所以呼叫signalNotEmpty啟用notEmpty的條件佇列裡面的阻塞時間最長的執行緒,這時候take中呼叫notEmpty.await()的執行緒會被啟用await內部會重新去獲取獨佔鎖獲取成功則返回,否者被放入AQS的阻塞佇列,如果獲取成功,那麼count.get() >0因為可能多個執行緒put了,所以呼叫dequeue從佇列獲取元素(這時候一定可以獲取到),然後呼叫c = count.getAndDecrement() 把當前計數返回後並減去1,如果c>1 說明當前佇列還有其他元素,那麼就呼叫 notEmpty.signal()去啟用 notEmpty的條件佇列裡面的其他阻塞執行緒。
考慮當佇列滿的時候:
當佇列滿的時候呼叫put方法時候,會由於notFull.await()當前執行緒被阻塞放入notFull管理的條件佇列裡面,同理可能會有多個呼叫put方法的執行緒都放到了notFull的條件佇列裡面。
這時候如果有一個執行緒呼叫了take方法,呼叫dequeue()出隊一個元素,c = count.getAndDecrement();count值減一;c==capacity;現在佇列有一個空的位置,所以呼叫signalNotFull()啟用notFull條件佇列裡面等待最久的一個執行緒。
LinkedBlockingQueue簡單示例
併發庫中的BlockingQueue是一個比較好玩的類,顧名思義,就是阻塞佇列。該類主要提供了兩個方法put()和take(),前者將一個物件放到佇列中,如果佇列已經滿了,就等待直到有空閒節點;後者從head取一個物件,如果沒有物件,就等待直到有可取的物件。
下面的例子比較簡單,一個讀執行緒,用於將要處理的檔案物件新增到阻塞佇列中,另外四個寫執行緒用於取出檔案物件,為了模擬寫操作耗時長的特點,特讓執行緒睡眠一段隨機長度的時間。另外,該Demo也使用到了執行緒池和原子整型 (AtomicInteger),AtomicInteger可以在併發情況下達到原子化更新,避免使用了synchronized,而且效能非常高。由 於阻塞佇列的put和take操作會阻塞,為了使執行緒退出,特在佇列中添加了一個“標識”,演算法中也叫“哨兵”,當發現這個哨兵後,寫執行緒就退出。
當然執行緒池也要顯式退出了。
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TestBlockingQueue {
static long randomTime() {
return (long) (Math.random() * 1000);
}
public static void main(String[] args) {
// 能容納100個檔案
final BlockingQueue queue = new LinkedBlockingQueue(100);
// 執行緒池
final ExecutorService exec = Executors.newFixedThreadPool(5);
final File root = new File("F:\\JavaLib");
// 完成標誌
final File exitFile = new File("");
// 讀個數
final AtomicInteger rc = new AtomicInteger();
// 寫個數
final AtomicInteger wc = new AtomicInteger();
// 讀執行緒
Runnable read = new Runnable() {
public void run() {
scanFile(root);
scanFile(exitFile);
}
public void scanFile(File file) {
if (file.isDirectory()) {
File[] files = file.listFiles(new FileFilter() {
public boolean accept(File pathname) {
return pathname.isDirectory() || pathname.getPath().endsWith(".java");
}
});
for (File one : files)
scanFile(one);
} else {
try {
int index = rc.incrementAndGet();
System.out.println("Read0: " + index + " " + file.getPath());
queue.put(file);
} catch (InterruptedException e) {
}
}
}
};
exec.submit(read);
// 四個寫執行緒
for (int index = 0; index < 4; index++) {
// write thread
final int NO = index;
Runnable write = new Runnable() {
String threadName = "Write" + NO;
public void run() {
while (true) {
try {
Thread.sleep(randomTime());
int index = wc.incrementAndGet();
File file = (File) queue.take();
// 佇列已經無物件
if (file == exitFile) {
// 再次新增"標誌",以讓其他執行緒正常退出
queue.put(exitFile);
break;
}
System.out.println(threadName + ": " + index + " " + file.getPath());
} catch (InterruptedException e) {
}
}
}
};
exec.submit(write);
}
exec.shutdown();
}
}
PriorityBlockingQueue無界阻塞優先順序佇列PriorityBlockingQueue是帶優先順序的無界阻塞佇列,每次出隊都返回優先順序最高的元素,是二叉樹最小堆的實現,研究過陣列方式存放最小堆節點的都知道,直接遍歷佇列元素是無序的。
PriorityBlockingQueue類圖結構
如圖PriorityBlockingQueue內部有個陣列queue用來存放佇列元素,size用來存放佇列元素個數,allocationSpinLockOffset是用來在擴容佇列時候做cas的,目的是保證只有一個執行緒可以進行擴容。
由於這是一個優先順序佇列所以有個比較器comparator用來比較元素大小。lock獨佔鎖物件用來控制同時只能有一個執行緒可以進行入隊出隊操作。notEmpty條件變數用來實現take方法阻塞模式。這裡沒有notFull 條件變數是因為這裡的put操作是非阻塞的,為啥要設計為非阻塞的是因為這是無界佇列。最後PriorityQueue q用來搞序列化的。
如下建構函式,預設佇列容量為11,預設比較器為null;
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
PriorityBlockingQueue方法
Offer操作
在佇列插入一個元素,由於是無界佇列,所以一直為成功返回true;
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//如果當前元素個數>=佇列容量,則擴容(1)
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator cmp = comparator;
//預設比較器為null
if (cmp == null)(2)
siftUpComparable(n, e, array);
else
//自定義比較器(3)
siftUpUsingComparator(n, e, array, cmp);
//佇列元素增加1,並且啟用notEmpty的條件佇列裡面的一個阻塞執行緒
size = n + 1;(9)
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
主流程比較簡單,下面看看兩個主要函式
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //must release and then re-acquire main lock
Object[] newArray = null;
//cas成功則擴容(4)
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//oldGap<64則擴容新增oldcap+2,否者擴容50%,並且最大為MAX_ARRAY_SIZE
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
//第一個執行緒cas成功後,第二個執行緒會進入這個地方,然後第二個執行緒讓出cpu,儘量讓第一個執行緒執行下面點獲取鎖,但是這得不到肯定的保證。(5)
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();(6)
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
tryGrow目的是擴容,這裡要思考下為啥在擴容前要先釋放鎖,然後使用cas控制只有一個執行緒可以擴容成功。我的理解是為了效能,因為擴容時候是需要花時間的,如果這些操作時候還佔用鎖那麼其他執行緒在這個時候是不能進行出隊操作的,也不能進行入隊操作,這大大降低了併發性。
所以在擴容前釋放鎖,這允許其他出隊執行緒可以進行出隊操作,但是由於釋放了鎖,所以也允許在擴容時候進行入隊操作,這就會導致多個執行緒進行擴容會出現問題,所以這裡使用了一個spinlock用cas控制只有一個執行緒可以進行擴容,失敗的執行緒呼叫Thread.yield()讓出cpu,目的意在讓擴容執行緒擴容後優先呼叫lock.lock重新獲取鎖,但是這得不到一定的保證,有可能呼叫Thread.yield()的執行緒先獲取了鎖。
那copy元素資料到新陣列為啥放到獲取鎖後面那?原因應該是因為可見性問題,因為queue並沒有被volatile修飾。另外有可能在擴容時候進行了出隊操作,如果直接拷貝可能看到的陣列元素不是最新的。而通過呼叫Lock後,獲取的陣列則是最新的,並且在釋放鎖前 陣列內容不會變化。
具體建堆演算法:
private static void siftUpComparable(int k, T x, Object[] array) {
Comparable key = (Comparable) x;
//佇列元素個數>0則判斷插入位置,否者直接入隊(7)
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;(8)
}
Poll操作
在佇列頭部獲取並移除一個元素,如果佇列為空,則返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
主要看dequeue
private E dequeue() {
//佇列為空,則返回null
int n = size - 1;
if (n < 0)
return null;
else {
//獲取隊頭元素(1)
Object[] array = queue;
E result = (E) array[0];
//獲取對尾元素,並值null(2)
E x = (E) array[n];
array[n] = null;
Comparator cmp = comparator;
if (cmp == null)//cmp=null則呼叫這個,把對尾元素位置插入到0位置,並且調整堆為最小堆(3)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;(4)
return result;
}
}
private static void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable key = (Comparable)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];(5)
int right = child + 1;(6)
if (right < n &&
((Comparable) c).compareTo((T) array[right]) > 0)(7)
c = array[child = right];
if (key.compareTo((T) c) <= 0)(8)
break;
array[k] = c;
k = child;
}
array[k] = key;(9)
}
}
下面重點說說siftDownComparable這個屌屌的建立最小堆的演算法:
首先說下思想,其中k一開始為0,x為數組裡面最後一個元素,由於第0個元素為樹根,被出隊時候要被搞掉,所以建堆要從它的左右孩子節點找一個最小的值來當樹根,子樹根被搞掉後,會找子樹的左右孩子最小的元素來代替,直到樹節點為止。
Put操作
內部呼叫的offer,由於是無界佇列,所以不需要阻塞
public void put(E e) {
offer(e); // never need to block
}
Take操作獲取佇列頭元素,如果佇列為空則阻塞。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//如果佇列為空,則阻塞,把當前執行緒放入notEmpty的條件佇列
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
這裡是阻塞實現,阻塞後直到入隊操作呼叫notEmpty.signal 才會返回。
Size操作
獲取佇列元個數,由於加了獨佔鎖所以返回結果是精確的
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
PriorityBlockingQueue小結
PriorityBlockingQueue類似於ArrayBlockingQueue內部使用一個獨佔鎖來控制同時只有一個執行緒可以進行入隊和出隊,另外前者只使用了一個notEmpty條件變數而沒有notFull這是因為前者是無界佇列,當put時候永遠不會處於await所以也不需要被喚醒。
PriorityBlockingQueue始終保證出隊的元素是優先順序最高的元素,並且可以定製優先順序的規則,內部通過使用一個二叉樹最小堆演算法來維護內部陣列,這個陣列是可擴容的,噹噹前元素個數>=最大容量時候會通過演算法擴容。
值得注意的是為了避免在擴容操作時候其他執行緒不能進行出隊操作,實現上使用了先釋放鎖,然後通過cas保證同時只有一個執行緒可以擴容成功。
PriorityBlockingQueue示例
PriorityBlockingQueue類是JDK提供的優先順序佇列 本身是執行緒安全的 內部使用顯示鎖 保證執行緒安全。
PriorityBlockingQueue儲存的物件必須是實現Comparable介面的 因為PriorityBlockingQueue佇列會根據內部儲存的每一個元素的compareTo方法比較每個元素的大小。這樣在take出來的時候會根據優先順序 將優先順序最小的最先取出。
下面是示例程式碼
public static PriorityBlockingQueue queue = new PriorityBlockingQueue();
public static void main(String[] args) {
queue.add(new User(1,"wu"));
queue.add(new User(5,"wu5"));
queue.add(new User(23,"wu23"));
queue.add(new User(55,"wu55"));
queue.add(new User(9,"wu9"));
queue.add(new User(3,"wu3"));
for (User user : queue) {
try {
System.out.println(queue.take().name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//靜態內部類
static class User implements Comparable{
public User(int age,String name) {
this.age = age;
this.name = name;
}
int age;
String name;
@Override
public int compareTo(User o) {
return this.age > o.age ? -1 : 1;
}
}
SychronousQueue同步佇列
SynchronousQueue是一個比較特別的佇列,由於線上程池方面有所應用,為了更好的理解執行緒池的實現原理,此佇列原始碼中充斥著大量的CAS語句,理解起來是有些難度的,為了方便日後回顧,本篇文章會以簡潔的圖形化方式展示該佇列底層的實現原理。
SychronousQueue簡單實用
經典的生產者-消費者模式,操作流程是這樣的:
有多個生產者,可以併發生產產品,把產品置入佇列中,如果佇列滿了,生產者就會阻塞;
有多個消費者,併發從佇列中獲取產品,如果佇列空了,消費者就會阻塞;
SynchronousQueue 也是一個佇列來的,但它的特別之處在於它內部沒有容器,一個生產執行緒,當它生產產品(即put的時候),如果當前沒有人想要消費產品(即當前沒有執行緒執行take),此生產執行緒必須阻塞,等待一個消費執行緒呼叫take操作,take操作將會喚醒該生產執行緒,同時消費執行緒會獲取生產執行緒的產品(即資料傳遞),這樣的一個過程稱為一次配對過程(當然也可以先take後put,原理是一樣的)。
我們用一個簡單的程式碼來驗證一下,如下所示:
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue queue = new SynchronousQueue();
Thread putThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("put thread start");
try {
queue.put(1);
} catch (InterruptedException e) {
}
System.out.println("put thread end");
}
});
Thread takeThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("take thread start");
try {
System.out.println("take from putThread: " + queue.take());
} catch (InterruptedException e) {
}
System.out.println("take thread end");
}
});
putThread.start();
Thread.sleep(1000);
takeThread.start();
}
}
一種輸出結果如下:
put thread start
take thread start
take from putThread: 1
put thread end
take thread end
從結果可以看出,put執行緒執行queue.put(1) 後就被阻塞了,只有take執行緒進行了消費,put執行緒才可以返回。可以認為這是一種執行緒與執行緒間一對一傳遞訊息的模型。
SychronousQueue實現原理
不像ArrayBlockingQueue、LinkedBlockingDeque之類的阻塞佇列依賴AQS實現併發操作,SynchronousQueue直接使用CAS實現執行緒的安全訪問。
佇列的實現策略通常分為公平模式和非公平模式,接下來將分別進行說明。
公平模式下的模型:
公平模式下,底層實現使用的是TransferQueue這個內部佇列,它有一個head和tail指標,用於指向當前正在等待匹配的執行緒節點。
接著我們進行一些操作:
1、執行緒put1執行 put(1)操作,由於當前沒有配對的消費執行緒,所以put1執行緒入佇列,自旋一小會後睡眠等待。
2、接著,執行緒put2執行了put(2)操作,跟前面一樣,put2執行緒入佇列,自旋一小會後睡眠等待。
3、這時候,來了一個執行緒take1,執行了 take操作,由於tail指向put2執行緒,put2執行緒跟take1執行緒配對了(一put一take),這時take1執行緒不需要入隊,但是請注意了,這時候,要喚醒的執行緒並不是put2,而是put1。
為何?大家應該知道我們現在講的是公平策略,所謂公平就是誰先入隊了,誰就優先被喚醒,我們的例子明顯是put1應該優先被喚醒。至於讀者可能會有一個疑問,明明是take1執行緒跟put2執行緒匹配上了,結果是put1執行緒被喚醒消費,怎麼確保take1執行緒一定可以和次首節點(head.next)也是匹配的呢?其實大家可以拿個紙畫一畫,就會發現真的就是這樣的。
公平策略總結下來就是:隊尾匹配隊頭出隊。
執行後put1執行緒被喚醒,take1執行緒的 take()方法返回了1(put1執行緒的資料),這樣就實現了執行緒間的一對一通訊。
4、最後,再來一個執行緒take2,執行take操作,這時候只有put2執行緒在等候,而且兩個執行緒匹配上了,執行緒put2被喚醒, take2執行緒take操作返回了2(執行緒put2的資料),這時候佇列又回到了起點。
以上便是公平模式下,SynchronousQueue的實現模型。總結下來就是:隊尾匹配隊頭出隊,先進先出,體現公平原則。
非公平模式下的模型:
我們還是使用跟公平模式下一樣的操作流程,對比兩種策略下有何不同。非公平模式底層的實現使用的是TransferStack,一個棧,實現中用head指標指向棧頂,接著我們看看它的實現模型:
1、執行緒put1執行 put(1)操作,由於當前沒有配對的消費執行緒,所以put1執行緒入棧,自旋一小會後睡眠等待。
2、接著,執行緒put2再次執行了put(2)操作,跟前面一樣,put2執行緒入棧,自旋一小會後睡眠等待。
3、這時候,來了一個執行緒take1,執行了take操作,這時候發現棧頂為put2執行緒,匹配成功,但是實現會先把take1執行緒入棧,然後take1執行緒迴圈執行匹配put2執行緒邏輯,一旦發現沒有併發衝突,就會把棧頂指標直接指向 put1執行緒。
4、最後,再來一個執行緒take2,執行take操作,這跟步驟3的邏輯基本是一致的,take2執行緒入棧,然後在迴圈中匹配put1執行緒,最終全部匹配完畢,棧變為空,恢復初始狀態。
可以從上面流程看出,雖然put1執行緒先入棧了,但是卻是後匹配,這就是非公平的由來。
SychronousQueue總結
SynchronousQueue由於其獨有的執行緒一一配對通訊機制,在大部分平常開發中,可能都不太會用到,但執行緒池技術中會有所使用,由於內部沒有使用AQS,而是直接使用CAS,所以程式碼理解起來會比較困難,但這並不妨礙我們理解底層的實現模型,在理解了模型的基礎上,有興趣的話再查閱原始碼,就會有方向感,看起來也會比較容易,希望本文有所借鑑意義。
DeplayQueue延時無界阻塞佇列
在談到DelayQueue的使用和原理的時候,我們首先介紹一下DelayQueue,DelayQueue是一個無界阻塞佇列,只有在延遲期滿時才能從中提取元素。該佇列的頭部是延遲期滿後儲存時間最長的Delayed 元素。
DelayQueue阻塞佇列在我們系統開發中也常常會用到,例如:快取系統的設計,快取中的物件,超過了空閒時間,需要從快取中移出;任務排程系統,能夠準確的把握任務的執行時間。我們可能需要通過執行緒處理很多時間上要求很嚴格的資料,如果使用普通的執行緒,我們就需要遍歷所有的物件,一個一個的檢 檢視資料是否過期等,首先這樣在執行上的效率不會太高,其次就是這種設計的風格也大大的影響了資料的精度。一個需要12:00點執行的任務可能12:01 才執行,這樣對資料要求很高的系統有更大的弊端。由此我們可以使用DelayQueue。
下面將會對DelayQueue做一個介紹,然後舉個例子。並且提供一個Delayed介面的實現和Sample程式碼。DelayQueue是一個BlockingQueue,其特化的引數是Delayed。(不瞭解BlockingQueue的同學,先去了解BlockingQueue再看本文)
Delayed擴充套件了Comparable介面,比較的基準為延時的時間值,Delayed介面的實現類getDelay的返回值應為固定值(final)。DelayQueue內部是使用PriorityQueue實現的。
DelayQueue = BlockingQueue +PriorityQueue + Delayed
DelayQueue定義和原理
DelayQueue的關鍵元素BlockingQueue、PriorityQueue、Delayed。可以這麼說,DelayQueue是一個使用優先佇列(PriorityQueue)實現的BlockingQueue,優先佇列的比較基準值是時間。
他們的基本定義如下
public interface Comparable {
public int compareTo(T o);
}
public interface Delayed extends Comparable {
long getDelay(TimeUnit unit);
}
public class DelayQueue implements BlockingQueue {
private final PriorityQueue q = new PriorityQueue();
}
DelayQueue內部的實現使用了一個優先佇列。當呼叫DelayQueue的offer方法時,把Delayed物件加入到優先佇列q中。如下:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e);
if (first == null || e.compareTo(first) < 0)
available.signalAll();
return true;
} finally {
lock.unlock();
}
}
DelayQueue的take方法,把優先佇列q的first拿出來(peek),如果沒有達到延時閥值,則進行await處理。如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
DelayQueue例項應用
Ps:為了具有呼叫行為,存放到DelayDeque的元素必須繼承Delayed介面。Delayed介面使物件成為延遲物件,它使存放在DelayQueue類中的物件具有了啟用日期。該介面強制執行下列兩個方法。
一下將使用Delay做一個快取的實現。其中共包括三個類
Pair
DelayItem
Cache
Pair類:
public class Pair {
public K first;
public V second;
public Pair() {}
public Pair(K first, V second) {
this.first = first;
this.second = second;
}
}
一下是對Delay介面的實現:
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class DelayItem implements Delayed {
/** Base of nanosecond timings, to avoid wrapping */
private static final long NANO_ORIGIN = System.nanoTime();
/**
* Returns nanosecond time offset by origin
*/
final static long now() {
return System.nanoTime() - NANO_ORIGIN;
}
/**
* Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied
* entries.
*/
private static final AtomicLong sequencer = new AtomicLong(0);
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */
private final long time;
private final T item;
public DelayItem(T submit, long timeout) {
this.time = now() + timeout;
this.item = submit;
this.sequenceNumber = sequencer.getAndIncrement();
}
public T getItem() {
return this.item;
}
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
return d;
}
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof DelayItem) {
DelayItem x = (DelayItem) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
}
以下是Cache的實現,包括了put和get方法
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Cache {
private static final Logger LOG = Logger.getLogger(Cache.class.getName());
private ConcurrentMap cacheObjMap = new ConcurrentHashMap();
private DelayQueue>> q = new DelayQueue>>();
private Thread daemonThread;
public Cache() {
Runnable daemonTask = new Runnable() {
public void run() {
daemonCheck();
}
};
daemonThread = new Thread(daemonTask);
daemonThread.setDaemon(true);
daemonThread.setName("Cache Daemon");
daemonThread.start();
}
private void daemonCheck() {
if (LOG.isLoggable(Level.INFO))
LOG.info("cache service started.");
for (;;) {
try {
DelayItem> delayItem = q.take();
if (delayItem != null) {
//超時物件處理
Pair pair = delayItem.getItem();
cacheObjMap.remove(pair.first, pair.second); // compare and remove
}
} catch (InterruptedException e) {
if (LOG.isLoggable(Level.SEVERE))
LOG.log(Level.SEVERE, e.getMessage(), e);
break;
}
}
if (LOG.isLoggable(Level.INFO))
LOG.info("cache service stopped.");
}
//新增快取物件
public void put(K key, V value, long time, TimeUnit unit) {
V oldValue = cacheObjMap.put(key, value);
if (oldValue != null)
q.remove(key);
long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);
q.put(new DelayItem>(new Pair(key, value), nanoTime));
}
public V get(K key) {
return cacheObjMap.get(key);
}
測試main方法:
//測試入口函式
public static void main(String[] args) throws Exception {
Cache cache = new Cache();
cache.put(1, "aaaa", 3, TimeUnit.SECONDS);
Thread.sleep(1000 * 2);
{
String str = cache.get(1);
System.out.println(str);
}
Thread.sleep(1000 * 2);
{
String str = cache.get(1);
System.out.println(str);
}
}
輸出結果為:
aaaa
null
我們看到上面的結果,如果超過延時的時間,那麼快取中資料就會自動丟失,獲得就為null。
併發(Collection)佇列-非阻塞佇列
非阻塞佇列
首先我們要簡單的理解下什麼是非阻塞佇列:
與阻塞佇列相反,非阻塞佇列的執行並不會被阻塞,無論是消費者的出隊,還是生產者的入隊。
在底層,非阻塞佇列使用的是CAS(compare andswap)來實現執行緒執行的非阻塞。
非阻塞佇列簡單操作
與阻塞佇列相同,非阻塞佇列中的常用方法,也是出隊和入隊。
入隊方法:
add():底層呼叫offer();
offer():Queue介面繼承下來的方法,實現佇列的入隊操作,不會阻礙執行緒的執行,插入成功返回true;
出隊方法:
poll():移動頭結點指標,返回頭結點元素,並將頭結點元素出隊;佇列為空,則返回null;
peek():移動頭結點指標,返回頭結點元素,並不會將頭結點元素出隊;佇列為空,則返回null;
非阻塞演算法CAS
首先我們需要了解悲觀鎖和樂觀鎖
悲觀鎖:假定併發環境是悲觀的,如果發生併發衝突,就會破壞一致性,所以要通過獨佔鎖徹底禁止衝突發生。有一個經典比喻,“如果你不鎖門,那麼搗蛋鬼就回闖入並搞得一團糟”,所以“你只能一次開啟門放進一個人,才能時刻盯緊他”。樂觀鎖:假定併發環境是樂觀的,即,雖然會有併發衝突,但衝突可發現且不會造成損害,所以,可以不加任何保護,等發現併發衝突後再決定放棄操作還是重試。可類比的比喻為,“如果你不鎖門,那麼雖然搗蛋鬼會闖入,但他們一旦打算破壞你就能知道”,所以“你大可以放進所有人,等發現他們想破壞的時候再做決定”。
通常認為樂觀鎖的效能比悲觀所更高,特別是在某些複雜的場景。這主要由於悲觀鎖在加鎖的同時,也會把某些不會造成破壞的操作保護起來;而樂觀鎖的競爭則只發生在最小的併發衝突處,如果用悲觀鎖來理解,就是“鎖的粒度最小”。但樂觀鎖的設計往往比較複雜,因此,複雜場景下還是多用悲觀鎖。
首先保證正確性,有必要的話,再去追求效能。
CAS
樂觀鎖的實現往往需要硬體的支援,多數處理器都都實現了一個CAS指令,實現“Compare And Swap”的語義(這裡的swap是“換入”,也就是set),構成了基本的樂觀鎖。
CAS包含3個運算元:
[if !supportLists]n [endif]需要讀寫的記憶體位置V
[if !supportLists]n [endif]進行比較的值A
[if !supportLists]n [endif]擬寫入的新值B
當且僅當位置V的值等於A時,CAS才會通過原子方式用新值B來更新位置V的值;否則不會執行任何操作。無論位置V的值是否等於A,都將返回V原有的值。
一個有意思的事實是,“使用CAS控制併發”與“使用樂觀鎖”並不等價。CAS只是一種手段,既可以實現樂觀鎖,也可以實現悲觀鎖。樂觀、悲觀只是一種併發控制的策略。下文將分別用CAS實現悲觀鎖和樂觀鎖?
ConcurrentLinkedQueue非阻塞無界連結串列佇列
ConcurrentLinkedQueue是一個執行緒安全的佇列,基於連結串列結構實現,是一個無界佇列,理論上來說佇列的長度可以無限擴大。
與其他佇列相同,ConcurrentLinkedQueue也採用的是先進先出(FIFO)入隊規則,對元素進行排序。當我們向佇列中新增元素時,新插入的元素會插入到佇列的尾部;而當我們獲取一個元素時,它會從佇列的頭部中取出。
因為ConcurrentLinkedQueue是連結串列結構,所以當入隊時,插入的元素依次向後延伸,形成連結串列;而出隊時,則從連結串列的第一個元素開始獲取,依次遞增;
不知道,我這樣形容能否讓你對連結串列的入隊、出隊產生一個大概的思路!
ConcurrentLinkedQuere簡單示例
值得注意的是,在使用ConcurrentLinkedQueue時,如果涉及到佇列是否為空的判斷,切記不可使用size()==0的做法,因為在size()方法中,是通過遍歷整個連結串列來實現的,在佇列元素很多的時候,size()方法十分消耗效能和時間,只是單純的判斷佇列為空使用isEmpty()即可!!!
public class ConcurrentLinkedQueueTest {
public static int threadCount = 10;
public static ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
static class Offer implements Runnable {
public void run() {
//不建議使用queue.size()==0,影響效率。可以使用!queue.isEmpty()
if(queue.size()==0){
String ele = new Random().nextInt(Integer.MAX_VALUE)+"";
queue.offer(ele);
System.out.println("入隊元素為"+ele);
}
}
}
static class Poll implements Runnable {
public void run() {
if(!queue.isEmpty()){
String ele = queue.poll();
System.out.println("出隊元素為"+ele);
}
}
}
public static void main(String[] agrs) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for(int x=0;x
executorService.submit(new Offer());
executorService.submit(new Poll());
}
executorService.shutdown();
}
}
一種輸出:
入隊元素為313732926
出隊元素為313732926
入隊元素為812655435
出隊元素為812655435
入隊元素為1893079357
出隊元素為1893079357
入隊元素為1137820958
出隊元素為1137820958
入隊元素為1965962048
出隊元素為1965962048
出隊元素為685567162
入隊元素為685567162
出隊元素為1441081163
入隊元素為1441081163
出隊元素為1627184732
入隊元素為1627184732
ConcurrentLinkedQuere類圖
ConcurrentLinkedQueue中有兩個volatile型別的Node節點分別用來存在列表的首尾節點,其中head節點存放連結串列第一個item為null的節點,tail則並不是總指向最後一個節點。Node節點內部則維護一個變數item用來存放節點的值,next用來存放下一個節點,從而連結為一個單向無界列表。public ConcurrentLinkedQueue() {
head = tail = new Node(null);
}
如上程式碼初始化時候會構建一個item為NULL的空節點作為連結串列的首尾節點。ConcurrentLinkedQuere方法
Offer操作
offer操作是在連結串列末尾新增一個元素,下面看看實現原理。
public boolean offer(E e) {
//e為null則丟擲空指標異常
checkNotNull(e);
//構造Node節點建構函式內部呼叫unsafe.putObject,後面統一講
final Node newNode = new Node(e);
//從尾節點插入
for (Node t = tail, p = t;;) {
Node q = p.next;
//如果q=null說明p是尾節點則插入
if (q == null) {
//cas插入(1)
if (p.casNext(null, newNode)) {
//cas成功說明新增節點已經被放入連結串列,然後設定當前尾節點(包含head,1,3,5.。。個節點為尾節點)
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)//(2)
//多執行緒操作時候,由於poll時候會把老的head變為自引用,然後head的next變為新head,所以這裡需要
//重新找新的head,因為新的head後面的節點才是啟用的節點
p = (t != (t = tail)) ? t : head;
else
//尋找尾節點(3)
p = (p != t && t != (t = tail)) ? t : q;
}
}
從建構函式知道一開始有個item為null的哨兵節點,並且head和tail都是指向這個節點,然後當一個執行緒呼叫offer時候首先
如圖首先查詢尾節點,q==null,p就是尾節點,所以執行p.casNext通過cas設定p的next為新增節點,這時候p==t所以不重新設定尾節點為當前新節點。由於多執行緒可以呼叫offer方法,所以可能兩個執行緒同時執行到了(1)進行cas,那麼只有一個會成功(假如執行緒1成功了)。
add操作
add操作是在連結串列末尾新增一個元素,下面看看實現原理。其實內部呼叫的還是offer
public boolean add(E e) {
return offer(e);
}
poll操作
poll操作是在連結串列頭部獲取並且移除一個元素,下面看看實現原理。
public E poll() {
restartFromHead:
//死迴圈
for (;;) {
//死迴圈
for (Node h = head, p = h, q;;) {
//儲存當前節點值
E item = p.item;
//當前節點有值則cas變為null(1)
if (item != null && p.casItem(item, null)) {
//cas成功標誌當前節點以及從連結串列中移除
if (p != h) //類似tail間隔2設定一次頭節點(2)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//當前佇列為空則返回null(3)
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//自引用了,則重新找新的佇列頭節點(4)
else if (p == q)
continue restartFromHead;
else//(5)
p = q;
}
}
}
final void updateHead(Node h, Node p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
peek操作
peek操作是獲取連結串列頭部一個元素(只讀取不移除),下面看看實現原理。程式碼與poll類似,只是少了castItem.並且peek操作會改變head指向,offer後head指向哨兵節點,第一次peek後head會指向第一個真的節點元素。
public E peek() {
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
size操作獲取當前佇列元素個數,在併發環境下不是很有用,因為使用CAS沒有加鎖所以從呼叫size函式到返回結果期間有可能增刪元素,導致統計的元素個數不精確。
public int size() {
int count = 0;
for (Node p = first(); p != null; p = succ(p))
if (p.item != null)
//最大返回Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE)
break;
return count;
}
//獲取第一個佇列元素(哨兵元素不算),沒有則為null
Node first() {
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
//獲取當前節點的next元素,如果是自引入節點則返回真正頭節點
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
remove操作如果佇列裡面存在該元素則刪除給元素,如果存在多個則刪除第一個,並返回true,否者返回false
public boolean remove(Object o) {
//查詢元素為空,直接返回false
if (o == null) return false;
Node pred = null;
for (Node p = first(); p != null; p = succ(p)) {
E item = p.item;
//相等則使用cas值null,同時一個執行緒成功,失敗的執行緒迴圈查詢佇列中其他元素是否有匹配的。
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) {
//獲取next元素
Node next = succ(p);
//如果有前驅節點,並且next不為空則連結前驅節點到next,
if (pred != null && next != null)
pred.casNext(p, next);
return true;
}
pred = p;
}
return false;
}
contains操作判斷佇列裡面是否含有指定物件,由於是遍歷整個佇列,所以類似size 不是那麼精確,有可能呼叫該方法時候元素還在佇列裡面,但是遍歷過程中才把該元素刪除了,那麼就會返回false.
public boolean contains(Object o) {
if (o == null) return false;
for (Node p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
return true;
}
return false;
}
ConcurrentLinkedQuere的offer方法有意思的問題
offer中有個 判斷 t != (t = tail)假如 t=node1;tail=node2;並且node1!=node2那麼這個判斷是true還是false那,答案是true,這個判斷是看當前t是不是和tail相等,相等則返回true否者為false,但是無論結果是啥執行後t的值都是tail。
下面從位元組碼來分析下為啥。
一個例子
public static void main(String[] args) {
int t = 2;
int tail = 3;
System.out.println(t != (t = tail));
}
結果為:true
位元組碼檔案
C:\Users\Simple\Desktop\TeacherCode\Crm_Test\build\classes\com\itheima\crm\util>javap -c Test001
警告:二進位制檔案Test001包含com.itheima.crm.util.Test001
Compiled from "Test001.java"
public class com.itheima.crm.util.Test001 {
public com.itheima.crm.util.Test001();
Code:
0: aload_0
1: invokespecial #8 // Method java/lang/Object."":()V
4: return
public static void main(java.lang.String[]);
Code:
0: iconst_2
1: istore_1
2: iconst_3
3: istore_2
4: getstatic #16 // Field java/lang/System.out:Ljava/io/PrintStream;
7: iload_1
8: iload_2
9: dup
10: istore_1
11: if_icmpeq 18
14: iconst_1
15: goto 19
18: iconst_0
19: invokevirtual #22 // Method java/io/PrintStream.println:(Z)V
22: return
}
我們從上面標黃的位元組碼檔案中分析
一開始棧為空:
第0行指令作用是把值2入棧棧頂元素為2
第1行指令作用是將棧頂int型別值儲存到區域性變數t中
第2行指令作用是把值3入棧棧頂元素為3第3行指令作用是將棧頂int型別值儲存到區域性變數tail中。
第4呼叫列印命令第7行指令作用是把變數t中的值入棧
第8行指令作用是把變數tail中的值入棧現在棧裡面的元素為3、2,並且3位於棧頂
第9行指令作用是當前棧頂元素入棧,所以現在棧內容3,3,2
第10行指令作用是把棧頂元素存放到t,現在棧內容3,2第11行指令作用是判斷棧頂兩個元素值,相等則跳轉 18。由於現在棧頂嚴肅為3,2不相等所以返回true.
第14行指令作用是把1入棧
然後回頭分析下!=是雙目運算子,應該是首先把左邊的運算元入棧,然後在去計算了右側運算元。
ConcurrentLinkedQuere總結
ConcurrentLinkedQueue使用CAS非阻塞演算法實現使用CAS解決了當前節點與next節點之間的安全連結和對當前節點值的賦值。由於使用CAS沒有使用鎖,所以獲取size的時候有可能進行offer,poll或者remove操作,導致獲取的元素個數不精確,所以在併發情況下size函式不是很有用。另外第一次peek或者first時候會把head指向第一個真正的佇列元素。
下面總結下如何實現執行緒安全的,可知入隊出隊函式都是操作volatile變數:head,tail。所以要保證佇列執行緒安全只需要保證對這兩個Node操作的可見性和原子性,由於volatile本身保證可見性,所以只需要看下多執行緒下如果保證對著兩個變數操作的原子性。
對於offer操作是在tail後面新增元素,也就是呼叫tail.casNext方法,而這個方法是使用的CAS操作,只有一個執行緒會成功,然後失敗的執行緒會迴圈一下,重新獲取tail,然後執行casNext方法。對於poll也是這樣的。
ConcurrentHashMap非阻塞Hash集合
ConcurrentHashMap是Java併發包中提供的一個執行緒安全且高效的HashMap實現,ConcurrentHashMap在併發程式設計的場景中使用頻率非常之高,本文就來分析下ConcurrentHashMap的實現原理,並對其實現原理進行分析。
ConcurrentLinkedQuere類圖
ConcurrentHashMap是由Segment陣列結構和HashEntry陣列結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap裡扮演鎖的角色,HashEntry則用於儲存鍵值對資料。一個ConcurrentHashMap裡包含一個Segment陣列,Segment的結構和HashMap類似,是一種陣列和連結串列結構, 一個Segment裡包含一個HashEntry陣列,每個HashEntry是一個連結串列結構的元素, 每個Segment守護者一個HashEntry數組裡的元素,當對HashEntry陣列的資料進行修改時,必須首先獲得它對應的Segment鎖。
ConcurrentLinkedQuere實現原理
眾所周知,雜湊表是中非常高效,複雜度為O(1)的資料結構,在Java開發中,我們最常見到最頻繁使用的就是HashMap和HashTable,但是線上程競爭激烈的併發場景中使用都不夠合理。
HashMap :先說HashMap,HashMap是執行緒不安全的,在併發環境下,可能會形成環狀連結串列(擴容時可能造成,具體原因自行百度google或檢視原始碼分析),導致get操作時,cpu空轉,所以,在併發環境中使用HashMap是非常危險的。
HashTable : HashTable和HashMap的實現原理幾乎一樣,差別無非是1.HashTable不允許key和value為null;2.HashTable是執行緒安全的。但是HashTable執行緒安全的策略實現代價卻太大了,簡單粗暴,get/put所有相關操作都是synchronized的,這相當於給整個雜湊表加了一把大鎖,多執行緒訪問時候,只要有一個執行緒訪問或操作該物件,那其他