高併發之API介面限流
在開發高併發系統時有三把利器用來保護系統:快取、降級和限流
快取
快取的目的是提升系統訪問速度和增大系統處理容量降級
降級是當服務出現問題或者影響到核心流程時,需要暫時遮蔽掉,待高峰或者問題解決後再開啟限流
限流的目的是通過對併發訪問/請求進行限速,或者對一個時間視窗內的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務、排隊或等待、降級等處理
問題描述
某天A君突然發現自己的介面請求量突然漲到之前的10倍,沒多久該介面幾乎不可使用,並引發連鎖反應導致整個系統崩潰。如何應對這種情況呢?生活給了我們答案:比如老式電閘都安裝了保險絲,一旦有人使用超大功率的裝置,保險絲就會燒斷以保護各個電器不被強電流給燒壞。同理我們的介面也需要安裝上“保險絲”,以防止非預期的請求對系統壓力過大而引起的系統癱瘓,當流量過大時,可以採取拒絕或者引流等機制。
快取的目的是提升系統訪問速度和增大系統能處理的容量,可謂是抗高併發流量的銀彈;而降級是當服務出問題或者影響到核心流程的效能則需要暫時遮蔽掉,待高峰或者問題解決後再開啟;而有些場景並不能用快取和降級來解決,比如稀缺資源(秒殺、搶購)、寫服務(如評論、下單)、頻繁的複雜查詢(評論的最後幾頁),因此需有一種手段來限制這些場景的併發/請求量,即限流。
系統在設計之初就會有一個預估容量,長時間超過系統能承受的TPS/QPS閾值,系統可能會被壓垮,最終導致整個服務不夠用。為了避免這種情況,我們就需要對介面請求進行限流。
限流的目的是通過對併發訪問請求進行限速或者一個時間視窗內的的請求數量進行限速來保護系統,一旦達到限制速率則可以拒絕服務、排隊或等待。
一般開發高併發系統常見的限流模式有控制併發和控制速率,一個是限制併發的總數量(比如資料庫連線池、執行緒池),一個是限制併發訪問的速率(如nginx的limit_conn模組,用來限制瞬時併發連線數),另外還可以限制單位時間視窗內的請求數量(如Guava的RateLimiter、nginx的limit_req模組,限制每秒的平均速率)。其他還有如限制遠端介面呼叫速率、限制MQ的消費速率。另外還可以根據網路連線數、網路流量、CPU或記憶體負載等來限流。
相關概念:
PV:
page view 頁面總訪問量,每重新整理一次記錄一次。
UV:
unique view 客戶端主機訪問,指一天內相同IP的訪問記為1次。
QPS:
query per second,即每秒訪問量。qps很大程度上代表了系統的繁忙度,沒次請求可能存在多次的磁碟io,網路請求,多個cpu時間片,一旦qps超過了預先設定的閥值,可以考量擴容增加伺服器,避免訪問量過大導致的宕機。
RT:
response time,每次請求的響應時間,直接決定使用者體驗性。
本文主要介紹應用級限流方法,分散式限流、流量入口限流(接入層如NGINX limit_conn和limit_req 模組)。
應用級限流
一、控制併發數量
屬於一種較常見的限流手段,在實際應用中可以通過訊號量機制(如Java中的Semaphore)來實現。 作業系統的訊號量是個很重要的概念,Java 併發庫 的Semaphore 可以很輕鬆完成訊號量控制,Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
舉個例子,我們對外提供一個服務介面,允許最大併發數為10,程式碼實現如下:
public class DubboService {
private final Semaphore permit = new Semaphore(10, true);
public void process(){
try{
permit.acquire();
//業務邏輯處理
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
permit.release();
}
}
}
在以上程式碼中,雖然有30個執行緒在執行,但是隻允許10個併發的執行。Semaphore的構造方法Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。Semaphore(10)表示允許10個執行緒獲取許可證,也就是最大併發數是10。Semaphore的用法也很簡單,首先執行緒使用Semaphore的acquire()獲取一個許可證,使用完之後呼叫release()歸還許可證,還可以用tryAcquire()方法嘗試獲取許可證,訊號量的本質是控制某個資源可被同時訪問的個數,在一定程度上可以控制某資源的訪問頻率,但不能精確控制,控制訪問頻率的模式見下文描述。
二、控制訪問速率
在工程實踐中,常見的是使用令牌桶演算法來實現這種模式,常用的限流演算法有兩種:漏桶演算法和令牌桶演算法。
-
漏桶演算法
漏桶演算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水,當水流入速度過大會直接溢位,可以看出漏桶演算法能強行限制資料的傳輸速率。
對於很多應用場景來說,除了要求能夠限制資料的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶演算法可能就不合適了,令牌桶演算法更為適合。
-
令牌桶演算法
如圖所示,令牌桶演算法的原理是系統會以一個恆定的速度往桶裡放入令牌,而如果請求需要被處理,則需要先從桶裡獲取一個令牌,當桶裡沒有令牌可取時,則拒絕服務,令牌桶演算法通過發放令牌,根據令牌的rate頻率做請求頻率限制,容量限制等。
在Wikipedia上,令牌桶演算法是這麼描述的:
- 每過1/r秒桶中增加一個令牌。
- 桶中最多存放b個令牌,如果桶滿了,新放入的令牌會被丟棄。
- 當一個n位元組的資料包到達時,消耗n個令牌,然後傳送該資料包。
- 如果桶中可用令牌小於n,則該資料包將被快取或丟棄。
令牌桶控制的是一個時間視窗內通過的資料量,在API層面我們常說的QPS、TPS,正好是一個時間視窗內的請求量或者事務量,只不過時間視窗限定在1s罷了。以一個恆定的速度往桶裡放入令牌,而如果請求需要被處理,則需要先從桶裡獲取一個令牌,當桶裡沒有令牌可取時,則拒絕服務。令牌桶的另外一個好處是可以方便的改變速度,一旦需要提高速率,則按需提高放入桶中的令牌的速率。
在我們的工程實踐中,通常使用Google開源工具包Guava提供的限流工具類RateLimiter來實現控制速率,該類基於令牌桶演算法來完成限流,非常易於使用,而且非常高效。如我們不希望每秒的任務提交超過1個
public static void main(String[] args) {
String start = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
RateLimiter limiter = RateLimiter.create(1.0); // 這裡的1表示每秒允許處理的量為1個
for (int i = 1; i <= 10; i++) {
double waitTime = limiter.acquire(i);// 請求RateLimiter, 超過permits會被阻塞
System.out.println("cutTime=" + System.currentTimeMillis() + " call execute:" + i + " waitTime:" + waitTime);
}
String end = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.out.println("start time:" + start);
System.out.println("end time:" + end);
}
首先通過RateLimiter.create(1.0);
建立一個限流器,引數代表每秒生成的令牌數,通過limiter.acquire(i);
來以阻塞的方式獲取令牌,令牌桶演算法允許一定程度的突發(允許消費未來的令牌),所以可以一次性消費i個令牌;當然也可以通過tryAcquire(int permits, long timeout, TimeUnit unit)
來設定等待超時時間的方式獲取令牌,如果超timeout為0,則代表非阻塞,獲取不到立即返回,支援阻塞或可超時的令牌消費。
從輸出來看,RateLimiter支援預消費,比如在acquire(5)時,等待時間是4秒,是上一個獲取令牌時預消費了3個兩排,固需要等待3*1秒,然後又預消費了5個令牌,以此類推。
RateLimiter通過限制後面請求的等待時間,來支援一定程度的突發請求(預消費),在使用過程中需要注意這一點,Guava有兩種限流模式,一種為穩定模式(SmoothBursty:令牌生成速度恆定,平滑突發限流),一種為漸進模式(SmoothWarmingUp:令牌生成速度緩慢提升直到維持在一個穩定值,平滑預熱限流) 兩種模式實現思路類似,主要區別在等待時間的計算上。
- SmoothBursty 模式:RateLimiter limiter = RateLimiter.create(5);
RateLimiter.create(5)表示桶容量為5且每秒新增5個令牌,即每隔200毫秒新增一個令牌;limiter.acquire()表示消費一個令牌,如果當前桶中有足夠令牌則成功(返回值為0),如果桶中沒有令牌則暫停一段時間,比如發令牌間隔是200毫秒,則等待200毫秒後再去消費令牌,這種實現將突發請求速率平均為了固定請求速率。
- SmoothWarmingUp模式:RateLimiter limiter = RateLimiter.create(5,1000, TimeUnit.MILLISECONDS);
- 建立方式:RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit),permitsPerSecond表示每秒新增的令牌數,warmupPeriod表示在從冷啟動速率過渡到平均速率的時間間隔。速率是梯形上升速率的,也就是說冷啟動時會以一個比較大的速率慢慢到平均速率;然後趨於平均速率(梯形下降到平均速率)。可以通過調節warmupPeriod引數實現一開始就是平滑固定速率。
放在Controller中用Jemter壓測
注:RateLimiter控制的是速率,Samephore控制的是併發量。RateLimiter的原理就是令牌桶,它主要由許可發出的速率來定義,如果沒有額外的配置,許可證將按每秒許可證規定的固定速度分配,許可將被平滑地分發,若請求超過permitsPerSecond則RateLimiter按照每秒 1/permitsPerSecond 的速率釋放許可。注意:RateLimiter適用於單體應用,且RateLimiter不保證公平性訪問。
使用上述方式使用RateLimiter的方式不夠優雅,自定義註解+AOP的方式實現(適用於單體應用),詳細見下面程式碼:
自定義註解:
import java.lang.annotation.*;
/**
* 自定義註解可以不包含屬性,成為一個標識註解
*/
@Inherited
@Documented
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimitAspect {
}
自定義切面類
import com.google.common.util.concurrent.RateLimiter;
import com.test.cn.springbootdemo.util.ResultUtil;
import net.sf.json.JSONObject;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@Component
@Scope
@Aspect
public class RateLimitAop {
@Autowired
private HttpServletResponse response;
private RateLimiter rateLimiter = RateLimiter.create(5.0); //比如說,我這裡設定"併發數"為5
@Pointcut("@annotation(com.test.cn.springbootdemo.aspect.RateLimitAspect)")
public void serviceLimit() {
}
@Around("serviceLimit()")
public Object around(ProceedingJoinPoint joinPoint) {
Boolean flag = rateLimiter.tryAcquire();
Object obj = null;
try {
if (flag) {
obj = joinPoint.proceed();
}else{
String result = JSONObject.fromObject(ResultUtil.success1(100, "failure")).toString();
output(response, result);
}
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("flag=" + flag + ",obj=" + obj);
return obj;
}
public void output(HttpServletResponse response, String msg) throws IOException {
response.setContentType("application/json;charset=UTF-8");
ServletOutputStream outputStream = null;
try {
outputStream = response.getOutputStream();
outputStream.write(msg.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
} finally {
outputStream.flush();
outputStream.close();
}
}
}
測試controller
import com.test.cn.springbootdemo.aspect.RateLimitAspect;
import com.test.cn.springbootdemo.util.ResultUtil;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class TestController {
@ResponseBody
@RateLimitAspect
@RequestMapping("/test")
public String test(){
return ResultUtil.success1(1001, "success").toString();
}
壓測結果:
三、控制單位時間視窗內請求數
某些場景下,我們想限制某個介面或服務 每秒/每分鐘/每天 的請求次數或呼叫次數。例如限制服務每秒的呼叫次數為50,實現如下:
private LoadingCache<Long, AtomicLong> counter =
CacheBuilder.newBuilder()
.expireAfterWrite(2, TimeUnit.SECONDS)
.build(new CacheLoader<Long, AtomicLong>() {
@Override
public AtomicLong load(Long seconds) throws Exception {
return new AtomicLong(0);
}
});
public static long permit = 50;
public ResponseEntity getData() throws ExecutionException {
//得到當前秒
long currentSeconds = System.currentTimeMillis() / 1000;
if(counter.get(currentSeconds).incrementAndGet() > permit) {
return ResponseEntity.builder().code(404).msg("訪問速率過快").build();
}
//業務處理
}
到此應用級限流的一些方法就介紹完了。假設將應用部署到多臺機器,應用級限流方式只是單應用內的請求限流,不能進行全侷限流。因此我們需要分散式限流和接入層限流來解決這個問題。
分散式限流
- 自定義註解+攔截器+Redis實現限流 (單體和分散式均適用,全侷限流)
自定義註解:
@Inherited
@Documented
@Target({ElementType.FIELD,ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessLimit {
int limit() default 5;
int sec() default 5;
}
攔截器:
public class AccessLimitInterceptor implements HandlerInterceptor {
@Autowired
private RedisTemplate<String, Integer> redisTemplate; //使用RedisTemplate操作redis
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();
if (!method.isAnnotationPresent(AccessLimit.class)) {
return true;
}
AccessLimit accessLimit = method.getAnnotation(AccessLimit.class);
if (accessLimit == null) {
return true;
}
int limit = accessLimit.limit();
int sec = accessLimit.sec();
String key = IPUtil.getIpAddr(request) + request.getRequestURI();
Integer maxLimit = redisTemplate.opsForValue().get(key);
if (maxLimit == null) {
redisTemplate.opsForValue().set(key, 1, sec, TimeUnit.SECONDS); //set時一定要加過期時間
} else if (maxLimit < limit) {
redisTemplate.opsForValue().set(key, maxLimit + 1, sec, TimeUnit.SECONDS);
} else {
output(response, "請求太頻繁!");
return false;
}
}
return true;
}
public void output(HttpServletResponse response, String msg) throws IOException {
response.setContentType("application/json;charset=UTF-8");
ServletOutputStream outputStream = null;
try {
outputStream = response.getOutputStream();
outputStream.write(msg.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
} finally {
outputStream.flush();
outputStream.close();
}
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
}
}
controller:
@Controller
@RequestMapping("/activity")
public class AopController {
@ResponseBody
@RequestMapping("/seckill")
@AccessLimit(limit = 4,sec = 10) //加上自定義註解即可
public String test (HttpServletRequest request,@RequestParam(value = "username",required = false) String userName){
//TODO somethings……
return "hello world !";
}
}
配置檔案:
/*springmvc的配置檔案中加入自定義攔截器*/
<mvc:interceptors>
<mvc:interceptor>
<mvc:mapping path="/**"/>
<bean class="com.pptv.activityapi.controller.pointsmall.AccessLimitInterceptor"/>
</mvc:interceptor>
</mvc:interceptors>
訪問效果如下,10s內訪問介面超過4次以上就過濾請求,原理和計數器演算法類似:
接入層限流
主要介紹nginx 限流,採用漏桶演算法。
限制原理:可一句話概括為:“根據客戶端特徵,限制其訪問頻率”,客戶端特徵主要指IP、UserAgent等。使用IP比UserAgent更可靠,因為IP無法造假,UserAgent可隨意偽造。
1.併發數和連線數控制的配置:
nginx http配置:
#請求數量控制,每秒20個
limit_req_zone $binary_remote_addr zone=one:10m rate=20r/s;
#併發限制30個
limit_conn_zone $binary_remote_addr zone=addr:10m;
server塊配置
limit_req zone=one burst=5;
limit_conn addr 30;
2. ngx_http_limit_conn_module 可以用來限制單個IP的連線數:
ngx_http_limit_conn_module模組可以按照定義的鍵限定每個鍵值的連線數。可以設定單一 IP 來源的連線數。
並不是所有的連線都會被模組計數;只有那些正在被處理的請求(這些請求的頭資訊已被完全讀入)所在的連線才會被計數。
http {
limit_conn_zone $binary_remote_addr zone=addr:10m;
...
server {
...
location /download/ {
limit_conn addr 1;
}
以上文章部分出自網路,參考連結如下: