AQS 佇列同步器
阿新 • • 發佈:2019-12-31
1. AQS是什麼
AQS
是AbstractQueuedSynchronizer
的簡稱,是用來構建鎖或者其他同步元件的基礎框架,它使用int成員變量表示同步狀態,通過內建的FIFO
(First in first out)佇列來完成資源獲取執行緒的排隊工作。
2. AQS與鎖的關係?
AQS是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。可以這樣理解二者之間的關係:
- 鎖是面向使用者的,它定義了使用者與鎖互動的介面,隱藏了實現細節
- 同步器是面向鎖的實現者,它簡化了所得實現方式,遮蔽了同步狀態管理、執行緒的排隊、等待與喚醒等底層操作
- 鎖和同步器很好地隔離了使用者和是閒著所需關注的領域
3. 怎麼使用?
定義同步元件,內部通過靜態內部類Sync實現AQS同步器,程式碼如下:
package com.alwyn.nettysample.synchronizer;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
@Override
public boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition() {return new ConditionObject();}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time,TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
}
複製程式碼
通過springboot快速搭建工程,其中controller程式碼如下:
package com.alwyn.nettysample.controller;
import com.alwyn.nettysample.synchronizer.Mutex;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@RestController
public class HelloController {
private int count = 0;
private AtomicInteger integer = new AtomicInteger(0);
Mutex lock = new Mutex();
@RequestMapping("/hi")
public String hi() {
try {
int i = integer.incrementAndGet();
System.out.println("request count:" + i);
boolean flag = lock.tryLock(100,TimeUnit.MILLISECONDS);
if (flag) {
count++;
System.out.println("count = " + count);
Thread.sleep(10);
lock.unlock();
} else {
boolean b = lock.hasQueuedThreads();
Collection<Thread> queuedThreads = lock.getQueuedThreads();
for (Thread thread : queuedThreads) {
System.out.println(thread.getId() + "::" + thread.getName() + "::" + thread.getState() + "::" + thread.getThreadGroup());
}
System.out.println("獲取鎖失敗:" + b);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return "say hi world";
}
@RequestMapping("/reset")
public void setValue() {
integer.set(0);
count = 0;
}
}
複製程式碼
通過Jmeter設定併發請求鎖
結果: