Java併發程式設計入門(十一)限流場景和Spring限流器實現
一、限流場景
限流場景一般基於硬體資源的使用負載,包括CPU,記憶體,IO。例如某個報表服務需要消耗大量記憶體,如果併發數增加就會拖慢整個應用,甚至記憶體溢位導致應用掛掉。
限流適用於會動態增加的資源,已經池化的資源不一定需要限流,例如資料庫連線池,它是已經確定的資源,池的大小固定(即使可以動態伸縮池大小),這種場景下並不需要通過限流來實現,只要能做到如果池內連結已經使用完,則無法再獲取新的連線則可。
因此,使用限流的前提是:
1.防止資源使用過載產生不良影響。
2.使用的資源會動態增加,例如一個站點的請求。
二、Spring中實現限流
I、限流需求
1.只針對Controller限流
2.根據url請求路徑限流
3.可根據正則表示式匹配url來限流
4.可定義多個限流規則,每個規則的最大流量不同
II、相關類結構
1.CurrentLimiteAspect是一個攔截器,在controller執行前後執行後攔截
2.CurrentLimiter是限流器,可以新增限流規則,根據限流規則獲取流量通行證,釋放流量通行證;如果獲取通行證失敗則丟擲異常。
3.LimiteRule是限流規則,限流規則可設定匹配url的正則表示式和最大流量值,同時獲取該規則的流量通訊證和釋放流量通訊證。
4.AcquireResult是獲取流量通訊證的結果,結果有3種:獲取成功,獲取失敗,不需要獲取。
5.Application是Spring的啟動類,簡單起見,在啟動類種新增限流規則。
III、Show me code
1.AcquireResult.java
public class AcquireResult {
/** 獲取通行證成功 */
public static final int ACQUIRE_SUCCESS = 0;
/** 獲取通行證失敗 */
public static final int ACQUIRE_FAILED = 1;
/** 不需要獲取通行證 */
public static final int ACQUIRE_NONEED = 2;
/** 獲取通行證結果 */
private int result;
/** 可用通行證數量 */
private int availablePermits;
public int getResult() {
return result;
}
public void setResult(int result) {
this.result = result;
}
public int getAvailablePermits() {
return availablePermits;
}
public void setAvailablePermits(int availablePermits) {
this.availablePermits = availablePermits;
}
}
複製程式碼
2.LimiteRule.java
/**
* @ClassName LimiteRule
* @Description TODO
* @Author 鏗然一葉
* @Date 2019/10/4 20:18
* @Version 1.0
* javashizhan.com
**/
public class LimiteRule {
/** 訊號量 */
private final Semaphore sema;
/** 請求URL匹配規則 */
private final String pattern;
/** 最大併發數 */
private final int maxConcurrent;
public LimiteRule(String pattern,int maxConcurrent) {
this.sema = new Semaphore(maxConcurrent);
this.pattern = pattern;
this.maxConcurrent = maxConcurrent;
}
/**
* 獲取通行證。這裡加同步是為了列印可用通行證數量時看起來逐個減少或者逐個增加,無此列印需求可不加synchronized關鍵字
* @param urlPath 請求Url
* @return 0-獲取成功,1-沒有獲取到通行證,2-不需要獲取通行證
*/
public synchronized AcquireResult tryAcquire(String urlPath) {
AcquireResult acquireResult = new AcquireResult();
acquireResult.setAvailablePermits(this.sema.availablePermits());
try {
//Url請求匹配規則則獲取通行證
if (Pattern.matches(pattern,urlPath)) {
boolean acquire = this.sema.tryAcquire(50,TimeUnit.MILLISECONDS);
if (acquire) {
acquireResult.setResult(AcquireResult.ACQUIRE_SUCCESS);
print(urlPath);
} else {
acquireResult.setResult(AcquireResult.ACQUIRE_FAILED);
}
} else {
acquireResult.setResult(AcquireResult.ACQUIRE_NONEED);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return acquireResult;
}
/**
* 釋放通行證,這裡加同步是為了列印可用通行證數量時看起來逐個減少或者逐個增加,無此列印需求可不加synchronized關鍵字
*/
public synchronized void release() {
this.sema.release();
print(null);
}
/**
* 得到最大併發數
* @return
*/
public int getMaxConcurrent() {
return this.maxConcurrent;
}
/**
* 得到匹配表示式
* @return
*/
public String getPattern() {
return this.pattern;
}
/**
* 列印日誌
* @param urlPath
*/
private void print(String urlPath) {
StringBuffer buffer = new StringBuffer();
buffer.append("Pattern: ").append(pattern).append(",");
if (null != urlPath) {
buffer.append("urlPath: ").append(urlPath).append(",");
}
buffer.append("Available Permits:").append(this.sema.availablePermits());
System.out.println(buffer.toString());
}
}
複製程式碼
3.CurrentLimiter.java
/**
* @ClassName CurrentLimiter
* @Description TODO
* @Author 鏗然一葉
* @Date 2019/10/4 20:18
* @Version 1.0
* javashizhan.com
**/
public class CurrentLimiter {
/** 本地執行緒變數,儲存一次請求獲取到的通行證,和其他併發請求隔離開,在controller執行完後釋放本次請求獲得的通行證 */
private static ThreadLocal<Vector<LimiteRule>> localAcquiredLimiteRules = new ThreadLocal<Vector<LimiteRule>>();
/** 所有限流規則 */
private static Vector<LimiteRule> allLimiteRules = new Vector<LimiteRule>();
/** 私有構造器,避免例項化 */
private CurrentLimiter() {}
/**
* 新增限流規則,在spring啟動時新增,不需要加鎖,如果在執行中動態新增,需要加鎖
* @param rule
*/
public static void addRule(LimiteRule rule) {
printRule(rule);
allLimiteRules.add(rule);
}
/**
* 獲取流量通訊證,所有流量規則都要獲取後才能通過,如果一個不能獲取則丟擲異常
* 多執行緒併發,需要加鎖
* @param urlPath
*/
public static void tryAcquire(String urlPath) throws Exception {
//有限流規則則處理
if (allLimiteRules.size() > 0) {
//能獲取到通行證的流量規則要儲存下來,在Controller執行完後要釋放
Vector<LimiteRule> acquiredLimitRules = new Vector<LimiteRule>();
for(LimiteRule rule:allLimiteRules) {
//獲取通行證
AcquireResult acquireResult = rule.tryAcquire(urlPath);
if (acquireResult.getResult() == AcquireResult.ACQUIRE_SUCCESS) {
acquiredLimitRules.add(rule);
//獲取到通行證的流量規則新增到本地執行緒變數
localAcquiredLimiteRules.set(acquiredLimitRules);
} else if (acquireResult.getResult() == AcquireResult.ACQUIRE_FAILED) {
//如果獲取不到通行證則丟擲異常
StringBuffer buffer = new StringBuffer();
buffer.append("The request [").append(urlPath).append("] exceeds maximum traffic limit,the limit is ").append(rule.getMaxConcurrent())
.append(",available permit is").append(acquireResult.getAvailablePermits()).append(".");
System.out.println(buffer);
throw new Exception(buffer.toString());
} else {
StringBuffer buffer = new StringBuffer();
buffer.append("This path does not match the limit rule,path is [").append(urlPath)
.append("],pattern is [").append(rule.getPattern()).append("].");
System.out.println(buffer.toString());
}
}
}
}
/**
* 釋放獲取到的通行證。在controller執行完後掉呼叫(丟擲異常也需要呼叫)
*/
public static void release() {
Vector<LimiteRule> acquiredLimitRules = localAcquiredLimiteRules.get();
if (null != acquiredLimitRules && acquiredLimitRules.size() > 0) {
acquiredLimitRules.forEach(rule->{
rule.release();
});
}
//destory本地執行緒變數,避免記憶體洩漏
localAcquiredLimiteRules.remove();
}
/**
* 列印限流規則資訊
* @param rule
*/
private static void printRule(LimiteRule rule) {
StringBuffer buffer = new StringBuffer();
buffer.append("Add Limit Rule,Max Concurrent: ").append(rule.getMaxConcurrent())
.append(",Pattern: ").append(rule.getPattern());
System.out.println(buffer.toString());
}
}
複製程式碼
4.CurrentLimiteAspect.java
/**
* @ClassName CurrentLimiteAspect
* @Description TODO
* @Author 鏗然一葉
* @Date 2019/10/4 20:15
* @Version 1.0
* javashizhan.com
**/
@Aspect
@Component
public class CurrentLimiteAspect {
/**
* 攔截controller,自行修改路徑
*/
@Pointcut("execution(* com.javashizhan.controller..*(..))")
public void controller() { }
@Before("controller()")
public void controller(JoinPoint point) throws Exception {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
//獲取通行證,urlPath的格式如:/limit
CurrentLimiter.tryAcquire(request.getRequestURI());
}
/**
* controller執行完後呼叫,即使controller丟擲異常這個攔截方法也會被呼叫
* @param joinPoint
*/
@After("controller()")
public void after(JoinPoint joinPoint) {
//釋放獲取到的通行證
CurrentLimiter.release();
}
}
複製程式碼
5.Application.java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
new SpringApplicationBuilder(Application.class).run(args);
//新增限流規則
LimiteRule rule = new LimiteRule("/limit",4);
CurrentLimiter.addRule(rule);
}
}
複製程式碼
IV、驗證
測試驗證碰到的兩個坑:
1.人工通過瀏覽器重新整理請求發現controller是序列的
2.通過postman設定了併發測試也還是序列的,即便設定了併發數,如下圖:
百度無果,只能自行寫程式碼驗證了,程式碼如下:
/**
* @ClassName CurrentLimiteTest
* @Description 驗證限流器
* @Author 鏗然一葉
* @Date 2019/10/5 0:51
* @Version 1.0
* javashizhan.com
**/
public class CurrentLimiteTest {
public static void main(String[] args) {
final String limitUrlPath = "http://localhost:8080/limit";
final String noLimitUrlPath = "http://localhost:8080/nolimit";
//限流測試
test(limitUrlPath);
//休眠一會,等上一批執行緒執行完,方便檢視日誌
sleep(5000);
//不限流測試
test(noLimitUrlPath);
}
private static void test(String urlPath) {
Thread[] requesters = new Thread[10];
for (int i = 0; i < requesters.length; i++) {
requesters[i] = new Thread(new Requester(urlPath));
requesters[i].start();
}
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Requester implements Runnable {
private final String urlPath;
private final RestTemplate restTemplate = new RestTemplate();
public Requester(String urlPath) {
this.urlPath = urlPath;
}
@Override
public void run() {
String response = restTemplate.getForEntity(urlPath,String.class).getBody();
System.out.println("response: " + response);
}
}
複製程式碼
輸出日誌如下:
Pattern: /limit,urlPath: /limit,Available Permits:3
Pattern: /limit,Available Permits:2
Pattern: /limit,Available Permits:1
Pattern: /limit,Available Permits:0
The request [/limit] exceeds maximum traffic limit,the limit is 4,available permit is0.
The request [/limit] exceeds maximum traffic limit,available permit is0.
Pattern: /limit,Available Permits:4
This path does not match the limit rule,path is [/nolimit] pattern is [/limit].
This path does not match the limit rule,path is [/nolimit] pattern is [/limit].
複製程式碼
可以看到日誌輸出資訊為:
1.第1個測試url最大併發為4,一次10個併發請求,有4個獲取通行證後,剩餘6個獲取通行證失敗。
2.獲取到通行證的4個請求在controller執行完後釋放了通行證。
3.第2個測試url沒有限制併發,10個請求均執行成功。
至此,限流器驗證成功。
注意:去掉同步鎖後(synchronized關鍵字),列印的日誌類似如下,可以看到可用通行證數量不是遞增或者遞減的,但這並不表明邏輯不正確,這是因為訊號量支援多個執行緒進入臨界區,在列印之前,可能已經減少了多個通行證,另外先執行的執行緒不一定先結束,所以看到的可用通訊證數量不是遞增也不是遞減的。訊號量只能保證的是用掉一個通行證,就少一個。
Pattern: /limit,Available Permits:0
Pattern: /limit,Available Permits:2
The request [/limit] exceeds maximum traffic limit,available permit is0.
This path does not match the limit rule,path is [/nolimit],pattern is [/limit].
This path does not match the limit rule,pattern is [/limit].
Pattern: /limit,Available Permits:4
Pattern: /limit,Available Permits:3
This path does not match the limit rule,pattern is [/limit].
複製程式碼
end.
相關閱讀:
Java併發程式設計(一)知識地圖
Java併發程式設計(二)原子性
Java併發程式設計(三)可見性
Java併發程式設計(四)有序性
Java併發程式設計(五)建立執行緒方式概覽
Java併發程式設計入門(六)synchronized用法
Java併發程式設計入門(七)輕鬆理解wait和notify以及使用場景
Java併發程式設計入門(八)執行緒生命週期
Java併發程式設計入門(九)死鎖和死鎖定位
Java併發程式設計入門(十)鎖優化
Java併發程式設計入門(十二)生產者和消費者模式-程式碼模板
Java併發程式設計入門(十三)讀寫鎖和快取模板
Java併發程式設計入門(十四)CountDownLatch應用場景
Java併發程式設計入門(十五)CyclicBarrier應用場景
Java併發程式設計入門(十六)秒懂執行緒池差別
Java併發程式設計入門(十七)一圖掌握執行緒常用類和介面
Java併發程式設計入門(十八)再論執行緒安全
Java極客站點: javageektour.com/