Redisson之Semaphore使用記錄
阿新 • • 發佈:2018-11-19
最近在公司一個專案需要用到限流,就想到concurrent包下的Semaphore(訊號量)。不過實際場景中並不能用,一個原因是專案採用的微服務架構,分散式場景並不適用。第二個場景是限流採用令牌桶演算法,每秒鐘某個介面限流多少,超過丟棄。
後面實現採用的redis分散式限流方案,每分鐘設定一個計數器,計數器一旦達到限流次數,後面的請求被直接擋回。
後面無意中看到有Redisson這個東東,發現能在分散式場景下支援java concurrent下很多工具類(Reentrant Lock、Semaphore、CountDownLatch等 )。
就去學習了一波,試用一下Semaphore功能,就記錄下使用方法。
在springBoot中引入Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.4.3</version>
</dependency>
然後配置RedissonClient的相關資訊
通過配置類的方式
package com.sendi.config; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.ClassUtils; import io.netty.channel.nio.NioEventLoopGroup; @Configuration public class RedissonConfig { private String address = "redis://localhost:6379"; private int connectionMinimumIdleSize = 10; private int idleConnectionTimeout = 10000; private int pingTimeout = 1000; private int connectTimeout = 10000; private int timeout = 3000; private int retryAttempts = 3; private int retryInterval = 1500; private int reconnectionTimeout = 3000; private int failedAttempts = 3; private String password = null; private int subscriptionsPerConnection = 5; private String clientName = null; private int subscriptionConnectionMinimumIdleSize = 1; private int subscriptionConnectionPoolSize = 50; private int connectionPoolSize = 64; private int database = 0; private boolean dnsMonitoring = false; private int dnsMonitoringInterval = 5000; private int thread; //當前處理核數量 * 2 private String codec = "org.redisson.codec.JsonJacksonCodec"; @Bean(destroyMethod = "shutdown") RedissonClient redisson() throws Exception { Config config = new Config(); config.useSingleServer().setAddress(address) .setConnectionMinimumIdleSize(connectionMinimumIdleSize) .setConnectionPoolSize(connectionPoolSize) .setDatabase(database) .setDnsMonitoring(dnsMonitoring) .setDnsMonitoringInterval(dnsMonitoringInterval) .setSubscriptionConnectionMinimumIdleSize(subscriptionConnectionMinimumIdleSize) .setSubscriptionConnectionPoolSize(subscriptionConnectionPoolSize) .setSubscriptionsPerConnection(subscriptionsPerConnection) .setClientName(clientName) .setFailedAttempts(failedAttempts) .setRetryAttempts(retryAttempts) .setRetryInterval(retryInterval) .setReconnectionTimeout(reconnectionTimeout) .setTimeout(timeout) .setConnectTimeout(connectTimeout) .setIdleConnectionTimeout(idleConnectionTimeout) .setPingTimeout(pingTimeout) .setPassword(password); Codec codec = (Codec) ClassUtils.forName("org.redisson.codec.JsonJacksonCodec", ClassUtils.getDefaultClassLoader()).newInstance(); config.setCodec(codec); config.setThreads(thread); config.setEventLoopGroup(new NioEventLoopGroup()); config.setUseLinuxNativeEpoll(false); return Redisson.create(config); } }
配置完後就可以在java類中使用了
//注入
@Autowired
private RedissonClient redissonClient;
//使用
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.trySetPermits(1);//設定許可
semaphore.acquire();//獲得一個許可
semaphore.release();//釋放一個許可
使用方法可以在githup上面的api裡面檢視說明 https://github.com/redisson/redisson
和java有一點區別,不過基本使用方法是一樣的,猜測實現原理還是根據redis鍵值對應計數方法實現,就去redis中看了一下
果然在redis中看到了"semaphore"這個鍵
消耗完許可後
檢視原始碼發現其實現方式還是網上的分散式鎖類似
public void acquire(int permits) throws InterruptedException {
if (tryAcquire(permits)) {
return;
}
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire(permits)) {
return;
}
getEntry().getLatch().acquire(permits);
}
} finally {
unsubscribe(future);
}
// get(acquireAsync(permits));
}
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), permits);
}
第一段程式碼是許可獲取的原始碼,可以看出是通過while迴圈實現,直到拿到許可這個方法才返回,繼續往下面程式碼執行
第二段程式碼是通過lua指令碼執行原子操作(訊號量許可獲取和釋放)
總結:發現Redisson底層實現方案和之前在githup上面看到的一些分散式鎖,限流之類的基本大同小異。