1. 程式人生 > >Redisson之Semaphore使用記錄

Redisson之Semaphore使用記錄

最近在公司一個專案需要用到限流,就想到concurrent包下的Semaphore(訊號量)。不過實際場景中並不能用,一個原因是專案採用的微服務架構,分散式場景並不適用。第二個場景是限流採用令牌桶演算法,每秒鐘某個介面限流多少,超過丟棄。

 

後面實現採用的redis分散式限流方案,每分鐘設定一個計數器,計數器一旦達到限流次數,後面的請求被直接擋回。

 

後面無意中看到有Redisson這個東東,發現能在分散式場景下支援java concurrent下很多工具類(Reentrant Lock、Semaphore、CountDownLatch等 )。

 

就去學習了一波,試用一下Semaphore功能,就記錄下使用方法。

在springBoot中引入Redisson

                <dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.4.3</version>
		</dependency>

然後配置RedissonClient的相關資訊

通過配置類的方式

package com.sendi.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ClassUtils;

import io.netty.channel.nio.NioEventLoopGroup;

@Configuration
public class RedissonConfig {
	private String address = "redis://localhost:6379";
    private int connectionMinimumIdleSize = 10;
    private int idleConnectionTimeout = 10000;
    private int pingTimeout = 1000;
    private int connectTimeout = 10000;
    private int timeout = 3000;
    private int retryAttempts = 3;
    private int retryInterval = 1500;
    private int reconnectionTimeout = 3000;
    private int failedAttempts = 3;
    private String password = null;
    private int subscriptionsPerConnection = 5;
    private String clientName = null;
    private int subscriptionConnectionMinimumIdleSize = 1;
    private int subscriptionConnectionPoolSize = 50;
    private int connectionPoolSize = 64;
    private int database = 0;
    private boolean dnsMonitoring = false;
    private int dnsMonitoringInterval = 5000;

    private int thread; //當前處理核數量 * 2

    private String codec = "org.redisson.codec.JsonJacksonCodec";

    @Bean(destroyMethod = "shutdown")
    RedissonClient redisson() throws Exception {
        Config config = new Config();
        config.useSingleServer().setAddress(address)
                .setConnectionMinimumIdleSize(connectionMinimumIdleSize)
                .setConnectionPoolSize(connectionPoolSize)
                .setDatabase(database)
                .setDnsMonitoring(dnsMonitoring)
                .setDnsMonitoringInterval(dnsMonitoringInterval)
                .setSubscriptionConnectionMinimumIdleSize(subscriptionConnectionMinimumIdleSize)
                .setSubscriptionConnectionPoolSize(subscriptionConnectionPoolSize)
                .setSubscriptionsPerConnection(subscriptionsPerConnection)
                .setClientName(clientName)
                .setFailedAttempts(failedAttempts)
                .setRetryAttempts(retryAttempts)
                .setRetryInterval(retryInterval)
                .setReconnectionTimeout(reconnectionTimeout)
                .setTimeout(timeout)
                .setConnectTimeout(connectTimeout)
                .setIdleConnectionTimeout(idleConnectionTimeout)
                .setPingTimeout(pingTimeout)
                .setPassword(password);
        Codec codec = (Codec) ClassUtils.forName("org.redisson.codec.JsonJacksonCodec", ClassUtils.getDefaultClassLoader()).newInstance();
        config.setCodec(codec);
        config.setThreads(thread);
        config.setEventLoopGroup(new NioEventLoopGroup());
        config.setUseLinuxNativeEpoll(false);
        return Redisson.create(config);
    }
}

配置完後就可以在java類中使用了

//注入
@Autowired
private RedissonClient redissonClient;




//使用
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
			semaphore.trySetPermits(1);//設定許可
			semaphore.acquire();//獲得一個許可
			semaphore.release();//釋放一個許可

使用方法可以在githup上面的api裡面檢視說明  https://github.com/redisson/redisson

和java有一點區別,不過基本使用方法是一樣的,猜測實現原理還是根據redis鍵值對應計數方法實現,就去redis中看了一下

果然在redis中看到了"semaphore"這個鍵

消耗完許可後

檢視原始碼發現其實現方式還是網上的分散式鎖類似

 public void acquire(int permits) throws InterruptedException {
        if (tryAcquire(permits)) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe();
        commandExecutor.syncSubscription(future);
        try {
            while (true) {
                if (tryAcquire(permits)) {
                    return;
                }

                getEntry().getLatch().acquire(permits);
            }
        } finally {
            unsubscribe(future);
        }
//        get(acquireAsync(permits));
    }
 @Override
    public RFuture<Boolean> tryAcquireAsync(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException("Permits amount can't be negative");
        }

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                  "local value = redis.call('get', KEYS[1]); " +
                  "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                      "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                      "return 1; " +
                  "end; " +
                  "return 0;",
                  Collections.<Object>singletonList(getName()), permits);
    }

第一段程式碼是許可獲取的原始碼,可以看出是通過while迴圈實現,直到拿到許可這個方法才返回,繼續往下面程式碼執行

第二段程式碼是通過lua指令碼執行原子操作(訊號量許可獲取和釋放)

 

總結:發現Redisson底層實現方案和之前在githup上面看到的一些分散式鎖,限流之類的基本大同小異。