1. 程式人生 > 其它 >java併發包JUC學習

java併發包JUC學習

介面類Lock.java提供介面 方法
//加鎖實現方法
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//解鎖實現方法
    void unlock();
//對鎖新增條件
    Condition newCondition();


介面類 Condition.java提供介面方法
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();

介面類 ReadWriteLock.java提供介面方法
    Lock readLock();
    Lock writeLock();

Reentrantlock.java研究,先寫一個測試用例
package JUC;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 重入鎖測試用例
 */

public class ReentrantLockDemo {
    //定義鎖為公平鎖
    static ReentrantLock fairlock = new ReentrantLock(true);
    //定義鎖為非公平鎖
    static ReentrantLock nofairlock = new ReentrantLock();
    public static void main(String[] args) {
        //非公平鎖搶佔鎖
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                nofairlock.lock();
                System.out.println("nofairThreadName="+Thread.currentThread().getName());
                nofairlock.unlock();
            },"name-"+i).start();
        }
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //公平鎖順序獲取執行緒
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                fairlock.lock();
                System.out.println("fairThreadName="+Thread.currentThread().getName());
                fairlock.unlock();
            },"name-"+i).start();
        }
    }
}

執行結果如下圖:在非公平鎖的情況下,執行緒7、執行緒9 搶佔了鎖資源,插隊執行了。公平鎖是安裝獲取鎖的順序執行程式,無搶佔情況發生。

nofairThreadName=name-0
nofairThreadName=name-7
nofairThreadName=name-1
nofairThreadName=name-2
nofairThreadName=name-9
nofairThreadName=name-3
nofairThreadName=name-4
nofairThreadName=name-5
nofairThreadName=name-6
nofairThreadName=name-8
fairThreadName=name-0
fairThreadName=name-1
fairThreadName=name-2
fairThreadName=name-3
fairThreadName=name-4
fairThreadName=name-5
fairThreadName=name-6
fairThreadName=name-7
fairThreadName=name-8
fairThreadName=name-9
加鎖邏輯lock()

檢視 Reentrantlock.java原始碼發現

//提供兩個內部類
//非公平鎖
class NonfairSync extends Sync{
    final void lock() {
        	//嘗試獲取鎖,如果獲取鎖成功直接執行
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //獲取鎖失敗  新增到CLH佇列等待執行
                acquire(1);
        }
}
//公平鎖
class FairSync extends Sync{
    final void lock() {
        //執行新增到CLH佇列等待執行
            acquire(1);
    }
}
    

AQS實現原理,先看懂CLH佇列鎖機制

//AQS佇列獲取鎖  入口類  
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/**分三步走 在Reentrantlock實現中 arg=1 
*第一步:tryAcquire(arg)
*第二步:addWaiter(Node.EXCLUSIVE)
*第三步:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
**/
//第一步:非公平鎖tryAcquire(arg)
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //獲取鎖的狀態
    int c = getState();
    if (c == 0) {//0代表可以獲取鎖
        //再次搶佔鎖資源
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        //如果是當前執行緒重複獲取鎖資源,State狀態+acquires
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
//第一步:公平鎖tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //首先判斷是不是頭結點,如果不是頭結點不參與搶佔鎖資源
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
//第二步:新增執行緒到等待CLH佇列進行自旋
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    //如果 存在等待佇列  直接在尾部新增新節點
    if (pred != null) {
        node.prev = pred;
        //原子性賦值操作等價於tail = node;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果不存在等待佇列  新建立一下CLH佇列
    enq(node);
    return node;
}
//第三步:acquireQueued(node, arg)
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //lock操作  阻塞在這裡等待鎖資源釋放
            for (;;) {
                //自旋嘗試獲取鎖
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //獲取鎖之後  驗證是否執行緒中斷,如果執行緒中斷設定當前執行緒interrupted = true;
                    return interrupted;
                }
                //通過 LockSupport.park(this); 實現interrupted處理
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
釋放鎖邏輯unlock()

//釋放鎖 不存在併發操作,可以忽略搶佔資源的情況

AQS提供邏輯實現

//arg  = 1
public final boolean release(int arg) {
    //釋放資源
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //設定waitStatus的狀態=0,傳送訊號LockSupport.unpark(s.thread)給鏈條的其他結點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

reentrantlock.java實現

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    //釋放資源成功  設定狀態state=0
    setState(c);
    return free;
}
LockSupport.java應用
package JUC;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * LockSupport類似於Object的等待,不同步wait的地方在於實現原理不同,LockSupport.unpark呼叫會當前執行緒的內建物件中存在一個計數器,計數器+1
 * LockSupport.park判斷執行緒中的計數器如果大於1代表可以執行,並且把計數器重置為0
 * 下面是對應的Object物件的方法
 * LockSupport.park  ==  Object.wait
 * LockSupport.unpark == Object.notifyAll
 */
public class LockSuportDemo {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            //有效 資料
//            LockSupport.unpark(Thread.currentThread());
//            LockSupport.unpark(Thread.currentThread());
            LockSupport.park(Thread.currentThread());
            System.out.println("hello");
            LockSupport.parkUntil(new String("layman"),System.currentTimeMillis()+1000L);
            System.out.println("layman");
        }, "Thread");
        //無效資料  必須先啟動執行緒,unpark操作是線上程獨享記憶體初始化一個計數器 並且計數器+1
//        LockSupport.unpark(thread);
        thread.start();
        //有效資料
        LockSupport.unpark(thread);
//        try {
//            TimeUnit.SECONDS.sleep(1);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        LockSupport.unpark(thread);
    }
}

ReentrantReadWriteLock.java應用,寫測試用例
package JUC;

import javax.xml.crypto.Data;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.Lock;

/**
 * 讀寫鎖當存在寫操作時會阻塞讀取的操作,等待寫入成功後才可以讀取資料
 */
public class ReentrantReadWriteLockDemo {

    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();
   //讀取資料  採用讀鎖
    public Data get(String key) {
        r.lock();
        try {
            return m.get(key);
        } finally {
            r.unlock();
        }
    }
    //寫入資料  採用寫鎖
    public Data put(String key, Data value) {
        w.lock();
        try {
            try {
                //寫入的時候執行緒睡眠10秒鐘
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return m.put(key, value);
        } finally {
            w.unlock();
        }
    }
    public static void main(String[] args) {
        ReentrantReadWriteLockDemo reentrantReadWriteLockDemo = new ReentrantReadWriteLockDemo();
        new Thread(()->{
            reentrantReadWriteLockDemo.put("layman", new Data() {
                String name = "Tom";
            });
        }).start();
        new Thread(()->{
            for (;;){
                Data layman = reentrantReadWriteLockDemo.get("layman");
                Field name = null;
                try {
                    name = layman.getClass().getDeclaredField("name");
                    Object o = name.get(layman);
                    System.out.println(o);
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}