聊聊spring-data-redis的連線池的校驗
阿新 • • 發佈:2018-12-09
序
本文主要研究一下spring-data-redis的連線池的校驗
LettucePoolingConnectionProvider
spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean { private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class); private final LettuceConnectionProvider connectionProvider; private final GenericObjectPoolConfig poolConfig; private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32); private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32); LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) { Assert.notNull(connectionProvider, "ConnectionProvider must not be null!"); Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!"); this.connectionProvider = connectionProvider; this.poolConfig = clientConfiguration.getPoolConfig(); } public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) { GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> { return ConnectionPoolSupport.createGenericObjectPool(() -> { return this.connectionProvider.getConnection(connectionType); }, this.poolConfig, false); }); try { StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject(); this.poolRef.put(connection, pool); return (StatefulConnection)connectionType.cast(connection); } catch (Exception var4) { throw new PoolException("Could not get a resource from the pool", var4); } } public AbstractRedisClient getRedisClient() { if (this.connectionProvider instanceof RedisClientProvider) { return ((RedisClientProvider)this.connectionProvider).getRedisClient(); } else { throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName())); } } public void release(StatefulConnection<?, ?> connection) { GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection); if (pool == null) { throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider"); } else { pool.returnObject(connection); } } public void destroy() throws Exception { if (!this.poolRef.isEmpty()) { log.warn("LettucePoolingConnectionProvider contains unreleased connections"); this.poolRef.forEach((connection, pool) -> { pool.returnObject(connection); }); this.poolRef.clear(); } this.pools.forEach((type, pool) -> { pool.close(); }); this.pools.clear(); } }
- 這裡呼叫ConnectionPoolSupport.createGenericObjectPool來建立連線池
ConnectionPoolSupport.createGenericObjectPool
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java
public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool( Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) { LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null"); LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null"); AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>(); GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) { @Override public T borrowObject() throws Exception { return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject(); } @Override public void returnObject(T obj) { if (wrapConnections && obj instanceof HasTargetConnection) { super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection()); return; } super.returnObject(obj); } }; poolRef.set(pool); return pool; }
- 這裡使用了RedisPooledObjectFactory
ConnectionPoolSupport.RedisPooledObjectFactory
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java
private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> { private final Supplier<T> connectionSupplier; RedisPooledObjectFactory(Supplier<T> connectionSupplier) { this.connectionSupplier = connectionSupplier; } @Override public T create() throws Exception { return connectionSupplier.get(); } @Override public void destroyObject(PooledObject<T> p) throws Exception { p.getObject().close(); } @Override public PooledObject<T> wrap(T obj) { return new DefaultPooledObject<>(obj); } @Override public boolean validateObject(PooledObject<T> p) { return p.getObject().isOpen(); } }
- 這裡繼承了BasePooledObjectFactory,重寫了validate等方法,這裡validate是通過isOpen來判斷
RedisChannelHandler.isOpen
lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java
public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);
private Duration timeout;
private CloseEvents closeEvents = new CloseEvents();
private final RedisChannelWriter channelWriter;
private final boolean debugEnabled = logger.isDebugEnabled();
private volatile boolean closed;
private volatile boolean active = true;
private volatile ClientOptions clientOptions;
//......
/**
* Notification when the connection becomes active (connected).
*/
public void activated() {
active = true;
closed = false;
}
/**
* Notification when the connection becomes inactive (disconnected).
*/
public void deactivated() {
active = false;
}
/**
*
* @return true if the connection is active and not closed.
*/
public boolean isOpen() {
return active;
}
@Override
public synchronized void close() {
if (debugEnabled) {
logger.debug("close()");
}
if (closed) {
logger.warn("Connection is already closed");
return;
}
if (!closed) {
active = false;
closed = true;
channelWriter.close();
closeEvents.fireEventClosed(this);
closeEvents = new CloseEvents();
}
}
}
- isOpen是通過active欄位來判斷的,而active在deactivated或者close的時候變為false,初始化以及在activated的時候變為true
- 可以看到對於docker pause這種造成的timeout,active這種方式檢測不出來
DefaultLettucePool.LettuceFactory
spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java
private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {
private final RedisClient client;
private int dbIndex;
public LettuceFactory(RedisClient client, int dbIndex) {
this.client = client;
this.dbIndex = dbIndex;
}
public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
if (pooledObject.getObject() instanceof StatefulRedisConnection) {
((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex);
}
}
public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
try {
((StatefulConnection)obj.getObject()).close();
} catch (Exception var3) {
;
}
}
public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
try {
if (obj.getObject() instanceof StatefulRedisConnection) {
((StatefulRedisConnection)obj.getObject()).sync().ping();
}
return true;
} catch (Exception var3) {
return false;
}
}
public StatefulConnection<byte[], byte[]> create() throws Exception {
return this.client.connect(LettuceConnection.CODEC);
}
public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
return new DefaultPooledObject(obj);
}
}
- 被廢棄的DefaultLettucePool裡頭有個LettuceFactory,其validate是通過ping來判斷的,因而更為準確
JedisFactory.validateObject
jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java
class JedisFactory implements PooledObjectFactory<Jedis> {
private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
private final int connectionTimeout;
private final int soTimeout;
private final String password;
private final int database;
private final String clientName;
private final boolean ssl;
private final SSLSocketFactory sslSocketFactory;
private SSLParameters sslParameters;
private HostnameVerifier hostnameVerifier;
//......
@Override
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
final BinaryJedis jedis = pooledJedis.getObject();
try {
HostAndPort hostAndPort = this.hostAndPort.get();
String connectionHost = jedis.getClient().getHost();
int connectionPort = jedis.getClient().getPort();
return hostAndPort.getHost().equals(connectionHost)
&& hostAndPort.getPort() == connectionPort && jedis.isConnected()
&& jedis.ping().equals("PONG");
} catch (final Exception e) {
return false;
}
}
}
- JedisFactory實現了PooledObjectFactory介面,其validateObject方法不僅校驗isConnected,而且也校驗了ping方法
- ping方法只要超時就會丟擲異常,從而校驗失敗,因而可以感知到docker pause帶來的timeout,從而將連線從連線池剔除
小結
- spring-date-redis的2.0及以上版本廢棄了原來的LettucePool,改為使用LettucePoolingClientConfiguration
- 這裡有一個問題,就是舊版是採用ping的方式,而新版則是使用active欄位來標識,對於docker pause識別不出來
- 對於lettuce的async預設是不採用連線池的,第一次borrow到連線之後,就一直複用底層的連線,也沒有歸還,如果要使用連線池,需要設定shareNativeConnection為false
- jedis的連線池實現,其validateObject方法不僅校驗isConnected,而且也校驗了ping方法,因而能夠感知到docker pause帶來的timeout,從而將連線從連線池剔除