1. 程式人生 > >常見限流演算法研究與實現

常見限流演算法研究與實現

一、限流場景

       很多做服務介面的人或多或少的遇到這樣的場景,由於業務應用系統的負載能力有限,為了防止非預期的請求對系統壓力過大而拖垮業務應用系統。也就是面對大流量時,如何進行流量控制?服務介面的流量控制策略:分流、降級、限流等。本文討論下限流策略,雖然降低了服務介面的訪問頻率和併發量,卻換取服務介面和業務應用系統的高可用。

1、實際場景中常用的限流策略:

  • Nginx前端限流

2、按照一定的規則如帳號、IP、系統呼叫邏輯等在Nginx層面做限流

3、業務應用系統限流

  • 客戶端限流

  • 服務端限流

4、資料庫限流

  • 紅線區,力保資料庫

      本文主要講的是服務端限流的演算法。常見的限流演算法有:計數器、令牌桶和漏桶演算法,下面一起來看看。

二、限流演算法

1、計數器

        計數器法是限流演算法裡最簡單也是最容易實現的一種演算法。比如我們規定,對於A介面來說,我們1分鐘的訪問次數不能超過10個。那麼我們可以這麼做:在一開始的時候,我們可以設定一個計數器counter,每當一個請求過來的時候,counter就加1,如果counter的值大於10並且該請求與第一個 請求的間隔時間還在1分鐘之內,那麼說明請求數過多;如果該請求與第一個請求的間隔時間大於1分鐘,且counter的值還在限流範圍內,那麼就重置 counter,具體演算法的示意圖如下:

(1)寫法一,就用一個計算器來實現

public class CountRateLimiter {

/**計算器*/
private              AtomicLong counter   = new AtomicLong(0);
/**初始時間*/
private static       long       timestamp = System.currentTimeMillis();
/**時間視窗內最大請求個數*/
private long limit;

public CountRateLimiter(long limit) {
    this.limit = limit;
}

public boolean tryAcquire() {
    long now = System.currentTimeMillis();
    //1s之內的請求
    if (now - timestamp < 1000) {
        if (counter.get() < limit) {
            counter.incrementAndGet();
            System.out.println("pass_request");
            return true;
        } else {
            System.out.println("refuse_request");
            return false;
        }
    } else {
        counter = new AtomicLong(0);
        timestamp = now;
        System.out.println("time_end,refuse_request");
        return false;
    }
}

public static void main(String[] args) {
    CountRateLimiter rateLimiter = new CountRateLimiter(10);
    ExecutorService executor = Executors.newCachedThreadPool();
    for(int i=0;i<1000;i++) {
        executor.submit(() -> {
            double random = (new Random()).nextDouble();
            long a = (long)(random * 1000);
            try {
                //睡眠一下
                Thread.sleep(a);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            rateLimiter.tryAcquire();
        });
    }
    executor.shutdown();
}
}

輸出結果:

(2)寫法二,用快取來實現

       使用Guava的Cache來儲存計數器,過期時間設定為2秒(保證1秒內的計數器是有的),然後我們獲取當前時間戳然後取秒數來作為KEY進行計數統計和限流。

public class GuavaCountRateLimiter {

/**快取*/
private LoadingCache<Long, AtomicLong> counter =
        CacheBuilder.newBuilder()
                .expireAfterWrite(2, TimeUnit.SECONDS)
                .build(new CacheLoader<Long, AtomicLong>() {
                    @Override
                    public AtomicLong load(Long seconds) throws Exception {
                        return new AtomicLong(0);
                    }
                });

/**限制每秒10*/
private long limit = 10;

/**
 * 使用Guava的Cache來儲存計數器,過期時間設定為2秒(保證1秒內的計數器是有的),然後我們獲取當前時間戳然後取秒數來作為KEY進行計數統計和限流
 */
public boolean tryAcquire() throws ExecutionException {
    //得到當前秒
    long currentSeconds = System.currentTimeMillis() / 1000;
    if (counter.get(currentSeconds).incrementAndGet() > limit) {
        System.out.println("refuse_request:count="+ counter.get(currentSeconds));
        return true;
    } else {
        System.out.println("pass_request:count=" + counter.get(currentSeconds));
        return false;
    }
}
public static void main(String[] args) {
    GuavaCountRateLimiter guavaCountRateLimiter = new GuavaCountRateLimiter();
    ExecutorService executor = Executors.newCachedThreadPool();
    for(int i=0;i<1000;i++) {
        executor.submit(() -> {
            double random = (new Random()).nextDouble();
            long a = (long)(random * 1000);
            try {
                //睡眠一下
                Thread.sleep(a);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                guavaCountRateLimiter.tryAcquire();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
    executor.shutdown();
}
}

輸出結果:

2、漏桶演算法

       漏桶演算法即leaky bucket是一種非常常用的限流演算法,可以用來實現流量整形(Traffic Shaping)和流量控制(Traffic Policing)。貼了一張維基百科上示意圖幫助大家理解:

漏桶演算法的主要概念如下:

  • 一個固定容量的漏桶,按照常量固定速率流出水滴;

  • 如果桶是空的,則不需流出水滴;

  • 可以以任意速率流入水滴到漏桶;

  • 如果流入水滴超出了桶的容量,則流入的水滴溢位了(被丟棄),而漏桶容量是不變的。

程式碼實現

public class LeakRateLimit {
//定義桶的大小
private final ConcurrentLinkedQueue<Integer> container = new ConcurrentLinkedQueue<>();

private final static int BUCKET_LIMIT = 1000;

//消費者 不論多少個執行緒,每秒最大的處理能力是1秒中執行10次
private final RateLimiter consumerRate = RateLimiter.create(10d);

//往桶裡面放資料時,確認沒有超過桶的最大的容量
private Monitor offerMonitor = new Monitor();

//從桶裡消費資料時,桶裡必須存在資料
private Monitor consumerMonitor = new Monitor();

/**
 * 往桶裡面寫資料
 * @param data
 */
public void submit(Integer data) {
    if (offerMonitor.enterIf(offerMonitor.newGuard(() -> container.size() < BUCKET_LIMIT))) {
        try {
            container.offer(data);
            System.out.println(currentThread() + " submit.." + data + " container size is :[" + container.size() + "]");
        } finally {
            offerMonitor.leave();
        }
    } else {
        //這裡時候採用降級策略了。消費速度跟不上產生速度時,而且桶滿了,丟擲異常
        //或者存入MQ DB等後續處理
        System.out.println("The bucket is ful..Pls latter can try...");
        throw new IllegalStateException(currentThread().getName() + "The bucket is ful..Pls latter can try...");
    }
}

/**
 * 從桶裡面消費資料
 * @param consumer
 */
public void takeThenConsumer(Consumer<Integer> consumer) {
    if (consumerMonitor.enterIf(consumerMonitor.newGuard(() -> !container.isEmpty()))) {
        try {
            //不列印時 寫 consumerRate.acquire();
            System.out.println(currentThread() + "  waiting" + consumerRate.acquire());
            Integer data = container.poll();
            //container.peek() 只是去取出來不會刪掉
            consumer.accept(data);
        } finally {
            consumerMonitor.leave();
        }
    } else {
        //當木桶的消費完後,可以消費那些降級存入MQ或者DB裡面的資料
        System.out.println("will consumer Data from MQ...");
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {
    final LeakRateLimit bucket = new LeakRateLimit();
    final AtomicInteger DATA_CREATOR = new AtomicInteger(0);

    //生產執行緒 10個執行緒 每秒提交 50個數據  1/0.2s*10=50個
    IntStream.range(0, 10).forEach(i -> {
        new Thread(() -> {
            for (; ; ) {
                int data = DATA_CREATOR.incrementAndGet();
                try {
                    bucket.submit(data);
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (Exception e) {
                    //對submit時,如果桶滿了可能會丟擲異常
                    if (e instanceof IllegalStateException) {
                        System.out.println(e.getMessage());
                        //當滿了後,生產執行緒就休眠1分鐘
                        try {
                            TimeUnit.SECONDS.sleep(60);
                        } catch (InterruptedException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }).start();
    });

    //消費執行緒  採用RateLimiter每秒處理10個  綜合的比率是5:1
    IntStream.range(0, 10).forEach(i -> {
        new Thread(
                () -> {
                    for (; ; ) {
                        bucket.takeThenConsumer(x -> {
                            System.out.println(currentThread() + "C.." + x);
                        });
                    }
                }
        ).start();
    });

}
}

輸出結果:

漏桶演算法比較好實現,在單機系統中可以使用佇列來實現,在分散式環境中訊息中介軟體或者Redis都是可選的方案。

3、令牌桶演算法

      令牌桶演算法的原理是系統會以一個恆定的速度往桶裡放入令牌,而如果請求需要被處理,則需要先從桶裡獲取一個令牌,當桶裡沒有令牌可取時,則拒絕服務。 當桶滿時,新新增的令牌被丟棄或拒絕。

       令牌桶演算法是一個存放固定容量令牌(token)的桶,按照固定速率往桶裡新增令牌。令牌桶演算法基本可以用下面的幾個概念來描述:

  • 令牌將按照固定的速率被放入令牌桶中。比如每秒放10個。

  • 桶中最多存放b個令牌,當桶滿時,新新增的令牌被丟棄或拒絕。

  • 當一個n個位元組大小的資料包到達,將從桶中刪除n個令牌,接著資料包被髮送到網路上。

  • 如果桶中的令牌不足n個,則不會刪除令牌,且該資料包將被限流(要麼丟棄,要麼緩衝區等待)。

程式碼實現:

public class TokenLimiter {

/**預設桶大小個數 即最大瞬間流量是64M*/
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;

/**一個桶的單位是1位元組*/
private int everyTokenSize = 1;

/**令牌最大數目*/
private int bucketSize;

/**平均流量*/
private int avgFlowRate;

/**佇列來快取桶數量*/
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(
        DEFAULT_BUCKET_SIZE);

private ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor();

private volatile boolean isStart = false;

private ReentrantLock lock = new ReentrantLock(true);

private static final byte A_CHAR = 'a';

public TokenLimiter() {
}

public TokenLimiter(int bucketSize, int avgFlowRate) {
    this.bucketSize = bucketSize;
    this.avgFlowRate = avgFlowRate;
}

public TokenLimiter(int everyTokenSize, int bucketSize, int avgFlowRate) {
    this.everyTokenSize = everyTokenSize;
    this.bucketSize = bucketSize;
    this.avgFlowRate = avgFlowRate;
}

public void addTokens(Integer tokenNum) {

    // 若是桶已經滿了,就不再家如新的令牌
    for (int i = 0; i < tokenNum; i++) {
        tokenQueue.offer(Byte.valueOf(A_CHAR));
    }
}

public TokenLimiter build() {

    start();
    return this;
}

/**
 * 獲取足夠的令牌個數
 *
 * @return
 */
public boolean getTokens(byte[] dataSize) {
    //傳輸內容大小對應的桶個數
    int needTokenNum = dataSize.length / everyTokenSize + 1;

    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //是否存在足夠的桶數量
        boolean result = needTokenNum <= tokenQueue.size();
        if (!result) {
            return false;
        }

        int tokenCount = 0;
        for (int i = 0; i < needTokenNum; i++) {
            Byte poll = tokenQueue.poll();
            if (poll != null) {
                tokenCount++;
            }
        }

        return tokenCount == needTokenNum;
    } finally {
        lock.unlock();
    }
}

public void start() {

    // 初始化桶佇列大小
    if (bucketSize > 0) {
        tokenQueue = new ArrayBlockingQueue<Byte>(bucketSize);
    }

    // 初始化令牌生產者
    TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
    //定時1s生產令牌
    scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,
            TimeUnit.SECONDS);
    isStart = true;

}

public void stop() {
    isStart = false;
    scheduledExecutorService.shutdown();
}

public boolean isStarted() {
    return isStart;
}

class TokenProducer implements Runnable {

    private int          avgFlowRate;
    private TokenLimiter tokenLimiter;

    public TokenProducer(int avgFlowRate, TokenLimiter tokenLimiter) {
        this.avgFlowRate = avgFlowRate;
        this.tokenLimiter = tokenLimiter;
    }

    @Override
    public void run() {
        tokenLimiter.addTokens(avgFlowRate);
    }
}

public static TokenLimiter newBuilder() {
    return new TokenLimiter();
}

public TokenLimiter everyTokenSize(int everyTokenSize) {
    this.everyTokenSize = everyTokenSize;
    return this;
}

public TokenLimiter bucketSize(int bucketSize) {
    this.bucketSize = bucketSize;
    return this;
}

public TokenLimiter avgFlowRate(int avgFlowRate) {
    this.avgFlowRate = avgFlowRate;
    return this;
}

private String stringCopy(String data, int copyNum) {

    StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);

    for (int i = 0; i < copyNum; i++) {
        sbuilder.append(data);
    }

    return sbuilder.toString();

}

public static void main(String[] args) throws IOException,
        InterruptedException {
    TokenLimiter tokenLimiter = TokenLimiter.newBuilder().avgFlowRate(512)
            .bucketSize(1024).build();

    BufferedWriter bufferedWriter = new BufferedWriter(
            new OutputStreamWriter(new FileOutputStream("D:/ds_test")));
    String data = "xxxx";// 四個位元組

    ExecutorService executor = Executors.newCachedThreadPool();
    //初始化
    IntStream.range(0, 1000).forEach(i -> {
        executor.submit(() -> {
            //每個執行緒需要一個令牌
            boolean token = tokenLimiter.getTokens("x".getBytes());
            if (token) {
                System.out.println("token pass");
            } else {
                System.out.println("token refuse");
            }
        });
    });
}

}

輸出結果:

google開源工具包guava提供了限流工具類RateLimiter,該類基於“令牌桶演算法”,非常方便使用

使用方法示例:

public class RateLimiterTest {

public static void main(String[] args) {
    //每秒生產兩個令牌
    final RateLimiter rateLimiter = RateLimiter.create(20.0);

    ExecutorService executorService = Executors.newCachedThreadPool();
    IntStream.range(0, 10).forEach(i -> {
        executorService.submit(() -> {
            //隨機休眠
            Random random = new Random();
            int r = random.nextInt(1000);
            try {
                TimeUnit.MICROSECONDS.sleep(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //每個執行緒需要一個令牌
            boolean token = rateLimiter.tryAcquire();
            if (token) {
                System.out.println("token pass");
            } else {
                System.out.println("token refuse");
            }
        });
    });

}
}

輸出結果:

4、漏桶和令牌桶比較

       令牌桶可以在執行時控制和調整資料處理的速率,處理某時的突發流量。放令牌的頻率增加可以提升整體資料處理的速度,而通過每次獲取令牌的個數增加或者放慢令牌的發放速度和降低整體資料處理速度。而漏桶不行,因為它的流出速率是固定的,程式處理速度也是固定的。整體而言,令牌桶演算法更優,但是實現更為複雜一些。