Python三十個常見的指令碼彙總
阿新 • • 發佈:2021-08-23
目錄
這篇部落格寫的不錯:BlockingQueue
enqueue(Node
unlink(Node
這篇部落格寫的不錯:BlockingQueue
LinkedBlockingQueue書上講的是無界佇列,其實不是特別嚴謹,因為佇列的容量是可以通過有參建構函式設定的,並且無參時,預設是Integer.MAX_VALUE
根據名子Linked,可以知道這是用連結串列實現的佇列
成員變數
/** 佇列容量,預設Integer.MAX_VALUE */ private final int capacity; /** 當前佇列元素數量 */ private final AtomicInteger count = new AtomicInteger(); /** * 連結串列的head */ transient Node<E> head; /** * 連結串列的tail */ private transient Node<E> last; /** 由take, poll方法持有的鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 當take的時候,如果佇列為空,則等待 */ private final Condition notEmpty = takeLock.newCondition(); /** 由put, offer方法持有的鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 當put的時候,如果佇列滿了,則等待 */ private final Condition notFull = putLock.newCondition();
有一個靜態內部類,定義了連結串列節點:
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
方法
// 返回此佇列中的元素數量。
public int size()
// 返回此佇列在理想情況下(在沒有記憶體或資源約束的情況下)可以不阻塞地接受新元素的數量。它總是等於這個佇列的初始容量減去這個佇列的當前大小。
public int remainingCapacity()
// 將指定的元素插入到此佇列的末尾,如果需要,則等待空間可用。
public void put(E e) throws InterruptedException
// 將指定的元素插入到此佇列的末尾,如有必要,將等待指定的等待時間,直到空間可用為止。超時則返回false。
public boolean offer(E e,
long timeout,
TimeUnit unit)
throws InterruptedException
// 在不超過佇列容量的情況下立即在佇列末尾插入指定的元素,如果成功則返回true,如果佇列已滿則返回false。當使用容量受限的佇列時,此方法通常比add方法更好,後者插入失敗僅丟擲異常。
public boolean offer(E e)
// 檢索並刪除此佇列的頭,如有必要則等待,直到某個元素可用為止。
public E take() throws InterruptedException
// 檢索並刪除此佇列的頭,如有必要,將等待指定的等待時間,直到元素可用。超時返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException
// 檢索並刪除此佇列的頭,如果此佇列為空,則返回null。
public E poll()
// 檢索並刪除此佇列的頭,如果此佇列為空,則返回null。
public E peek()
// 如果指定元素存在,則從此佇列中移除該元素的單個例項。更正式地說,如果佇列中包含一個或多個這樣的元素,則只刪除第一個匹配到的元素。
public boolean remove(Object o)
// 如果此佇列包含至少一個指定的元素,則返回true。
public boolean contains(Object o)
// 返回一個數組,該陣列包含此佇列中的所有元素,按適當的順序排列。返回的陣列將是“安全的”,因為此佇列不維護對它的引用。
public Object[] toArray()
// 返回一個數組,該陣列包含此佇列中的所有元素,按適當的順序排列;返回陣列的執行時型別是指定陣列的執行時型別。
public <T> T[] toArray(T[] a)
// 返回此集合的字串表示形式。
public String toString()
// 刪除此佇列中的所有元素。此呼叫返回後,佇列將為空。
public void clear()
// 從此佇列中刪除所有可用元素並將它們新增到給定集合中。此操作可能比重複輪詢此佇列更有效。在試圖將元素新增到集合c時遇到失敗丟擲相關異常時可能會導致:元素不在原集合或者集合c中,或者兩個集合中都沒有。
public int drainTo(Collection<? super E> c)
// 從該佇列中最多刪除給定數量的可用元素,並將它們新增到給定集合中。異常情況同上
public int drainTo(Collection<? super E> c, int maxElements)
// 按適當的順序返回此佇列中元素的迭代器。元素將按從第一個(head)到最後一個(tail)的順序返回。返回的迭代器是弱一致的。
public Iterator<E> iterator()
// 返回該佇列中元素的Spliterator。返回的spliterator是弱一致的。
public Spliterator<E> spliterator()
方法原始碼閱讀
重點還是關注offer,put,poll,take
enqueue(Node node)
執行將新的節點放入連結串列末尾的操作
/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
//這個方法的作用:
//1.將last.next設定為node
//2.將last更新為node,或者或更新last為新的連結串列末尾
//程式碼等效為:
last.next = node;
last = last.next;
}
signalNotEmpty()
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//因為要傳送notEmpty訊號,即允許進行出隊操作
//因此這個時候應該禁止並行的出隊
//所以要對take操作進行加鎖
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer(E e)
新增新元素入隊
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
* insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//先拿到當前長度count
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
//將e封裝成連結串列節點node
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//加鎖
putLock.lock();
try {
if (count.get() < capacity) {
//還有容量時,將node放在連結串列的末尾
enqueue(node);
//getAndIncrement返回的還是新增之前的數值
c = count.getAndIncrement();
//所以這裡判斷還是需要進行c + 1
//如果新增之後,還有容量,傳送notFull訊號,說明佇列還能繼續新增
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//解鎖
putLock.unlock();
}
//如果c==0,說明佇列中只有一個節點,即狀態從空變為非空
//因此可以傳送notEmpty訊號
if (c == 0)
signalNotEmpty();
return c >= 0;
}
offer(E e, long timeout, TimeUnit unit)
與offer差不多,就是加了個時間等待,與ArrayBlockingQueue等待的操作一樣
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
put(E e)
進行入隊操作,但是增加了阻塞等待的機制
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
意思是說count在這裡只會減少,因此用來判斷坐while的條件是沒有問題的
*/
while (count.get() == capacity) {
//核心就是在這裡,當佇列滿了之後,會阻塞等待
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
dequeue()
出隊,就是刪除連結串列首部的節點
這裡有一個值得注意的點是,head節點僅僅做標記,實際的隊首是head.next
並且head標記節點,內部的值為null,因此出隊一個後,實際出隊的那個節點,可以將值置為null,然後將head更新為該節點,相當於head是不斷變化的
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
//更新為head為first
head = first;
E x = first.item;
//將first的值置為null,此時first節點就是真正的head節點了
first.item = null;
return x;
}
poll()
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
//進行加鎖
takeLock.lock();
try {
if (count.get() > 0) {
//得到出隊的資料
x = dequeue();
c = count.getAndDecrement();
//注意這裡c還是count之前的樹值,因此判斷c > 1,也就是c >= 2,減1之後c >= 1
//因此可以傳送佇列notEmpty的訊號了
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//如果c之前為capacity,出隊一個後,佇列就有了空間
//所以可以向佇列傳送notFull的訊號了
if (c == capacity)
signalNotFull();
return x;
}
poll(long timeout, TimeUnit unit)
與poll()沒什麼太大區別,僅僅加了一個等待時間
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
take()
與poll()也沒什麼太大區別,就是當佇列沒有節點時,會進行阻塞等待
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//阻塞等待
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
remove(Object o)
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
//對takeLock和putLock進行加鎖
fullyLock();
try {
//以trail當作前繼節點,p是實際用來做判斷的節點
//便於節點的刪除
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//將節點從連結串列中刪除
unlink(p, trail);
return true;
}
}
return false;
} finally {
//對takeLock和putLock進行解鎖
fullyUnlock();
}
}
fullyLock()
刪除節點時,不能進行入隊和出隊操作
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
fullyUnlock()
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
unlink(Node p, Node trail)
將節點從連結串列中刪除
/**
* Unlinks interior Node p with predecessor trail.
*/
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
//先將節點的value置為null
p.item = null;
trail.next = p.next;
//如果p是連結串列最後一個節點,則更新last為trail
if (last == p)
last = trail;
//刪除節點之前佇列的容量滿了,這是刪除了一個節點
//可以傳送notFull的訊號
if (count.getAndDecrement() == capacity)
notFull.signal();
}