1. 程式人生 > >guava 限流的兩種方式

guava 限流的兩種方式

java中對於生產者消費者模型,或者小米手機營銷  1分鐘賣多少臺手機等都存在限流的思想在裡面。
關於限流 目前存在兩大類,從執行緒個數(jdk1.5 Semaphore)和RateLimiter速率(guava)
Semaphore:從執行緒個數限流
RateLimiter:從速率限流  目前常見的演算法是漏桶演算法和令牌演算法
令牌桶演算法。相比漏桶演算法而言區別在於,令牌桶是會去勻速的生成令牌,拿到令牌才能夠進行處理,類似於勻速往桶裡放令牌
漏桶演算法是:生產者消費者模型,生產者往木桶裡生產資料,消費者按照定義的速度去消費資料
應用場景:
漏桶演算法:必須讀寫分流的情況下,限制讀取的速度
令牌桶演算法:必須讀寫分離的情況下,限制寫的速率或者小米手機飢餓營銷的場景  只賣1分種搶購1000
實現的方法都是一樣。RateLimiter來實現
對於多執行緒問題查詢時,很多時候可能使用的類都是原子性的,但是由於程式碼邏輯的問題,也可能發生執行緒安全問題
一、問題描述  
  某天A君突然發現自己的介面請求量突然漲到之前的10倍,沒多久該介面幾乎不可使用,並引發連鎖反應導致整個系統崩潰。如何應對這種情況呢?生活給了我們答案:比如老式電閘都安裝了保險絲,一旦有人使用超大功率的裝置,保險絲就會燒斷以保護各個電器不被強電流給燒壞。同理我們的介面也需要安裝上“保險絲”,以防止非預期的請求對系統壓力過大而引起的系統癱瘓,當流量過大時,可以採取拒絕或者引流等機制。 
二、常用的限流演算法
      常用的限流演算法有兩種:漏桶演算法和令牌桶演算法,這篇博文介紹得比較清晰(過載保護演算法淺析)。
      漏桶演算法思路很簡單,請求先進入到漏桶裡,漏桶以一定的速度出水,當水請求過大會直接溢位,可以看出漏桶演算法能強行限制資料的傳輸速率。

圖1 漏桶演算法示意圖
      對於很多應用場景來說,除了要求能夠限制資料的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶演算法可能就不合適了,令牌桶演算法更為適合。如圖2所示,令牌桶演算法的原理是系統會以一個恆定的速度往桶裡放入令牌,而如果請求需要被處理,則需要先從桶裡獲取一個令牌,當桶裡沒有令牌可取時,則拒絕服務。

圖2 令牌桶演算法示意圖
三、限流工具類RateLimiter
   google開源工具包guava提供了限流工具類RateLimiter,該類基於“令牌桶演算法”,非常方便使用。該類的介面描述請參考:RateLimiter介面描述,具體的使用請參考:RateLimiter使用實踐。
1.關於RateLimter和Semphore簡單用法
package concurrent;
 
import com.google.common.util.concurrent.RateLimiter;
 
import java.util.concurrent.*;
import java.util.stream.IntStream;
 
import static java.lang.Thread.currentThread;
 
/**
 * ${DESCRIPTION}
 * 關於限流 目前存在兩大類,從執行緒個數(jdk1.5 Semaphore)和RateLimiter速率(guava)
 * Semaphore:從執行緒個數限流
 * RateLimiter:從速率限流  目前常見的演算法是漏桶演算法和令牌演算法,下面會具體介紹
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-15 22:44
 **/
public class RateLimiterExample {
 
   //Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  從速度來限流,從每秒中能夠執行的次數來
    private final static RateLimiter limiter=RateLimiter.create(0.5d);
 
 
    //同時只能有三個執行緒工作 Java1.5  從同時處理的執行緒個數來限流
    private final static Semaphore sem=new Semaphore(3);
    private static void testSemaphore(){
        try {
            sem.acquire();
            System.out.println(currentThread().getName()+" is doing work...");
            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            sem.release();
            System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job");
        }
    }
 
    public static void runTestSemaphore(){
        ExecutorService service = Executors.newFixedThreadPool(10);
        IntStream.range(0,10).forEach((i)->{
            //RateLimiterExample::testLimiter 這種寫法是建立一個執行緒
            service.submit(RateLimiterExample::testSemaphore);
        });
    }
 
    /**
     * Guava的RateLimiter
     */
    private static void testLimiter(){
        System.out.println(currentThread().getName()+" waiting  " +limiter.acquire());
    }
 
