Java高併發(三)——多執行緒協作,同步控制
繼上一篇:Java高併發——多執行緒基礎 中講到,共享資源的合理使用,才能夠使多執行緒程式有條不紊的執行。其中我們通過synchronized來實現臨界區資源的是否可以訪問。而,這篇我們來重點總結synchronized的增強替代版鎖,以及其它JDK併發包提供的一些同步控制的功能。
好,還是先看下知識的總結思維導圖,然後分開進行總結:
一,ReentrantLock(重入鎖):1,顧名思義就是像一把鎖,我們可以鎖住,又可以開啟,從而控制資源的同步訪問。而其中重入特性指的是同一個執行緒,可以反覆的進入;2,中斷響應,對於synchronized只有保持等待,和繼續執行兩種情況;而ReentrantLock在等待的過程,我們可以通知其放棄等待(類似生活中約會,你等了一會朋友沒到,但是朋友遇到突發情況不能來了,給你打了電話通知你,你就不等了);3,申請等待時間:就是指定等待時間,在指定時間沒得到,則放棄;4,公平鎖:指定fair為true則進行先到先得,而不是隨機選取。這裡看下ReentrantLock的相關例子:
//1,ReentrantLock例子 public class ReentrantLockTest implements Runnable{ public static ReentrantLock lock =new ReentrantLock(); public static int i =0; public void run() { for (int j = 0; j < 1000; j++) { lock.lock(); //lock.lock(); try { i++; }finally { lock.unlock(); //lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReentrantLockTest reentrantLockTest = new ReentrantLockTest(); Thread t1 = new Thread(reentrantLockTest); Thread t2 = new Thread(reentrantLockTest); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } } //2,lock1.lockInterruptibly()中斷後可放棄 public class InterruptLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public InterruptLock(int lock) { this.lock = lock; } public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); Thread.sleep(500); lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); Thread.sleep(500); lock1.lockInterruptibly(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock1.isHeldByCurrentThread()) { lock1.unlock(); } if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println(Thread.currentThread().getName() + "執行緒退出"); } } public static void main(String[] args) throws InterruptedException { InterruptLock interruptLock1 = new InterruptLock(1); InterruptLock interruptLock2 = new InterruptLock(2); Thread t1 = new Thread(interruptLock1); Thread t2 = new Thread(interruptLock2); t1.start(); t2.start(); Thread.sleep(1000); t2.interrupt(); } } //3,申請等待時間例子:如果直接使用tryLock()如果拿不到則直接返回,不會等待 public class TimeLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public void run() { try { if (lock.tryLock(5, TimeUnit.SECONDS)) { System.out.println( Thread.currentThread().getName() + "get lock success"); Thread.sleep(6000); } else { System.out.println( Thread.currentThread().getName() + "get lock failed"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } public static void main(String[] args) { TimeLock timeLock = new TimeLock(); Thread t1 = new Thread(timeLock); Thread t2 = new Thread(timeLock); t1.start(); t2.start(); } } //4,公平鎖,先到先得 public class FairLock implements Runnable { public static ReentrantLock fairLock = new ReentrantLock(true); public void run() { while (true){ try{ fairLock.lock(); System.out.println(Thread.currentThread().getName() + "get lock"); }finally { fairLock.unlock(); } } } public static void main(String[] args) { FairLock fairLock = new FairLock(); Thread t1 = new Thread(fairLock,"thread1"); Thread t2 = new Thread(fairLock,"thread2"); t1.start(); t2.start(); } }
二,Condition條件:記得上篇部落格中我們wait()、notify()等待和通知,Condition也可以實現類似的功能,配合鎖進行使用。提供的方法await()、awaitUninterruptibly()、awaitNanos(long nanosTimeout)、await(long time,TimeUnit unit)、signal()、signalAll()等,也很容易理解。這裡提一下:生產者消費者模型中,如果庫房慢了,則生產者await;如果庫房沒了,則消費者await;生產者生成一個則signal;消費者消費一個則signal。是不是非常實用。好看個簡單例子:
public class ConditionTest implements Runnable {
public static ReentrantLock reentrantLock = new ReentrantLock();
public static Condition condition = reentrantLock.newCondition();
public void run() {
try {
reentrantLock.lock();
condition.await();
System.out.println("Thread is going on");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
reentrantLock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionTest conditionTest = new ConditionTest();
Thread t1 = new Thread(conditionTest);
t1.start();
Thread.sleep(2000);
reentrantLock.lock();
condition.signal();
reentrantLock.unlock();
}
}
三,Semaphore訊號量:這個也挺容易理解的。無論是synchronized還是lock都是一次只能一個執行緒獲取資源,而訊號量可以多個同時訪問。其中方法有:acquire()、acquireUninterruptibly()、tryAcquire()、tryAcquire(long timeout,TimeUnit unit)、release()等。看個簡單的例子:
public class SemaphoreTest implements Runnable {
final Semaphore semaphore =new Semaphore(5);
public void run() {
try {
semaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "done");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
final SemaphoreTest semaphoreTest = new SemaphoreTest();
for (int i = 0; i <20 ; i++) {
executorService.execute(semaphoreTest);
}
}
}
四,ReadWriteLock讀寫鎖:現實業務場景中往往都是讀多寫少,而讀不會帶來資料的不一致性,所以就有了讀寫鎖,讀讀不阻塞、讀寫阻塞、寫寫阻塞,對於讀遠遠大於寫的非常使用。看個簡單例子:
public class ReadWriteLockTest {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = reentrantReadWriteLock.readLock();
private static Lock writeLock = reentrantReadWriteLock.writeLock();
private int value;
public Object handleRead(Lock lock) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
return value;
} finally {
lock.unlock();
}
}
public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
value = index;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
final ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
Runnable readRunnale = new Runnable() {
public void run() {
try {
readWriteLockTest.handleRead(readLock);
//readWriteLockTest.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable writeRunnale = new Runnable() {
public void run() {
try {
readWriteLockTest.handleWrite(readLock, new Random().nextInt());
//readWriteLockTest.handleWrite(lock,new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 18; i++) {
new Thread(readRunnale).start();
}
for (int i = 0; i < 2; i++) {
new Thread(writeRunnale).start();
}
}
}
五,CountDownLatch倒計時器:也是非常實用的,可以讓主線等待一組子執行緒執行完畢再進行業務的處理。看個簡單例子:
public class CountDownLatchTest implements Runnable {
static final CountDownLatch countDownLatch = new CountDownLatch(10);
static final CountDownLatchTest countDownLatchTest = new CountDownLatchTest();
public void run() {
try {
Thread.sleep(new Random().nextInt(10)*100);
System.out.println("check complete");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i <10 ; i++) {
executorService.execute(countDownLatchTest);
}
countDownLatch.await();
System.out.println("all complete");
executorService.shutdown();
}
}
六,CyclicBarrier迴圈柵欄:增強版CountDownLatch,但是是可以迴圈使用。這裡看個經典例子:司令下達命令,10個士兵一起去完成一項任務:10個士兵首先集合報道,然後去執行任務,執行完了再彙報司令,司令宣佈完成任務。相當於計數了兩次,迴圈使用了。好看個例子:
public class CyclicBarrierTest {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclicBarrier;
Soldier(CyclicBarrier cyclicBarrier, String soldierName) {
this.soldier = soldierName;
this.cyclicBarrier = cyclicBarrier;
}
public void run() {
try {
//等待所有士兵到齊
cyclicBarrier.await();
doWork();
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
void doWork() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + "do work");
}
}
public static class BarrierRun implements Runnable {
boolean flag;
int n;
public BarrierRun(boolean flag, int n) {
this.flag = flag;
this.n = n;
}
public void run() {
if (flag) {
System.out.println("soldier" + n + "個,do work");
} else {
System.out.println("soldier" + n + "個,集合完畢");
flag = true;
}
}
}
public static void main(String[] args) {
final int n = 10;
Thread[] allSoldier = new Thread[n];
boolean flag = false;
CyclicBarrier cyclicBarrier = new CyclicBarrier(n, new BarrierRun(flag, n));
//
System.out.println("集合");
for (int i = 0; i < n; i++) {
System.out.println(i + "+soldier come");
allSoldier[i] = new Thread(new Soldier(cyclicBarrier, "soldier" + n));
allSoldier[i].start();
}
}
}
七,LockSupport:上篇講了掛起(suspend)和繼續執行(resume),其中都是JDK不建議使用的,也說到它的不好處。而LockSupport的靜態方法park()可以阻塞當前執行緒,unpark()繼續執行,利用的就是訊號量的原理,它為每個執行緒準備了一個許可證,如果許可證可用,則park()立即返回,並消費許可,如果不可用則進行阻塞;而unpark()則使這個許可變為可用。許可為唯一的。
public class LockSupportTest {
public static Object u = new Object();
static ChangeObjectThread t1 =new ChangeObjectThread("t1");
static ChangeObjectThread t2 =new ChangeObjectThread("t2");
public static class ChangeObjectThread extends Thread{
public ChangeObjectThread(String name){
super.setName(name);
}
@Override
public void run() {
synchronized (u){
System.out.println("in"+ getName());
LockSupport.park();
}
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}
好,通過上邊幾個類的功能都可以很好的滿足多執行緒之間的協作,同步控制資源。只有讓資料不出錯,才能更高的發揮多執行緒的高效價值!繼續……