1. 程式人生 > >java實現流量控制

java實現流量控制

java實現流量控制

有些時候我們的服務負載有限,這時候就需要限制對其的併發訪問,常見的應用場景是開放api。下面介紹兩種流量控制的方式。

1.訊號量semaphore

一個計數訊號量。從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目。

獲得一項前,每個執行緒必須從訊號量獲取許可,從而保證可以使用該項。該執行緒結束後,將項返回到池中並將許可返回到該訊號量,從而允許其他執行緒獲取該項。注意,呼叫 acquire() 時無法保持同步鎖,因為這會阻止將項返回到池中。訊號量封裝所需的同步,以限制對池的訪問,這同維持該池本身一致性所需的同步是分開的。

將訊號量初始化為 1,使得它在使用時最多隻有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進位制訊號量,因為它只能有兩種狀態:一個可用的許可,或零個可用的許可。按此方式使用時,二進位制訊號量具有某種屬性(與很多 Lock 實現不同),即可以由執行緒釋放“鎖”,而不是由所有者(因為訊號量沒有所有權的概念)。在某些專門的上下文(如死鎖恢復)中這會很有用。

此類的構造方法可選地接受一個公平 引數。當設定為 false 時,此類不對執行緒獲取許可的順序做任何保證。特別地,闖入 是允許的,也就是說可以在已經等待的執行緒前為呼叫 acquire() 的執行緒分配一個許可,從邏輯上說,就是新執行緒將自己置於等待執行緒佇列的頭部。當公平設定為 true 時,訊號量保證對於任何呼叫獲取方法的執行緒而言,都按照處理它們呼叫這些方法的順序(即先進先出;FIFO)來選擇執行緒、獲得許可。注意,FIFO 排序必然應用到這些方法內的指定內部執行點。所以,可能某個執行緒先於另一個執行緒呼叫了 acquire,但是卻在該執行緒之後到達排序點,並且從方法返回時也類似。還要注意,非同步的 tryAcquire 方法不使用公平設定,而是使用任意可用的許可。

通常,應該將用於控制資源訪問的訊號量初始化為公平的,以確保所有執行緒都可訪問資源。為其他的種類的同步控制使用訊號量時,非公平排序的吞吐量優勢通常要比公平考慮更為重要。

此類還提供便捷的方法來同時 acquire 和釋放多個許可。小心,在未將公平設定為 true 時使用這些方法會增加不確定延期的風險。

記憶體一致性效果:執行緒中呼叫“釋放”方法(比如 release())之前的操作 happen-before 另一執行緒中緊跟在成功的“獲取”方法(比如 acquire())之後的操作。

上面說了這麼多來看看如何應用:
用訊號量實現對一個方法的併發訪問,每次只能10個執行緒同時訪問,超過執行緒數直接返回

,程式碼如下:

private static Semaphore semaphore = new Semaphore(10);

    void toDo() {
        if (!semaphore.tryAcquire()) {
            return;
        }
        try {
            //do something
        } finally {
            semaphore.release();
        }
    }

2. 併發計數器

也可以使用併發計數器來實現,設定一個計數器,當小於限制放行,進入後當前計數+1,結束後當前計數-1

private static AtomicInteger atomicInteger = new AtomicInteger(1);

    void toAdd() {
        int count = atomicInteger.get();
        if (count > 10) {
            return;
        }
        if (!atomicInteger.compareAndSet(count, count + 1)) {
            return;
        }
        //do something
        atomicInteger.decrementAndGet();
    }

注意上面的if (count > 10) 不能使用atomicInteger.getAndDecrement()>10,atomicInteger.getAndDecrement()雖然是原子操作,但是atomicInteger.getAndDecrement()>10卻不是原子操作,在進行比較那一刻,可能有其它執行緒已經更改了atomicInteger值。

3. 測試

下面對上面的兩種方法進行測試,看看能否達到效果,測試程式碼如下:

public class FlowControl {

    private static Semaphore semaphore = new Semaphore(10);

    void toDo() {
        if (!semaphore.tryAcquire()) {
            return;
        }
        try {
        	//---------------------------do something-----------------------
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //---------------------------------------------------------------------
        } finally {
            semaphore.release();
        }
    }

    private static AtomicInteger atomicInteger = new AtomicInteger(1);

    void toAdd() {
        int count = atomicInteger.get();
        if (count > 10) {
            return;
        }
        if (!atomicInteger.compareAndSet(count, count + 1)) {
            return;
        }
        //---------------------------do something-----------------------
		System.out.println(Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //---------------------------------------------------------------------
        atomicInteger.decrementAndGet();
    }

    public static void main(String[] args) {
        final FlowControl f = new FlowControl();
        for (int i = 0; i < 10; i++) {
            Thread[] threads = new Thread[20];
            for (int j = 0; j < 20; j++) {
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        f.toDo();
                    }
                });
                threads[j] = t;
                t.start();
            }
            for (int j = 0; j < 20; j++) {
                try {
                    threads[j].join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("-----------------");
        }
    }
}

測試程式碼啟動20個執行緒每次對方法呼叫,共迴圈10次。輸出結果如下:
可以看到每次只能有10個執行緒能夠同時訪問,達到了流量控制目的

Thread-0
Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-6
Thread-7
Thread-8
Thread-9
-----------------
Thread-20
Thread-21
Thread-22
Thread-23
Thread-24
Thread-25
Thread-26
Thread-27
Thread-28
Thread-30
-----------------
Thread-40
Thread-42
Thread-41
Thread-43
Thread-44
Thread-45
Thread-46
Thread-47
Thread-48
Thread-49
-----------------