實現一個獨佔鎖和3元共享鎖及其思路
public class SelfLock implements Lock{ //state 表示獲取到鎖 state=1 獲取到了鎖,state=0,表示這個鎖當前沒有執行緒拿到 private static class Sync extends AbstractQueuedSynchronizer{ //是否佔用 protected boolean isHeldExclusively() { return getState()==1; } protectedboolean tryAcquire(int arg) { if(compareAndSetState(0,1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int arg) { if(getState()==0) {throw new UnsupportedOperationException(); } setExclusiveOwnerThread(null); setState(0); return true; } Condition newCondition() { return new ConditionObject(); } } private final Sync sycn = new Sync(); @Overridepublic void lock() { sycn.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sycn.acquireInterruptibly(1); } @Override public boolean tryLock() { return sycn.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sycn.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sycn.release(1); } @Override public Condition newCondition() { return sycn.newCondition(); } }
public class Test { private static SelfLock sl = new SelfLock(); private static int a = 0; private static Condition con = sl.newCondition(); public static void increment() { sl.lock(); a++; sl.unlock(); } private static CyclicBarrier cb = new CyclicBarrier(31);// 設定一個同步屏障,等30個執行緒+主執行緒都執行完後再列印最終a的值 public static void main(String[] args) throws InterruptedException, BrokenBarrierException { for (int i = 0; i < 30; i++) { new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < 100000; j++) { increment(); } try { cb.await(); //System.out.println(Thread.currentThread().getId() + " wo hao le"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); } cb.await(); System.out.println(a); } }
public class TrinityLock { //為3表示允許兩個執行緒同時獲得鎖 private final Sync sync = new Sync(3); private static final class Sync extends AbstractQueuedSynchronizer { //private static final long serialVersionUID = -7889272986162341211L; Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count);//設定3個許可證,最多允許3個共享執行緒 } public int tryAcquireShared(int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryReleaseShared(int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } final ConditionObject newCondition() { return new ConditionObject(); } } public void lock() { sync.acquireShared(1); } public void unlock() { sync.releaseShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); } public Condition newCondition() { return sync.newCondition(); } }三元共享鎖
實現思路:一個鎖的基本行為正規化都定義在Lock介面中,我們只要實現Lock介面即可。lock介面中主要的lock和unlock方法的實現AQS已經幫我們實現大部分了,我們只要實現對state共享資源的細化操作就可以實現不同的鎖也就是tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared方法。
那麼為啥我們只要實現上述4個方法就能實現不同的鎖?
比如:
public void lock() {
sycn.acquire(1);
}
它的實現依賴於AQS中acquire方法 如下面程式碼
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }acquireQueued
這裡的tryAcquire就是我們重寫的方法,我們重寫的方法假如state為0,就將state置為1,並將當前執行緒設為獨佔執行緒,
假如state為1,返回false,然後addWaiter將當前執行緒置入同步佇列的尾部,acquireQueued是自旋中,假如head節點指向了當前執行緒Node,就tryAcquire,返回interrupted值,然後根據條件值執行selfInterrupt阻塞當前執行緒。
總結:AQS為我們定義好了頂層的處理實現邏輯,我們在使用AQS構建符合我們需求的同步元件時,只需重寫tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared幾個方法,來決定同步狀態的釋放和獲取即可,至於背後複雜的執行緒排隊,執行緒阻塞/喚醒,如何保證執行緒安全,都由AQS為我們完成了,這也是非常典型的模板方法的應用。AQS定義好頂級邏輯的骨架,並提取出公用的執行緒入佇列/出佇列,阻塞/喚醒等一系列複雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現。