1. 程式人生 > >Java併發、同步總結

Java併發、同步總結

Java中提供併發控制的兩種方式:1、同步關鍵字 2、鎖

Java 5.0之前使用的是同步關鍵字Synchronized和volatile,他們是jvm中的隱式鎖
Synchronized和volatile的實現是基於jvm指令的同步程式碼塊實現的。
新增同步關鍵字後,會在jvm程式碼塊指令前後新增monitorexit和monitorenter兩個同步指令。
但是Synchronized修飾的方法卻不一樣,同步方法的實現是JVM定義了方法的訪問標誌ACC_SYNCHRONIZED在方法表中,JVM將同步方法前面設定這個標誌。

同步程式碼塊和同步方法的位元組碼指令(javap -c classname)

 public void syncBlockImpl();
    Code:
       0: aload_0
       1: dup
       2: astore_1
       3: monitorenter
       4: getstatic     #15                 // Field java/lang/System.out:Ljava/io/PrintStream;
       7: ldc           #23                 // String hello world
       9: invokevirtual #22                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
12: aload_1 13: monitorexit 14: goto 22 17: astore_2 18: aload_1 19: monitorexit 20: aload_2 21: athrow 22: return Exception table: from to target type 4 14 17 any 17 20 17 any public
synchronized void syncMethodImpl(); Code: 0: getstatic #15 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #23 // String hello world 5: invokevirtual #22 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return

Java 5.0之後,JDK添加了多執行緒併發庫java.util.concurrent,提供了執行緒池(Executors)、訊號量(Semaphore)、重入鎖(ReentrantLock)、讀寫鎖(ReentrantWriteReadLock)和一些執行緒安全的集合類(ConcurrentHashMap/ConcurrentLinkedQueue)、CountDownLatch

無論是Semaphore還是ReentrantLock都是基於AbstractQueueSynchronized實現的,AQS實現了自己的演算法來實現共享資源的合理控制。AQS中核心的變數替換方法(原子性)是藉助CPU硬體指令集compareAndSweep實現的,等待佇列是由虛擬無界雙向連結串列CLH實現的。每一個等待執行緒對一個Node,Node中儲存了當前執行緒引用、等待狀態、前一個或後一個等待節點。

CLH鎖,是一種基於連結串列的可擴充套件、高效能、公平的自旋鎖。

鎖分為公平鎖和協商鎖(非公平鎖),其中公平鎖的開銷要大。

一、訊號量Semaphore的用法

ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(5, false);//非公平訊號量
//開啟20個執行緒
for (int i = 0; i < 20; i++) {
final int Num = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
//執行緒開始時,獲取許可
semaphore.acquire();
System.out.println(“Thread ” + Num + ” accessing”);
Thread.sleep((long) (Math.random() * 10000));
//訪問完後釋放
semaphore.release();
System.out.println(“available permit = ” + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}

    }
};

executorService.submit(runnable);
}

Semaphore本質為共享鎖,用來限制多個執行緒對有限資源訪問進行控制。

二、ReentrantLock 可重入鎖 獨佔鎖

比Synchronized要高效,因為在高度爭用情況下,處理器把大部分時間都花在任務處理上,而不是執行緒排程上
特性:時間等候鎖,可中斷鎖,多個條件變數,鎖投票,無塊結構鎖
一般用法:
try {
reentrantLock.lock();
System.out.println(str + ” 獲取鎖 “);
doSomeThing(str);
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(str + ” 釋放鎖 “);
reentrantLock.unlock();
}

時間等候鎖,在一定時間內獲取不到鎖,則返回false

boolean captured = false;
try {
//在2秒內嘗試獲取鎖
captured = reentrantLock.tryLock(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(“tryLock(10, TimeUnit.SECONDS) ” + captured);
} finally {
if (captured) {
reentrantLock.unlock();
}
}

可中斷鎖,獲取鎖的執行緒可被自己或其他執行緒中斷

try {
reentrantLock.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + ” 獲取鎖 “);
try {
Thread.sleep(5 * 1000);
reentrantLockTest.plusMethod();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + “被中斷 “);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
System.out.println(Thread.currentThread().getName() + ” 釋放鎖 “);
}

多條件變/
* 生產者-消費者
* 多條件鎖
*


* Created by zczhang on 16/9/27.
*/
public class ProductQueue {
private T[] items;
private final Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();

private int head, tail, count;

public ProductQueue() {
    this(10);
}

public ProductQueue(int maxSize) {
    items = (T[]) new Object[maxSize];
}

public void put(T t) throws InterruptedException {
    lock.lock();
    try {
        while (count == getCapacity()) {
            System.out.println("---------佇列滿,生產者等待------");
            notFull.await();
        }
        items[tail] = t;
        if (++tail == getCapacity()) {
            tail = 0;
        }
        ++count;
        notEmpty.signalAll();
    } finally {
        lock.unlock();
    }
}

public T take() throws InterruptedException {
    lock.lock();
    try {
        while (count == 0) {
            System.out.println("---------佇列空,消費者" + Thread.currentThread().getName() + "等待------");
            notEmpty.await();
        }
        T item = items[head];
        items[head] = null;
        if (++head == getCapacity()) {
            head = 0;
        }
        --count;
        notFull.signalAll();
        return item;
    } finally {
        lock.unlock();
    }
}

public int size() {
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

public int getCapacity() {
    return items.length;
}

}eentrantReadWriteLock讀寫鎖

讀鎖為共享鎖,寫鎖為獨佔鎖。讀讀不互斥,讀寫互斥,寫寫互斥
寫鎖支援Condition,讀鎖不支援Condition。
支援鎖降級,即寫鎖可以獲取讀鎖,然後釋放寫鎖,這樣寫鎖就變成了讀鎖
不支援鎖升級,即讀鎖不能直接獲取寫鎖。需將讀鎖釋放後再獲取寫鎖。
支援鎖中斷
讀寫鎖的最大數量只能是65535(包括重入數)

private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
private int resource = 0;

public int read() {
    try {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        readLock.lock();
        System.out.println(Thread.currentThread().getName() + "獲取讀鎖,嘗試讀取");
        return resource;
    } finally {
        readLock.unlock();
        System.out.println(Thread.currentThread().getName() + "釋放讀鎖");
    }
}

public void write(int newValue) {
    try {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        writeLock.lock();
        System.out.println(Thread.currentThread().getName() + "獲取寫鎖,嘗試寫入");
        this.resource = newValue;
        System.out.println(Thread.currentThread().getName() + "寫入值 " + newValue);
    } finally {
        writeLock.unlock();
        System.out.println(Thread.currentThread().getName() + "釋放寫鎖");
    }
}

四、CountDownLatch

/**
 * CountDownLatch 同步輔助類
 * 指定數字count初始化,然後呼叫await方法的執行緒會一直阻塞,直到count變為0
 * <p>
 * 一個同步輔助類,允許一個或多個執行緒等待,一直到其他執行緒完成任務
 * 使用場景:1.主執行緒開啟多個子執行緒去執行分解的任務,當所有子執行緒都完成後,主執行緒再繼續執行。
 * 2.主執行緒先執行,子執行緒再執行,主執行緒等待子執行緒完成再執行
 * <p>
 * Created by zczhang on 16/9/28.
 */
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        int childTaskNum = 3;
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch childEndLatch = new CountDownLatch(childTaskNum);

        for (int i = 0; i < childTaskNum; i++) {
            Thread child = new Thread(new ChildTaskRun(startLatch,childEndLatch));
            child.start();
        }

        System.out.println("主執行緒處理一些任務...");
        Thread.sleep(2*1000);
        System.out.println("主執行緒處理一些任務完成");

        startLatch.countDown();

        System.out.println("主執行緒等待子執行緒處理任務完成...");
        childEndLatch.await();
        System.out.println("主執行緒處理任務結果");
    }

    private static class ChildTaskRun implements Runnable {
        private CountDownLatch mainStartLatch;
        private CountDownLatch childEndLatch;

        public ChildTaskRun(CountDownLatch mainStartLatch, CountDownLatch childEndLatch) {
            this.mainStartLatch = mainStartLatch;
            this.childEndLatch = childEndLatch;
        }

        @Override
        public void run() {
            try {
                mainStartLatch.await();
                doSomeThing();
                System.out.println("子執行緒 " + Thread.currentThread().getName() + "處理任務完成");
                childEndLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void doSomeThing() {
            System.out.println("子執行緒 " + Thread.currentThread().getName() + "處理任務中");
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

五、CyclicBarrier

/**
 * 迴環柵欄 同步輔助類
 * 可實現讓一組執行緒等待至某個狀態之後再全部同時執行,這個狀態就叫做一個柵欄.
 * 當所有執行緒都被釋放後,該柵欄可以重用
 * <p/>
 * 使用場景:開啟多個子執行緒處理同步任務,當所有子任務都處理完成後,各個子執行緒再處理其他任務.
 * 同時主執行緒可插入任務,當最後一個子執行緒完成同步任務後,執行主執行緒插入任務,然後子執行緒再處理其他任務
 * <p/>
 * Created by zczhang on 16/9/28.
 */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        int childTaskNum = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(childTaskNum, new Runnable() {
            @Override
            public void run() {
                System.out.println("當最後一個執行緒 " + Thread.currentThread().getName() + "完成同步任務時,做一些事情...");
            }
        });
        for (int i = 0; i < childTaskNum; i++) {
            Thread thread = new Thread(new ChildTaskRun(cyclicBarrier));
            thread.start();
        }

    }

    public static class ChildTaskRun implements Runnable {

        private CyclicBarrier cyclicBarrier;

        public ChildTaskRun(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("子執行緒" + Thread.currentThread().getName() + "開始處理任務...");
            try {
                Thread.sleep(3 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("子執行緒" + Thread.currentThread().getName() + "處理任務完成,等待其他執行緒...");

            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

            System.out.println("所有子執行緒完成同步任務, 繼續執行其他任務...");

        }
    }
}