    //Guava的RateLimiter
    public static void runTestLimiter(){
        ExecutorService service = Executors.newFixedThreadPool(10);
        IntStream.range(0,10).forEach((i)->{
            //RateLimiterExample::testLimiter 這種寫法是建立一個執行緒
            service.submit(RateLimiterExample::testLimiter);
        });
    }
 
 
 
    public static void main(String[] args) {
        IntStream.range(0,10).forEach((a)-> System.out.println(a));//從0-9
        //runTestLimiter();
        runTestSemaphore();
    }
}


2.實現漏桶演算法

package concurrent.BucketAl;
 
import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;
 
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
 
import static java.lang.Thread.currentThread;
 
/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-20 22:42
 * 實現漏桶演算法 實現多執行緒生產者消費者模型 限流
 **/
public class Bucket {
    //定義桶的大小
    private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>();
 
    private final static int  BUCKET_LIMIT=1000;
 
    //消費者 不論多少個執行緒,每秒最大的處理能力是1秒中執行10次
    private final RateLimiter consumerRate=RateLimiter.create(10d);
 
    //往桶裡面放資料時,確認沒有超過桶的最大的容量
    private Monitor offerMonitor=new Monitor();
 
    //從桶裡消費資料時,桶裡必須存在資料
    private Monitor consumerMonitor=new Monitor();
 
 
    /**
     * 往桶裡面寫資料
     * @param data
     */
    public void submit(Integer data){
        if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){
            try {
                container.offer(data);
                System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]");
            } finally {
                offerMonitor.leave();
            }
        }else {
            //這裡時候採用降級策略了。消費速度跟不上產生速度時,而且桶滿了,丟擲異常
            //或者存入MQ DB等後續處理
            throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try...");
        }
    }
 
 
    /**
     * 從桶裡面消費資料
     * @param consumer
     */
    public void takeThenConsumer(Consumer<Integer> consumer){
        if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){
            try {
                //不列印時 寫 consumerRate.acquire();
                System.out.println(currentThread()+"  waiting"+consumerRate.acquire());
                Integer data = container.poll();
                //container.peek() 只是去取出來不會刪掉
                consumer.accept(data);
            }finally {
                consumerMonitor.leave();
            }
        }else {
            //當木桶的消費完後,可以消費那些降級存入MQ或者DB裡面的資料
            System.out.println("will consumer Data from MQ...");
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
}

2.1 漏桶演算法測試類

package concurrent.BucketAl;
 
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
 
import static java.lang.Thread.currentThread;
 
/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-20 23:11
 * 漏桶演算法測試
 * 實現漏桶演算法 實現多執行緒生產者消費者模型 限流
 **/
public class BuckerTest {
 
    public static void main(String[] args) {
        final Bucket bucket = new Bucket();
        final AtomicInteger DATA_CREATOR = new AtomicInteger(0);
 
        //生產執行緒 10個執行緒 每秒提交 50個數據  1/0.2s*10=50個
        IntStream.range(0, 10).forEach(i -> {
            new Thread(() -> {
                for (; ; ) {
                    int data = DATA_CREATOR.incrementAndGet();
                    try {
                        bucket.submit(data);
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (Exception e) {
                        //對submit時,如果桶滿了可能會丟擲異常
                        if (e instanceof IllegalStateException) {
                            System.out.println(e.getMessage());
                            //當滿了後,生產執行緒就休眠1分鐘
                            try {
                                TimeUnit.SECONDS.sleep(60);
                            } catch (InterruptedException e1) {
                                e1.printStackTrace();
                            }
                        }
                    }
                }
            }).start();
        });
 
 
        //消費執行緒  採用RateLimiter每秒處理10個  綜合的比率是5:1
        IntStream.range(0, 10).forEach(i -> {
            new Thread(
                    () -> {
                        for (; ; ) {
                            bucket.takeThenConsumer(x -> {
                                System.out.println(currentThread()+"C.." + x);
                            });
                        }
                    }
            ).start();
        });
 
    }
}

3.令牌桶演算法

package concurrent.TokenBucket;
 
import com.google.common.util.concurrent.RateLimiter;
 
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
import static java.lang.Thread.currentThread;
import static java.lang.Thread.interrupted;
 
/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-21 0:18
 * 令牌桶演算法。相比漏桶演算法而言區別在於,令牌桶是會去勻速的生成令牌,拿到令牌才能夠進行處理,類似於勻速往桶裡放令牌
 * 漏桶演算法是:生產者消費者模型,生產者往木桶裡生產資料,消費者按照定義的速度去消費資料
 *
 * 應用場景:
 * 漏桶演算法:必須讀寫分流的情況下,限制讀取的速度
 * 令牌桶演算法:必須讀寫分離的情況下,限制寫的速率或者小米手機飢餓營銷的場景  只賣1分種搶購1000
 *
 * 實現的方法都是一樣。RateLimiter來實現
 * 對於多執行緒問題查詢時,很多時候可能使用的類都是原子性的,但是由於程式碼邏輯的問題,也可能發生執行緒安全問題
 **/
public class TokenBuck {
 
