1. 程式人生 > 程式設計 >關於SpringBoot中Redis執行緒池的有關探討

關於SpringBoot中Redis執行緒池的有關探討

探討起因

最近在寫一個小專案,用redis過期來實現驗證碼的時間限制。因為SpringBoot預設採用 lettuce作為客戶端,引入了commons-pool2依賴之後做了如下配置:

spring:
  redis:
    host: 192.168.56.1
    lettuce:
      pool:
        min-idle: 2
        max-active: 8     #預設
        max-idle: 8       #預設
複製程式碼

本來以為這樣做就行了,然後寫瞭如下程式碼測了下

    @Test
    public void test()
throws InterruptedException
{ int i; CountDownLatch c = new CountDownLatch(5000); long x = System.currentTimeMillis(); for (i = 0; i < 5000; i++) { new Thread(() -> { redisTemplate.opsForValue().set("1","1"); c.countDown(); }).start(); } c.await(); } 複製程式碼

測試期間,實際客戶端最大接入:

# Clients                           # 減去一個redis-cli連線,一個為之前的專案連線
connected_clients:3
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼

???,我設定的配置去哪裡了???,於是開始了漫長的探索

嘗試

首先看下預設下是怎樣的。去除了pool的設定,再跑一下。

# Clients                           # 減去一個redis-cli連線,一個為之前的專案連線
connected_clients:3 client_recent_max_input_buffer:4 client_recent_max_output_buffer:0 blocked_clients:0 複製程式碼

很好,同樣的結果。說明剛剛的配置根本沒有效果。沒準是lettuce的原因?於是我修改了pom,用jedis測試了一下。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.1.7.RELEASE</version>
            <exclusions>
                <exclusion>
                    <artifactId>lettuce-core</artifactId>
                    <groupId>io.lettuce</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <!--    注意版本和SpringData相容   -->
            <version>2.9.1</version>
        </dependency>
複製程式碼
spring:
  redis:
    host: 192.168.56.1
    jedis:
      pool:
        min-idle: 2
        max-active: 8
        max-idle: 8
複製程式碼

看下結果:

# Clients                           # 減去一個redis-cli連線,一個為之前的專案連線
connected_clients:10
client_recent_max_input_buffer:2
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
複製程式碼

最大換成15再試試:

# Clients
connected_clients:17
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
複製程式碼

jedis執行緒池正常,那說明是lettuce的配置上除了問題。

探索

首先先來看看LettuceConnectionFactory中的getConnection是怎樣實現的。

    public RedisConnection getConnection() {
        if (this.isClusterAware()) {
            return this.getClusterConnection();
        } else {
            LettuceConnection connection;
            if (this.pool != null) {
                connection = new LettuceConnection(this.getSharedConnection(),this.getTimeout(),(AbstractRedisClient)null,this.pool,this.getDatabase());
            } else {
                connection = new LettuceConnection(this.getSharedConnection(),this.connectionProvider,this.getDatabase());
            }

            connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
            return connection;
        }
    }
複製程式碼

首先,是關於叢集判斷,因為我沒設定任何叢集相關,所以直接來到else。此處的pool 並不是我們設定的pool。它對應的是如下方法,且只有這一處對它進行賦值:

	/**
	 * @param pool
	 * @deprecated since 2.0,use pooling via {@link LettucePoolingClientConfiguration}.
	 */
	@Deprecated
	public LettuceConnectionFactory(LettucePool pool) {

		this(new MutableLettuceClientConfiguration());
		this.pool = pool;
	}
複製程式碼

LettuceConnectionConfiguration中的注入繼續追下去可以看到並沒有呼叫這個過時方法:

	@Bean
	@ConditionalOnMissingBean(RedisConnectionFactory.class)
	public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources)
			throws UnknownHostException {
		LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,this.properties.getLettuce().getPool());
		return createLettuceConnectionFactory(clientConfig);
	}

	//  之後來到
	public LettuceConnectionFactory(RedisStandaloneConfiguration standaloneConfig,LettuceClientConfiguration clientConfig) {

		this(clientConfig);

		Assert.notNull(standaloneConfig,"RedisStandaloneConfiguration must not be null!");

		this.standaloneConfig = standaloneConfig;
		this.configuration = this.standaloneConfig;
	}
複製程式碼

所以,上面判斷時此的pool恆為null,進入else。在例項化之前,首先會根據getSharedConnection獲取一個StatefulRedisConnection連線:

	protected StatefulRedisConnection<byte[],byte[]> getSharedConnection() {
		return shareNativeConnection ? (StatefulRedisConnection) getOrCreateSharedConnection().getConnection() : null;
	}

	private SharedConnection<byte[]> getOrCreateSharedConnection() {

		synchronized (this.connectionMonitor) {

			if (this.connection == null) {
				this.connection = new SharedConnection<>(connectionProvider);
			}

			return this.connection;
		}
	}
複製程式碼

再接著向下則進入LettuceConnection建構函式:

	LettuceConnection(@Nullable StatefulConnection<byte[],byte[]> sharedConnection,LettuceConnectionProvider connectionProvider,long timeout,int defaultDbIndex) {

		Assert.notNull(connectionProvider,"LettuceConnectionProvider must not be null.");

		this.asyncSharedConn = sharedConnection;
		this.connectionProvider = connectionProvider;
		this.timeout = timeout;
		this.defaultDbIndex = defaultDbIndex;
		this.dbIndex = this.defaultDbIndex;
	}
複製程式碼

此時的this.asyncSharedConn = sharedConnection;,再接著向下,我們會進入如下方法:

	protected RedisClusterCommands<byte[],byte[]> getConnection() {
		//  在事務未開啟時此處為false預設值
		if (isQueueing()) {
			return getDedicatedConnection();
		}
		if (asyncSharedConn != null) {

			if (asyncSharedConn instanceof StatefulRedisConnection) {
				return ((StatefulRedisConnection<byte[],byte[]>) asyncSharedConn).sync();
			}
			if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
				return ((StatefulRedisClusterConnection<byte[],byte[]>) asyncSharedConn).sync();
			}
		}
		return getDedicatedConnection();
	}
複製程式碼

這裡的asyncSharedConn即為我們上面的共享例項連線。

分析一下,在預設情況下:

private boolean shareNativeConnection = true;
複製程式碼

new LettuceConnection時,經過StatefulRedisConnection()getOrCreateSharedConnection()後傳入了同一個共享連線。所以在此時即使配置了執行緒池,在執行時有且只有一個共享例項提供操作。 那我們驗證一下,我嘗試人為改變一下shareNativeConnection

@Configuration
public class Config {
    @Autowired
    public void setLettuceConnectionFactory(LettuceConnectionFactory lettuceConnectionFactory){
        lettuceConnectionFactory.setShareNativeConnection(false);
    }
}
複製程式碼

還是按下面的配置及測試:

spring:
  redis:
    host: 192.168.56.1
    lettuce:
      pool:
        min-idle: 2
複製程式碼
    @Test
    public void test() throws InterruptedException {
        int i ;
        CountDownLatch c = new CountDownLatch(5000);
        for (i = 1; i <= 5000; i++) {
            new Thread(() -> {
                System.out.println(redisTemplate.execute(RedisConnection::ping));
                c.countDown();
            }).start();
        }
        c.await();
    }
複製程式碼

之後結果:

# Clients
connected_clients:10                    # 減去一個redis-cli連線,一個為之前的專案連線
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼

這個結果10中的8個為pool中的預設max-active屬性,但這不意味不設定pool屬性時仍為預設值(下面會說)。再將max-active改為10,結果也是相符:

# Clients								# 減去一個redis-cli連線,一個為之前的專案連線
connected_clients:12
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼

那改變之後和之前有何區別呢?修改後,首先在new LettuceConnection(this.getSharedConnection(),this.connectionProvider,this.getTimeout(),this.getDatabase());時,傳入的共享例項為null,接著也是進入如下方法:

	protected RedisClusterCommands<byte[],byte[]>) asyncSharedConn).sync();
			}
		}
		return getDedicatedConnection();
	}
複製程式碼

再接著則會進入getDedicatedConnection()

	RedisClusterCommands<byte[],byte[]> getDedicatedConnection() {

		if (asyncDedicatedConn == null) {

			asyncDedicatedConn = doGetAsyncDedicatedConnection();

			if (asyncDedicatedConn instanceof StatefulRedisConnection) {
				((StatefulRedisConnection<byte[],byte[]>) asyncDedicatedConn).sync().select(dbIndex);
			}
		}

		if (asyncDedicatedConn instanceof StatefulRedisConnection) {
			return ((StatefulRedisConnection<byte[],byte[]>) asyncDedicatedConn).sync();
		}
		if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
			return ((StatefulRedisClusterConnection<byte[],byte[]>) asyncDedicatedConn).sync();
		}

		throw new IllegalStateException(
				String.format("%s is not a supported connection type.",asyncDedicatedConn.getClass().getName()));
	}
