關於SpringBoot中Redis執行緒池的有關探討
探討起因
最近在寫一個小專案,用redis過期來實現驗證碼的時間限制。因為SpringBoot預設採用
lettuce作為客戶端,引入了commons-pool2
依賴之後做了如下配置:
spring:
redis:
host: 192.168.56.1
lettuce:
pool:
min-idle: 2
max-active: 8 #預設
max-idle: 8 #預設
複製程式碼
本來以為這樣做就行了,然後寫瞭如下程式碼測了下
@Test
public void test() throws InterruptedException {
int i;
CountDownLatch c = new CountDownLatch(5000);
long x = System.currentTimeMillis();
for (i = 0; i < 5000; i++) {
new Thread(() -> {
redisTemplate.opsForValue().set("1","1");
c.countDown();
}).start();
}
c.await();
}
複製程式碼
測試期間,實際客戶端最大接入:
# Clients # 減去一個redis-cli連線,一個為之前的專案連線
connected_clients:3
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼
???,我設定的配置去哪裡了???,於是開始了漫長的探索
嘗試
首先看下預設下是怎樣的。去除了pool的設定,再跑一下。
# Clients # 減去一個redis-cli連線,一個為之前的專案連線
connected_clients:3
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼
很好,同樣的結果。說明剛剛的配置根本沒有效果。沒準是lettuce的原因?於是我修改了pom,用jedis測試了一下。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.7.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>lettuce-core</artifactId>
<groupId>io.lettuce</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<!-- 注意版本和SpringData相容 -->
<version>2.9.1</version>
</dependency>
複製程式碼
spring:
redis:
host: 192.168.56.1
jedis:
pool:
min-idle: 2
max-active: 8
max-idle: 8
複製程式碼
看下結果:
# Clients # 減去一個redis-cli連線,一個為之前的專案連線
connected_clients:10
client_recent_max_input_buffer:2
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
複製程式碼
最大換成15再試試:
# Clients
connected_clients:17
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
複製程式碼
jedis執行緒池正常,那說明是lettuce的配置上除了問題。
探索
首先先來看看LettuceConnectionFactory
中的getConnection
是怎樣實現的。
public RedisConnection getConnection() {
if (this.isClusterAware()) {
return this.getClusterConnection();
} else {
LettuceConnection connection;
if (this.pool != null) {
connection = new LettuceConnection(this.getSharedConnection(),this.getTimeout(),(AbstractRedisClient)null,this.pool,this.getDatabase());
} else {
connection = new LettuceConnection(this.getSharedConnection(),this.connectionProvider,this.getDatabase());
}
connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
return connection;
}
}
複製程式碼
首先,是關於叢集判斷,因為我沒設定任何叢集相關,所以直接來到else
。此處的pool
並不是我們設定的pool
。它對應的是如下方法,且只有這一處對它進行賦值:
/**
* @param pool
* @deprecated since 2.0,use pooling via {@link LettucePoolingClientConfiguration}.
*/
@Deprecated
public LettuceConnectionFactory(LettucePool pool) {
this(new MutableLettuceClientConfiguration());
this.pool = pool;
}
複製程式碼
從LettuceConnectionConfiguration
中的注入繼續追下去可以看到並沒有呼叫這個過時方法:
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources)
throws UnknownHostException {
LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,this.properties.getLettuce().getPool());
return createLettuceConnectionFactory(clientConfig);
}
// 之後來到
public LettuceConnectionFactory(RedisStandaloneConfiguration standaloneConfig,LettuceClientConfiguration clientConfig) {
this(clientConfig);
Assert.notNull(standaloneConfig,"RedisStandaloneConfiguration must not be null!");
this.standaloneConfig = standaloneConfig;
this.configuration = this.standaloneConfig;
}
複製程式碼
所以,上面判斷時此的pool
恆為null
,進入else
。在例項化之前,首先會根據getSharedConnection
獲取一個StatefulRedisConnection
連線:
protected StatefulRedisConnection<byte[],byte[]> getSharedConnection() {
return shareNativeConnection ? (StatefulRedisConnection) getOrCreateSharedConnection().getConnection() : null;
}
private SharedConnection<byte[]> getOrCreateSharedConnection() {
synchronized (this.connectionMonitor) {
if (this.connection == null) {
this.connection = new SharedConnection<>(connectionProvider);
}
return this.connection;
}
}
複製程式碼
再接著向下則進入LettuceConnection
建構函式:
LettuceConnection(@Nullable StatefulConnection<byte[],byte[]> sharedConnection,LettuceConnectionProvider connectionProvider,long timeout,int defaultDbIndex) {
Assert.notNull(connectionProvider,"LettuceConnectionProvider must not be null.");
this.asyncSharedConn = sharedConnection;
this.connectionProvider = connectionProvider;
this.timeout = timeout;
this.defaultDbIndex = defaultDbIndex;
this.dbIndex = this.defaultDbIndex;
}
複製程式碼
此時的this.asyncSharedConn = sharedConnection;
,再接著向下,我們會進入如下方法:
protected RedisClusterCommands<byte[],byte[]> getConnection() {
// 在事務未開啟時此處為false預設值
if (isQueueing()) {
return getDedicatedConnection();
}
if (asyncSharedConn != null) {
if (asyncSharedConn instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[],byte[]>) asyncSharedConn).sync();
}
if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[],byte[]>) asyncSharedConn).sync();
}
}
return getDedicatedConnection();
}
複製程式碼
這裡的asyncSharedConn
即為我們上面的共享例項連線。
分析一下,在預設情況下:
private boolean shareNativeConnection = true;
複製程式碼
當new LettuceConnection
時,經過StatefulRedisConnection()
、getOrCreateSharedConnection()
後傳入了同一個共享連線。所以在此時即使配置了執行緒池,在執行時有且只有一個共享例項提供操作。
那我們驗證一下,我嘗試人為改變一下shareNativeConnection
:
@Configuration
public class Config {
@Autowired
public void setLettuceConnectionFactory(LettuceConnectionFactory lettuceConnectionFactory){
lettuceConnectionFactory.setShareNativeConnection(false);
}
}
複製程式碼
還是按下面的配置及測試:
spring:
redis:
host: 192.168.56.1
lettuce:
pool:
min-idle: 2
複製程式碼
@Test
public void test() throws InterruptedException {
int i ;
CountDownLatch c = new CountDownLatch(5000);
for (i = 1; i <= 5000; i++) {
new Thread(() -> {
System.out.println(redisTemplate.execute(RedisConnection::ping));
c.countDown();
}).start();
}
c.await();
}
複製程式碼
之後結果:
# Clients
connected_clients:10 # 減去一個redis-cli連線,一個為之前的專案連線
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼
這個結果10中的8個為pool
中的預設max-active
屬性,但這不意味不設定pool
屬性時仍為預設值(下面會說)。再將max-active
改為10,結果也是相符:
# Clients # 減去一個redis-cli連線,一個為之前的專案連線
connected_clients:12
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼
那改變之後和之前有何區別呢?修改後,首先在new LettuceConnection(this.getSharedConnection(),this.connectionProvider,this.getTimeout(),this.getDatabase());
時,傳入的共享例項為null
,接著也是進入如下方法:
protected RedisClusterCommands<byte[],byte[]>) asyncSharedConn).sync();
}
}
return getDedicatedConnection();
}
複製程式碼
再接著則會進入getDedicatedConnection()
RedisClusterCommands<byte[],byte[]> getDedicatedConnection() {
if (asyncDedicatedConn == null) {
asyncDedicatedConn = doGetAsyncDedicatedConnection();
if (asyncDedicatedConn instanceof StatefulRedisConnection) {
((StatefulRedisConnection<byte[],byte[]>) asyncDedicatedConn).sync().select(dbIndex);
}
}
if (asyncDedicatedConn instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[],byte[]>) asyncDedicatedConn).sync();
}
if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[],byte[]>) asyncDedicatedConn).sync();
}
throw new IllegalStateException(
String.format("%s is not a supported connection type.",asyncDedicatedConn.getClass().getName()));
}
複製程式碼
因為首次進入,asyncDedicatedConn
肯定為null
,繼續向下doGetAsyncDedicatedConnection();
protected StatefulConnection<byte[],byte[]> doGetAsyncDedicatedConnection() {
return connectionProvider.getConnection(StatefulConnection.class);
}
複製程式碼
此時可以看到,連線的提供則會由LettuceConnectionFactory
中傳入的connectionProvider
負責。connectionProvider
由afterPropertiesSet()
進行初始化:
public void afterPropertiesSet() {
this.client = createClient();
this.connectionProvider = createConnectionProvider(client,LettuceConnection.CODEC);
this.reactiveConnectionProvider = createConnectionProvider(client,LettuceReactiveRedisConnection.CODEC);
if (isClusterAware()) {
this.clusterCommandExecutor = new ClusterCommandExecutor(
new LettuceClusterTopologyProvider((RedisClusterClient) client),new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),EXCEPTION_TRANSLATION);
}
// 建立方法
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client,RedisCodec<?,?> codec) {
LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client,codec);
if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
return new LettucePoolingConnectionProvider(connectionProvider,(LettucePoolingClientConfiguration) this.clientConfiguration);
}
return connectionProvider;
}
複製程式碼
可以看到connectionProvider
與上面注入的this.properties.getLettuce().getPool()
的配置息息相關。首先在建立時都為doCreateConnectionProvider
:
protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client,?> codec) {
ReadFrom readFrom = getClientConfiguration().getReadFrom().orElse(null);
if (isStaticMasterReplicaAware()) {
List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) configuration).getNodes().stream() //
.map(it -> createRedisURIAndApplySettings(it.getHostName(),it.getPort())) //
.peek(it -> it.setDatabase(getDatabase())) //
.collect(Collectors.toList());
return new StaticMasterReplicaConnectionProvider((RedisClient) client,codec,nodes,readFrom);
}
if (isClusterAware()) {
return new ClusterConnectionProvider((RedisClusterClient) client,readFrom);
}
return new StandaloneConnectionProvider((RedisClient) client,readFrom);
}
複製程式碼
若pool
在配置中為null
,那connectionProvider
則為StandaloneConnectionProvider
,否則則為LettucePoolingConnectionProvider
。而在getConnection
的實現上,兩者也存在巨大區別。
// LettucePoolingConnectionProvider
public <T extends StatefulConnection<?,?>> T getConnection(Class<T> connectionType) {
GenericObjectPool<StatefulConnection<?,?>> pool = pools.computeIfAbsent(connectionType,poolType -> {
return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),poolConfig,false);
});
try {
StatefulConnection<?,?> connection = pool.borrowObject();
poolRef.put(connection,pool);
return connectionType.cast(connection);
} catch (Exception e) {
throw new PoolException("Could not get a resource from the pool",e);
}
}
// StandaloneConnectionProvider
public <T extends StatefulConnection<?,?>> T getConnection(Class<T> connectionType) {
if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
return connectionType.cast(client.connectSentinel());
}
if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
return connectionType.cast(client.connectPubSub(codec));
}
if (StatefulConnection.class.isAssignableFrom(connectionType)) {
return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(),it))
.orElseGet(() -> client.connect(codec)));
}
throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
}
複製程式碼
至此便可看到:如果我們將shareNativeConnection
設定為false
之後便會有兩種情形:
- 沒有設定pool:
StandaloneConnectionProvider
每次請求都會建立一個連線 - 設定了pool:
LettucePoolingConnectionProvider
則是以執行緒池的方式建立連線。
那我們驗證一下,如果我將shareNativeConnection
設定為false
同時不在application.yml
對執行緒池進行任何設定,結果如下:
# Clients
connected_clients:591
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:771
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:885
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:974
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:1022
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:1022
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼
這連線數是直接放飛自我了。。。。。而且伴隨著報錯的發生。不過設定pool
後,由執行緒池進行管理連線則可得到人為控制。
總結
通過探索,我發現Lettuce的pool
與shareNativeConnection
息息相關,如有不對的地方,望能指證。