java實現流量控制
java實現流量控制
有些時候我們的服務負載有限,這時候就需要限制對其的併發訪問,常見的應用場景是開放api。下面介紹兩種流量控制的方式。
1.訊號量semaphore
一個計數訊號量。從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目。
獲得一項前,每個執行緒必須從訊號量獲取許可,從而保證可以使用該項。該執行緒結束後,將項返回到池中並將許可返回到該訊號量,從而允許其他執行緒獲取該項。注意,呼叫 acquire() 時無法保持同步鎖,因為這會阻止將項返回到池中。訊號量封裝所需的同步,以限制對池的訪問,這同維持該池本身一致性所需的同步是分開的。
將訊號量初始化為 1,使得它在使用時最多隻有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進位制訊號量,因為它只能有兩種狀態:一個可用的許可,或零個可用的許可。按此方式使用時,二進位制訊號量具有某種屬性(與很多 Lock 實現不同),即可以由執行緒釋放“鎖”,而不是由所有者(因為訊號量沒有所有權的概念)。在某些專門的上下文(如死鎖恢復)中這會很有用。
此類的構造方法可選地接受一個公平 引數。當設定為 false 時,此類不對執行緒獲取許可的順序做任何保證。特別地,闖入 是允許的,也就是說可以在已經等待的執行緒前為呼叫 acquire() 的執行緒分配一個許可,從邏輯上說,就是新執行緒將自己置於等待執行緒佇列的頭部。當公平設定為 true 時,訊號量保證對於任何呼叫獲取方法的執行緒而言,都按照處理它們呼叫這些方法的順序(即先進先出;FIFO)來選擇執行緒、獲得許可。注意,FIFO 排序必然應用到這些方法內的指定內部執行點。所以,可能某個執行緒先於另一個執行緒呼叫了 acquire,但是卻在該執行緒之後到達排序點,並且從方法返回時也類似。還要注意,非同步的 tryAcquire 方法不使用公平設定,而是使用任意可用的許可。
通常,應該將用於控制資源訪問的訊號量初始化為公平的,以確保所有執行緒都可訪問資源。為其他的種類的同步控制使用訊號量時,非公平排序的吞吐量優勢通常要比公平考慮更為重要。
此類還提供便捷的方法來同時 acquire 和釋放多個許可。小心,在未將公平設定為 true 時使用這些方法會增加不確定延期的風險。
記憶體一致性效果:執行緒中呼叫“釋放”方法(比如 release())之前的操作 happen-before 另一執行緒中緊跟在成功的“獲取”方法(比如 acquire())之後的操作。
上面說了這麼多來看看如何應用:
用訊號量實現對一個方法的併發訪問,每次只能10個執行緒同時訪問,超過執行緒數直接返回
private static Semaphore semaphore = new Semaphore(10);
void toDo() {
if (!semaphore.tryAcquire()) {
return;
}
try {
//do something
} finally {
semaphore.release();
}
}
2. 併發計數器
也可以使用併發計數器來實現,設定一個計數器,當小於限制放行,進入後當前計數+1,結束後當前計數-1
private static AtomicInteger atomicInteger = new AtomicInteger(1);
void toAdd() {
int count = atomicInteger.get();
if (count > 10) {
return;
}
if (!atomicInteger.compareAndSet(count, count + 1)) {
return;
}
//do something
atomicInteger.decrementAndGet();
}
注意上面的if (count > 10) 不能使用atomicInteger.getAndDecrement()>10,atomicInteger.getAndDecrement()雖然是原子操作,但是atomicInteger.getAndDecrement()>10卻不是原子操作,在進行比較那一刻,可能有其它執行緒已經更改了atomicInteger值。
3. 測試
下面對上面的兩種方法進行測試,看看能否達到效果,測試程式碼如下:
public class FlowControl {
private static Semaphore semaphore = new Semaphore(10);
void toDo() {
if (!semaphore.tryAcquire()) {
return;
}
try {
//---------------------------do something-----------------------
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//---------------------------------------------------------------------
} finally {
semaphore.release();
}
}
private static AtomicInteger atomicInteger = new AtomicInteger(1);
void toAdd() {
int count = atomicInteger.get();
if (count > 10) {
return;
}
if (!atomicInteger.compareAndSet(count, count + 1)) {
return;
}
//---------------------------do something-----------------------
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//---------------------------------------------------------------------
atomicInteger.decrementAndGet();
}
public static void main(String[] args) {
final FlowControl f = new FlowControl();
for (int i = 0; i < 10; i++) {
Thread[] threads = new Thread[20];
for (int j = 0; j < 20; j++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
f.toDo();
}
});
threads[j] = t;
t.start();
}
for (int j = 0; j < 20; j++) {
try {
threads[j].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("-----------------");
}
}
}
測試程式碼啟動20個執行緒每次對方法呼叫,共迴圈10次。輸出結果如下:
可以看到每次只能有10個執行緒能夠同時訪問,達到了流量控制目的
Thread-0
Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-6
Thread-7
Thread-8
Thread-9
-----------------
Thread-20
Thread-21
Thread-22
Thread-23
Thread-24
Thread-25
Thread-26
Thread-27
Thread-28
Thread-30
-----------------
Thread-40
Thread-42
Thread-41
Thread-43
Thread-44
Thread-45
Thread-46
Thread-47
Thread-48
Thread-49
-----------------