java併發-讀寫鎖&Park&Condition介面
阿新 • • 發佈:2018-12-22
ReentrantReadWriteLock
核心
讀狀態取state的高16位,寫狀態取state的低16位,來解決一個state需要標識read和write的狀態。
寫Lock 排他、獨佔式的
讀Lock 共享式的
示例
/**
* describe:
* E-mail:[email protected] date:2018/12/16
*
* @Since 0.0.1
*/
public class ReentrantReadWriteLockTest {
static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
static Lock readLock = reentrantReadWriteLock.readLock();
static Lock writeLock = reentrantReadWriteLock.writeLock();
static class R extends Thread {
@Override
public void run() {
readLock.lock();
System.out.println( "讀"+Thread.currentThread().getName());
readLock.unlock();
}
}
static class W extends Thread {
@Override
public void run() {
writeLock.lock();
System.out.println("寫"+Thread.currentThread().getName());
writeLock.unlock();
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
if (i%2 == 0){
Thread r = new R();
r.start();
}else {
Thread w = new W();
w.start();
}
}
}
}
降級鎖(同一執行緒,資料的更改是可見的)
含義就是在寫lock的時候,可以允許去獲取讀鎖,此時沒有做降級的時候,讀鎖是“阻塞”的,但是做了降級後,讀鎖可以被獲取到,
讀鎖釋放,那麼該執行緒就從寫鎖執行緒變成了讀鎖執行緒了。
public void processData() {
readLock.lock();
if (!update) {
// 必須先釋放讀鎖
readLock.unlock();
// 鎖降級從寫鎖獲取到開始
writeLock.lock();
try {
if (!update) {
// 準備資料的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 鎖降級完成,寫鎖降級為讀鎖
}
try {
// 使用資料的流程(略)
} finally {
readLock.unlock();
}
}
LockSupport類
- 呼叫Unsafe的park(停車)方法
LockSupport定義了一組以park開頭的方法用來阻塞當前執行緒,以及unpark(Thread thread)
方法來喚醒一個被阻塞的執行緒
相當於wait和notify
Condition
Condition 操作的前提是,當前執行緒是同步佇列中的head,獲取到了鎖,才能進行等待/喚醒操作
- Condition 維護了一個Node等待佇列
- await操作
會將該執行緒從Syn佇列中移動到Condition等待佇列的隊尾
- single操作
會將該執行緒從Condition中移動到Syn的同步佇列的隊尾
Condition示例
此例中實現了偽消費者,是排他鎖的實現
當陣列長度滿的時候,remove將被喚醒,清空陣列
當陣列容量小於長度時,remove將阻塞,add方法被喚醒
注意
- 這裡要注意喚醒後的執行順序,是邏輯順序往下執行,所以必須要double check,此處喚醒是喚醒了所有等待的執行緒,有可能出現生產者喚醒生產者的情況
- 疑問
add()方法已經lock到鎖了,為什麼await後,remove可以獲取鎖,在add沒有unlock之前是無法獲取鎖的???
原因:
await()–>fullyRelease()–>release()–>tryRelease
release方法如下
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在single的時候,或有enqueue操作,我們的lock方法在無法獲取鎖的情況下就會enqueue操作,進入同步佇列
so it’s ok…
/**
* describe:
* E-mail:[email protected] date:2018/12/16
*
* @Since 0.0.1
*/
public class ConditionWaitAndNotify {
//排它鎖
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private Object[] ts;
private int index;
public ConditionWaitAndNotify(int count) {
if (count <= 0) throw new IllegalArgumentException("count must not be null");
ts = new Object[count];
}
public void add() {
try {
lock.lock();
//one check
if (index == ts.length) {
condition.await();
}
//double check
if (index == ts.length) {
return;
}
ts[index] = new Object();
index++;
System.out.println("add" + index);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void remove() {
try {
lock.lock();
//one check
if (index < ts.length) {
condition.await();
}
//double check
if (index < ts.length){
return;
}
index = 0;
System.out.println("remove" + index);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
static Thread createAdd(ConditionWaitAndNotify conditionWaitAndNotify){
return new Thread(() -> {
while (true){
conditionWaitAndNotify.add();
try {
//Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
static Thread createRemove(ConditionWaitAndNotify conditionWaitAndNotify){
return new Thread(() -> {
while (true) {
conditionWaitAndNotify.remove();
try {
//Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) {
ConditionWaitAndNotify conditionWaitAndNotify = new ConditionWaitAndNotify(3);
Thread t1 = createAdd(conditionWaitAndNotify);
Thread t11 = createAdd(conditionWaitAndNotify);
Thread t2 = createRemove(conditionWaitAndNotify);
Thread t22 = createRemove(conditionWaitAndNotify);
t1.start();
t11.start();
t2.start();
t22.start();
}
}