1. 程式人生 > >實現一個獨佔鎖和3元共享鎖及其思路

實現一個獨佔鎖和3元共享鎖及其思路

public class SelfLock implements Lock{
    
    //state 表示獲取到鎖 state=1 獲取到了鎖,state=0,表示這個鎖當前沒有執行緒拿到
    private static class Sync extends AbstractQueuedSynchronizer{
        
        //是否佔用
        protected boolean isHeldExclusively() {
            return getState()==1;
        }
        
        protected
boolean tryAcquire(int arg) { if(compareAndSetState(0,1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int arg) { if(getState()==0) {
throw new UnsupportedOperationException(); } setExclusiveOwnerThread(null); setState(0); return true; } Condition newCondition() { return new ConditionObject(); } } private final Sync sycn = new Sync(); @Override
public void lock() { sycn.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sycn.acquireInterruptibly(1); } @Override public boolean tryLock() { return sycn.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sycn.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sycn.release(1); } @Override public Condition newCondition() { return sycn.newCondition(); } }
public class Test {
    private static SelfLock sl = new SelfLock();

    private static int a = 0;
    private static Condition con = sl.newCondition();

    public static void increment() {
        sl.lock();
        a++;
        sl.unlock();
    }

    private static CyclicBarrier cb = new CyclicBarrier(31);// 設定一個同步屏障,等30個執行緒+主執行緒都執行完後再列印最終a的值

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {

        for (int i = 0; i < 30; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    for (int j = 0; j < 100000; j++) {
                        increment();
                    }
                    try {
                        cb.await();
                        //System.out.println(Thread.currentThread().getId() + " wo hao le");
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }).start();

        }
        cb.await();
        System.out.println(a);
    }
}

public class TrinityLock   {

    //為3表示允許兩個執行緒同時獲得鎖
    private final Sync sync = new Sync(3);

    private static final class Sync extends AbstractQueuedSynchronizer {
        //private static final long serialVersionUID = -7889272986162341211L;

        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);//設定3個許可證,最多允許3個共享執行緒
        }

        public int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        public boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    public Condition newCondition() {
        return sync.newCondition();
    }
}
三元共享鎖

 

 

實現思路:一個鎖的基本行為正規化都定義在Lock介面中,我們只要實現Lock介面即可。lock介面中主要的lock和unlock方法的實現AQS已經幫我們實現大部分了,我們只要實現對state共享資源的細化操作就可以實現不同的鎖也就是tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared方法。

那麼為啥我們只要實現上述4個方法就能實現不同的鎖?

    比如:
    public void lock() {
        sycn.acquire(1);
        
    }

它的實現依賴於AQS中acquire方法 如下面程式碼

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
acquireQueued

 

這裡的tryAcquire就是我們重寫的方法,我們重寫的方法假如state為0,就將state置為1,並將當前執行緒設為獨佔執行緒,

假如state為1,返回false,然後addWaiter將當前執行緒置入同步佇列的尾部,acquireQueued是自旋中,假如head節點指向了當前執行緒Node,就tryAcquire,返回interrupted值,然後根據條件值執行selfInterrupt阻塞當前執行緒。

總結:AQS為我們定義好了頂層的處理實現邏輯,我們在使用AQS構建符合我們需求的同步元件時,只需重寫tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared幾個方法,來決定同步狀態的釋放和獲取即可,至於背後複雜的執行緒排隊,執行緒阻塞/喚醒,如何保證執行緒安全,都由AQS為我們完成了,這也是非常典型的模板方法的應用。AQS定義好頂級邏輯的骨架,並提取出公用的執行緒入佇列/出佇列,阻塞/喚醒等一系列複雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現。