複製程式碼

因為首次進入,asyncDedicatedConn肯定為null,繼續向下doGetAsyncDedicatedConnection();

	protected StatefulConnection<byte[],byte[]> doGetAsyncDedicatedConnection() {
		return connectionProvider.getConnection(StatefulConnection.class);
	}
複製程式碼

此時可以看到,連線的提供則會由LettuceConnectionFactory中傳入的connectionProvider負責。connectionProviderafterPropertiesSet()進行初始化:

	public void afterPropertiesSet() {

		this.client = createClient();

		this.connectionProvider = createConnectionProvider(client,LettuceConnection.CODEC);
		this.reactiveConnectionProvider = createConnectionProvider(client,LettuceReactiveRedisConnection.CODEC);

		if (isClusterAware()) {
			this.clusterCommandExecutor = new ClusterCommandExecutor(
					new LettuceClusterTopologyProvider((RedisClusterClient) client),new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),EXCEPTION_TRANSLATION);
		}
    
    //    建立方法    
	private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client,RedisCodec<?,?> codec) {

		LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client,codec);

		if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
			return new LettucePoolingConnectionProvider(connectionProvider,(LettucePoolingClientConfiguration) this.clientConfiguration);
		}

		return connectionProvider;
	}
複製程式碼

可以看到connectionProvider與上面注入的this.properties.getLettuce().getPool()的配置息息相關。首先在建立時都為doCreateConnectionProvider:

protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client,?> codec) {

		ReadFrom readFrom = getClientConfiguration().getReadFrom().orElse(null);

		if (isStaticMasterReplicaAware()) {

			List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) configuration).getNodes().stream() //
					.map(it -> createRedisURIAndApplySettings(it.getHostName(),it.getPort())) //
					.peek(it -> it.setDatabase(getDatabase())) //
					.collect(Collectors.toList());

			return new StaticMasterReplicaConnectionProvider((RedisClient) client,codec,nodes,readFrom);
		}

		if (isClusterAware()) {
			return new ClusterConnectionProvider((RedisClusterClient) client,readFrom);
		}

		return new StandaloneConnectionProvider((RedisClient) client,readFrom);
	}
複製程式碼

pool在配置中為null,那connectionProvider則為StandaloneConnectionProvider,否則則為LettucePoolingConnectionProvider。而在getConnection的實現上,兩者也存在巨大區別。

//        LettucePoolingConnectionProvider   
    public <T extends StatefulConnection<?,?>> T getConnection(Class<T> connectionType) {

		GenericObjectPool<StatefulConnection<?,?>> pool = pools.computeIfAbsent(connectionType,poolType -> {
			return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),poolConfig,false);
		});

		try {

			StatefulConnection<?,?> connection = pool.borrowObject();

			poolRef.put(connection,pool);

			return connectionType.cast(connection);
		} catch (Exception e) {
			throw new PoolException("Could not get a resource from the pool",e);
		}
	}

//         StandaloneConnectionProvider
    public <T extends StatefulConnection<?,?>> T getConnection(Class<T> connectionType) {

		if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
			return connectionType.cast(client.connectSentinel());
		}

		if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
			return connectionType.cast(client.connectPubSub(codec));
		}

		if (StatefulConnection.class.isAssignableFrom(connectionType)) {

			return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(),it))
					.orElseGet(() -> client.connect(codec)));
		}

		throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
	}
複製程式碼

至此便可看到:如果我們將shareNativeConnection設定為false之後便會有兩種情形:

  • 沒有設定pool:StandaloneConnectionProvider每次請求都會建立一個連線
  • 設定了pool:LettucePoolingConnectionProvider則是以執行緒池的方式建立連線。

那我們驗證一下,如果我將shareNativeConnection設定為false同時不在application.yml對執行緒池進行任何設定,結果如下:

# Clients
connected_clients:591
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:771
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:885
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:974
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:1022
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
127.0.0.1:6379> info clients
# Clients
connected_clients:1022
client_recent_max_input_buffer:4
client_recent_max_output_buffer:0
blocked_clients:0
複製程式碼

這連線數是直接放飛自我了。。。。。而且伴隨著報錯的發生。不過設定pool後,由執行緒池進行管理連線則可得到人為控制。

總結

通過探索,我發現Lettuce的poolshareNativeConnection息息相關,如有不對的地方,望能指證。