    //可以使用 AtomicInteger+容量  可以不用Queue實現
   private AtomicInteger phoneNumbers=new AtomicInteger(0);
   private RateLimiter rateLimiter=RateLimiter.create(20d);//一秒只能執行五次
   //預設銷售500臺
   private final static int DEFALUT_LIMIT=500;
   private final int saleLimit;
 
    public TokenBuck(int saleLimit) {
        this.saleLimit = saleLimit;
    }
 
    public TokenBuck() {
        this(DEFALUT_LIMIT);
    }
 
    public int buy(){
        //這個check 必須放在success裡面做判斷,不然會產生執行緒安全問題(業務引起)
        //原因當phoneNumbers=99 時 同時存在三個執行緒進來。雖然phoneNumbers原子性,但是也會發生。如果必須寫在這裡,在success
        //裡面也需要加上double check
       /* if (phoneNumbers.get()>=saleLimit){
            throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...")
        }*/
 
        //目前設定超時時間,10秒內沒有搶到就丟擲異常
        //這裡的TimeOut*Ratelimiter=總數  這裡的超時就是讓別人搶幾秒,所以設定總數也可以由這裡的超時和RateLimiter來計算
         boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);
         if (success){
             if (phoneNumbers.get()>=saleLimit){
                 throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...");
             }
             int phoneNo = phoneNumbers.getAndIncrement();
             System.out.println(currentThread()+" user has get :["+phoneNo+"]");
             return phoneNo;
         }else {
             //超時後 同一時間,很大的流量來強時,超時快速失敗。
             throw new RuntimeException(currentThread()+"has timeOut can try again...");
         }
 
    }
}

3.1 令牌桶演算法的測試類

package concurrent.TokenBucket;
 
import java.util.stream.IntStream;
 
/**
 * ${DESCRIPTION}
 *
 * @author mengxp
 * @version 1.0
 * @create 2018-01-21 0:40
 **/
public class TokenBuckTest {
    public static void main(String[] args) {
        final TokenBuck tokenBuck=new TokenBuck(200);
 
 
        IntStream.range(0,300).forEach(i->{
            //目前測試時,讓一個執行緒搶一次,不用迴圈搶
            //tokenBuck::buy 這種方式 產生一個Runnable
            new Thread(tokenBuck::buy).start();
        });
    }
}

--------------------- 本文來自 大資料孟小鵬 的CSDN 部落格 ,全文地址請點選:https://blog.csdn.net/mengxpfighting/article/details/79117934?utm_source=copy

Monitor類是作為ReentrantLock的一個替代,程式碼中使用 Monitor比使用ReentrantLock更不易出錯,可讀性也更強,並且也沒有顯著的效能損失,使用Monitor甚至有潛在的效能得到優化。

public abstract static class Guard:一個標識執行緒是否等待的布林條件,Guard類總是與單一的Monitor相關聯,Monitor可以在任意時間從任意佔用Monitor的執行緒檢查Guard,這樣程式碼的編寫將不在關心Guard是否被檢查的頻率。

public abstract boolean isSatisfied():Guard內部提供的抽象方法,isSatisfied(),當被關聯的Monitor被佔用時,Guard的此方法會被呼叫,該方法的實現必須取決於被關聯Monitor保護的狀態,並且狀態不可修改。

Monitor有幾個常用的方法

  • enter():進入到當前Monitor,無限期阻塞,等待鎖。(這裡沒有Guard)
  • enter(long time, TimeUnit unit):進入到當前Monitor,最多阻塞給定的時間,返回是否進入Monitor。
  • tryEnter():如果可以的話立即進入Monitor,不阻塞,返回是否進入Monitor。
  • enterWhen(Guard guard):進入當前Monitor,等待Guard的isSatisfied()為true後,繼續往下執行 ,但可能會被打斷; 為false,會阻塞。
  • enterIf(Guard guard):如果Guard的isSatisfied()為true,進入當前Monitor。等待獲得鎖(這裡會等待獲取鎖),不需要等待Guard satisfied。
  • tryEnterIf(Guard guard):如果Guard的isSatisfied()為true並且可以的話立即進入Monitor,不等待獲取鎖(這裡不等待獲取鎖),也不等待Guard satisfied。
  • 感覺  enterWhen enterIf的用的區別就是前者無返回值,後者返回Boolean值。
  • newGuard(Boolean b)為{@程式碼}監視器建立一個新的{@連結守護}