1. 程式人生 > 程式設計 >AQS 佇列同步器

AQS 佇列同步器

1. AQS是什麼

AQSAbstractQueuedSynchronizer的簡稱,是用來構建鎖或者其他同步元件的基礎框架,它使用int成員變量表示同步狀態,通過內建的FIFO(First in first out)佇列來完成資源獲取執行緒的排隊工作。

ReetrantLock、ReetrantReadWriteLock、Semaphore內部均有Sync抽象靜態內部類同步實現,Sync子類實現了公平與不公平版本。

2. AQS與鎖的關係?

AQS是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。可以這樣理解二者之間的關係:

  1. 鎖是面向使用者的,它定義了使用者與鎖互動的介面,隱藏了實現細節
  2. 同步器是面向鎖的實現者,它簡化了所得實現方式,遮蔽了同步狀態管理、執行緒的排隊、等待與喚醒等底層操作
  3. 鎖和同步器很好地隔離了使用者和是閒著所需關注的領域

3. 怎麼使用?

定義同步元件,內部通過靜態內部類Sync實現AQS同步器,程式碼如下:

package com.alwyn.nettysample.synchronizer;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {

    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively
() { return getState() == 1; } @Override public boolean tryAcquire(int arg) { if (compareAndSetState(0,1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int releases) { if
(getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } Condition newCondition() {return new ConditionObject();} } @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time,TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } } 複製程式碼

通過springboot快速搭建工程,其中controller程式碼如下:

package com.alwyn.nettysample.controller;

import com.alwyn.nettysample.synchronizer.Mutex;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@RestController
public class HelloController {
    private int count = 0;
    private AtomicInteger integer = new AtomicInteger(0);
    Mutex lock = new Mutex();

    @RequestMapping("/hi")
    public String hi() {
        try {
            int i = integer.incrementAndGet();
            System.out.println("request count:" + i);
            boolean flag = lock.tryLock(100,TimeUnit.MILLISECONDS);
            if (flag) {
                count++;
                System.out.println("count = " + count);
                Thread.sleep(10);
                lock.unlock();
            } else {
                boolean b = lock.hasQueuedThreads();
                Collection<Thread> queuedThreads = lock.getQueuedThreads();
                for (Thread thread : queuedThreads) {
                    System.out.println(thread.getId() + "::" + thread.getName() + "::" + thread.getState() + "::" + thread.getThreadGroup());
                }
                System.out.println("獲取鎖失敗:" + b);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        return "say hi world";
    }

    @RequestMapping("/reset")
    public void setValue() {
        integer.set(0);
        count = 0;
    }
}
複製程式碼

通過Jmeter設定併發請求鎖

結果: