1. 程式人生 > >JDK容器與併發—Queue—PriorityBlockingQueue

JDK容器與併發—Queue—PriorityBlockingQueue

概述

      基於優先堆的無界阻塞佇列,PriorityQueue的執行緒安全版本。

資料結構

      基於陣列的平衡二叉堆,在PriorityQueue基礎上,增加了一把鎖、一個條件:

private transient Object[] queue;

// 增刪查公用的鎖
private final ReentrantLock lock;

// 佇列為空時,阻塞take/poll執行緒的條件
private final Condition notEmpty;

 // 自旋鎖,用CAS方式獲取,用於動態擴充套件queue
private transient volatile int allocationSpinLock;

構造器

      與PriorityQueue幾乎一樣,除了需要對lock、notEmpty初始化:

public PriorityBlockingQueue() {
	this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
	this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
							 Comparator<? super E> comparator) {
	if (initialCapacity < 1)
		throw new IllegalArgumentException();
	this.lock = new ReentrantLock();		// lock、notEmpty初始化
	this.notEmpty = lock.newCondition();
	this.comparator = comparator;
	this.queue = new Object[initialCapacity];
}

public PriorityBlockingQueue(Collection<? extends E> c) {
	this.lock = new ReentrantLock();
	this.notEmpty = lock.newCondition();
	boolean heapify = true; // true if not known to be in heap order
	boolean screen = true;  // true if must screen for nulls
	if (c instanceof SortedSet<?>) {
		SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
		this.comparator = (Comparator<? super E>) ss.comparator();
		heapify = false;
	}
	else if (c instanceof PriorityBlockingQueue<?>) {
		PriorityBlockingQueue<? extends E> pq =
			(PriorityBlockingQueue<? extends E>) c;
		this.comparator = (Comparator<? super E>) pq.comparator();
		screen = false;
		if (pq.getClass() == PriorityBlockingQueue.class) // exact match
			heapify = false;
	}
	Object[] a = c.toArray();
	int n = a.length;
	// If c.toArray incorrectly doesn't return Object[], copy it.
	if (a.getClass() != Object[].class)
		a = Arrays.copyOf(a, n, Object[].class);
	if (screen && (n == 1 || this.comparator != null)) {
		for (int i = 0; i < n; ++i)
			if (a[i] == null)
				throw new NullPointerException();
	}
	this.queue = a;
	this.size = n;
	if (heapify)
		heapify();
}

增刪查

容器調整策略(避免無限制擴充套件)

      步驟(與PriorityQueue大致相同,除了採用自旋鎖的方式動態分配陣列,在獲取公用鎖下複製queue):
1)當queue已滿,若有元素入隊請求,則進行容量擴充套件;
2)在獲取公用鎖lock的前提下,釋放lock,採用自旋鎖的方式動態擴充套件陣列,允許與take/poll執行緒併發,完成分配後再重新獲取lock;
3)oldCap小於64則容量翻倍;否則增長50%;
4)檢查newCap是否在MAX_ARRAY_SIZE範圍內,若minCap有overflow或大於MAX_ARRAY_SIZE,丟擲OutOfMemoryError異常;否則容量最大不超過MAX_ARRAY_SIZE;
5)動態分配新容量的Object[];
6)獲取公用鎖,將舊queue中的元素複製過來。

while ((n = size) >= (cap = (array = queue).length)) // while使用是為了採用自旋鎖進行擴充套件queue
		tryGrow(array, cap);

// 動態擴充套件queue		
// 釋放公用鎖,採用自旋鎖,允許擴充套件過程中與take/poll執行緒併發,避免其在該過程中的等待
private void tryGrow(Object[] array, int oldCap) {
	lock.unlock(); 				// 釋放公用鎖
	Object[] newArray = null;
	if (allocationSpinLock == 0 &&				// 採用CAS方式獲取allocationSpinLock,進行動態分配
		UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
								 0, 1)) {
		try {
			int newCap = oldCap + ((oldCap < 64) ?
								   (oldCap + 2) : // grow faster if small
								   (oldCap >> 1));
			if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
				int minCap = oldCap + 1;
				if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
					throw new OutOfMemoryError();
				newCap = MAX_ARRAY_SIZE;
			}
			if (newCap > oldCap && queue == array)
				newArray = new Object[newCap];
		} finally {
			allocationSpinLock = 0;
		}
	}
	if (newArray == null) // 其他執行緒正進行擴充套件,當前執行緒yield
		Thread.yield();
	lock.lock();			    // 重新獲取公用鎖
	if (newArray != null && queue == array) {      // 複製佇列
		queue = newArray;
		System.arraycopy(array, 0, newArray, 0, oldCap);
	}
}

基礎方法

      與PriorityQueue一樣,除了增加兩個引數:Object[] array、Comparator<? super T> cmp,以保證併發性:

// 對元素x,從k往前移,保持二叉堆的平衡性
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
								   Comparator<? super T> cmp) {
	while (k > 0) {
		int parent = (k - 1) >>> 1;
		Object e = array[parent];
		if (cmp.compare(x, (T) e) >= 0)
			break;
		array[k] = e;
		k = parent;
	}
	array[k] = x;
}

// 對元素x,從k往後移,保持二叉堆的平衡性
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
												int n,
												Comparator<? super T> cmp) {
	if (n > 0) {
		int half = n >>> 1;
		while (k < half) {
			int child = (k << 1) + 1;
			Object c = array[child];
			int right = child + 1;
			if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
				c = array[child = right];
			if (cmp.compare(x, (T) c) <= 0)
				break;
			array[k] = c;
			k = child;
		}
		array[k] = x;
	}
}

      步驟(過程與PriorityQueue一樣,除了考慮併發性加鎖):
1)獲取公用鎖lock;
2)檢查佇列是否已滿,若滿則採用自旋鎖的方式進行容量擴充套件;
3)從隊尾,對元素進行siftUp,保持二叉堆的平衡性;
4)向take/poll執行緒傳送notEmpty訊號;
5)釋放鎖lock;
6)返回true。

另外,由於PriorityBlockingQueue無界,add、put操作都直接委託給offer進行:

public boolean offer(E e) {
	if (e == null)
		throw new NullPointerException();
	final ReentrantLock lock = this.lock;
	lock.lock();
	int n, cap;
	Object[] array;
	while ((n = size) >= (cap = (array = queue).length))
		tryGrow(array, cap);
	try {
		Comparator<? super E> cmp = comparator;
		if (cmp == null)
			siftUpComparable(n, e, array);
		else
			siftUpUsingComparator(n, e, array, cmp);
		size = n + 1;
		notEmpty.signal();
	} finally {
		lock.unlock();
	}
	return true;
}

步驟(過程與PriorityQueue一樣,除了考慮併發性加鎖):
1)獲取公用鎖lock;
2)檢查佇列是否為空,為空則返回null;
3)取出佇列最後一個元素,從索引0開始,對其進行siftDown,保持二叉堆的平衡性;
4)釋放鎖lock;
5)返回隊首元素值。

public E poll() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		return dequeue();
	} finally {
		lock.unlock();
	}
}

public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	E result;
	try {
		while ( (result = dequeue()) == null)
			notEmpty.await();
	} finally {
		lock.unlock();
	}
	return result;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	long nanos = unit.toNanos(timeout);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	E result;
	try {
		while ( (result = dequeue()) == null && nanos > 0)
			nanos = notEmpty.awaitNanos(nanos);
	} finally {
		lock.unlock();
	}
	return result;
}

private E dequeue() {
	int n = size - 1;
	if (n < 0)
		return null;
	else {
		Object[] array = queue;
		E result = (E) array[0];
		E x = (E) array[n];
		array[n] = null;
		Comparator<? super E> cmp = comparator;
		if (cmp == null)
			siftDownComparable(0, x, array, n);
		else
			siftDownUsingComparator(0, x, array, n, cmp);
		size = n;
		return result;
	}
}

      步驟(過程與PriorityQueue一樣,除了考慮併發性加鎖):
1)獲取公用鎖lock;
2)檢查佇列是否為空,為空則返回null;
3)釋放鎖lock;
4)返回隊首元素值。

public E peek() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		return (size == 0) ? null : (E) queue[0];
	} finally {
		lock.unlock();
	}
}

迭代器

      不保證元素的迭代順序,基於底層陣列的副本實現:

public Iterator<E> iterator() {
	return new Itr(toArray());
}

final class Itr implements Iterator<E> {
	final Object[] array; // Array of all elements
	int cursor;           // index of next element to return
	int lastRet;          // index of last element, or -1 if no such

	Itr(Object[] array) {
		lastRet = -1;
		this.array = array;
	}

	public boolean hasNext() {
		return cursor < array.length;
	}

	public E next() {
		if (cursor >= array.length)
			throw new NoSuchElementException();
		lastRet = cursor;
		return (E)array[cursor++];
	}

	public void remove() {
		if (lastRet < 0)
			throw new IllegalStateException();
		removeEQ(array[lastRet]);
		lastRet = -1;
	}
}

特性

PriorityBlockingQueue中優先順序相同的元素處理

      若多個元素的優先順序相同,則其順序是不固定的,可以採用二級比較方法來進一步排序,以下示例為按照元素的入隊順序進行二級比較:

class FIFOEntry<E extends Comparable<? super E>>
		implements Comparable<FIFOEntry<E>> {
	static final AtomicLong seq = new AtomicLong(0);
	final long seqNum;
	final E entry;
	public FIFOEntry(E entry) {
		seqNum = seq.getAndIncrement();
		this.entry = entry;
	}
	public E getEntry() { return entry; }
	public int compareTo(FIFOEntry<E> other) {
		int res = entry.compareTo(other.entry);
		if (res == 0 && other.entry != this.entry)
			res = (seqNum < other.seqNum ? -1 : 1);
		return res;
	}
}

為什麼PriorityBlockingQueue的操作不直接在委託給PriorityQueue基礎上加鎖實現?

      allocationSpinLock在動態擴充套件queue上的使用使得委託+lock是實現不了的。

PriorityBlockingQueue就是PriorityQueue的加鎖執行緒安全版。