On connection pool validation in spring-data-redis
Published: 2019-06-29


This post looks at how spring-data-redis validates the connections held in its connection pools.

lettuce

LettucePoolingConnectionProvider

spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

    class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {

        private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
        private final LettuceConnectionProvider connectionProvider;
        private final GenericObjectPoolConfig poolConfig;
        private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32);
        private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32);

        LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) {
            Assert.notNull(connectionProvider, "ConnectionProvider must not be null!");
            Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
            this.connectionProvider = connectionProvider;
            this.poolConfig = clientConfiguration.getPoolConfig();
        }

        public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
            GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> {
                return ConnectionPoolSupport.createGenericObjectPool(() -> {
                    return this.connectionProvider.getConnection(connectionType);
                }, this.poolConfig, false);
            });

            try {
                StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject();
                this.poolRef.put(connection, pool);
                return (StatefulConnection)connectionType.cast(connection);
            } catch (Exception var4) {
                throw new PoolException("Could not get a resource from the pool", var4);
            }
        }

        public AbstractRedisClient getRedisClient() {
            if (this.connectionProvider instanceof RedisClientProvider) {
                return ((RedisClientProvider)this.connectionProvider).getRedisClient();
            } else {
                throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName()));
            }
        }

        public void release(StatefulConnection<?, ?> connection) {
            GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection);
            if (pool == null) {
                throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider");
            } else {
                pool.returnObject(connection);
            }
        }

        public void destroy() throws Exception {
            if (!this.poolRef.isEmpty()) {
                log.warn("LettucePoolingConnectionProvider contains unreleased connections");
                this.poolRef.forEach((connection, pool) -> {
                    pool.returnObject(connection);
                });
                this.poolRef.clear();
            }

            this.pools.forEach((type, pool) -> {
                pool.close();
            });
            this.pools.clear();
        }
    }
  • The pool itself is created by calling ConnectionPoolSupport.createGenericObjectPool, once per connection type.
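
The bookkeeping in this class (one pool per connection type in `pools`, plus a `poolRef` map that remembers which pool each borrowed connection came from) can be modeled with plain JDK classes. The following is a simplified sketch, not the Spring code: `TrackingProvider`, its `Deque`-based idle queue, and the `Supplier` factory argument are hypothetical stand-ins for `GenericObjectPool` and the real connection provider.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the bookkeeping done by LettucePoolingConnectionProvider.
class TrackingProvider {
    // One idle queue per connection type (stands in for GenericObjectPool).
    private final Map<Class<?>, Deque<Object>> pools = new ConcurrentHashMap<>();
    // Remembers which queue a borrowed object has to be returned to.
    private final Map<Object, Deque<Object>> poolRef = new ConcurrentHashMap<>();

    <T> T getConnection(Class<T> type, Supplier<T> factory) {
        // Lazily create the per-type pool on first use.
        Deque<Object> pool = pools.computeIfAbsent(type, t -> new ArrayDeque<>());
        Object connection;
        synchronized (pool) {
            connection = pool.isEmpty() ? factory.get() : pool.pop();
        }
        poolRef.put(connection, pool);
        return type.cast(connection);
    }

    void release(Object connection) {
        Deque<Object> pool = poolRef.remove(connection);
        if (pool == null) {
            // Same failure mode as the real class: unknown or already-returned connection.
            throw new IllegalStateException("Returned connection " + connection
                    + " was either previously returned or does not belong to this provider");
        }
        synchronized (pool) {
            pool.push(connection);
        }
    }

    int borrowedCount() {
        return poolRef.size();
    }
}
```

Releasing the same object twice fails because the first `release` already removed its `poolRef` entry, which mirrors the `PoolException` branch in the listing above.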

ConnectionPoolSupport.createGenericObjectPool

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

    public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
            Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

        LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
        LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

        AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

        GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

            @Override
            public T borrowObject() throws Exception {
                return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject();
            }

            @Override
            public void returnObject(T obj) {
                if (wrapConnections && obj instanceof HasTargetConnection) {
                    super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
                    return;
                }
                super.returnObject(obj);
            }
        };

        poolRef.set(pool);

        return pool;
    }
  • The pool is backed by a RedisPooledObjectFactory.

ConnectionPoolSupport.RedisPooledObjectFactory

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

    private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

        private final Supplier<T> connectionSupplier;

        RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
            this.connectionSupplier = connectionSupplier;
        }

        @Override
        public T create() throws Exception {
            return connectionSupplier.get();
        }

        @Override
        public void destroyObject(PooledObject<T> p) throws Exception {
            p.getObject().close();
        }

        @Override
        public PooledObject<T> wrap(T obj) {
            return new DefaultPooledObject<>(obj);
        }

        @Override
        public boolean validateObject(PooledObject<T> p) {
            return p.getObject().isOpen();
        }
    }
  • It extends BasePooledObjectFactory and overrides validateObject among other methods; validation relies solely on isOpen.

RedisChannelHandler.isOpen

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java

    public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

        private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);

        private Duration timeout;
        private CloseEvents closeEvents = new CloseEvents();

        private final RedisChannelWriter channelWriter;
        private final boolean debugEnabled = logger.isDebugEnabled();

        private volatile boolean closed;
        private volatile boolean active = true;
        private volatile ClientOptions clientOptions;

        //......

        /**
         * Notification when the connection becomes active (connected).
         */
        public void activated() {
            active = true;
            closed = false;
        }

        /**
         * Notification when the connection becomes inactive (disconnected).
         */
        public void deactivated() {
            active = false;
        }

        /**
         *
         * @return true if the connection is active and not closed.
         */
        public boolean isOpen() {
            return active;
        }

        @Override
        public synchronized void close() {

            if (debugEnabled) {
                logger.debug("close()");
            }

            if (closed) {
                logger.warn("Connection is already closed");
                return;
            }

            if (!closed) {
                active = false;
                closed = true;
                channelWriter.close();
                closeEvents.fireEventClosed(this);
                closeEvents = new CloseEvents();
            }
        }
    }
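  • isOpen simply returns the active field. active is set to false by deactivated() and close(), and to true at initialization and in activated().
  • As a result, a timeout caused by something like docker pause cannot be detected this way: the paused container does not close the connection, so no disconnect event reaches the client and active stays true.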
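
The difference between a flag-based check and a round-trip check can be illustrated with a small model. This is a sketch under stated assumptions, not Lettuce code: `ModelConnection`, `pausePeer()`, and `validateByPing()` are hypothetical names, and the paused peer is simulated by throwing a `TimeoutException` immediately instead of actually waiting.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical model of a connection whose peer can be frozen (for example by `docker pause`).
class ModelConnection {
    private volatile boolean active = true;      // flipped only by local events
    private volatile boolean peerPaused = false; // remote state, invisible to the client

    void deactivated() { active = false; }       // the channel reported a disconnect
    void pausePeer()   { peerPaused = true; }    // peer stops answering; the socket stays open

    // Flag-based check, in the spirit of RedisChannelHandler.isOpen().
    boolean isOpen() { return active; }

    // Round-trip check: a paused peer never replies, modeled here as an immediate timeout.
    String ping() throws TimeoutException {
        if (!active) throw new IllegalStateException("connection closed");
        if (peerPaused) throw new TimeoutException("no reply from peer");
        return "PONG";
    }

    // Any failure of the round trip counts as an invalid connection.
    boolean validateByPing() {
        try {
            return "PONG".equals(ping());
        } catch (Exception e) {
            return false;
        }
    }
}
```

After `pausePeer()`, `isOpen()` still returns true while `validateByPing()` returns false, which is exactly the gap described in the bullets above.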

LettuceConnectionFactory.SharedConnection.validateConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

    /**
     * Validate the connection. Invalid connections will be closed and the connection state will be reset.
     */
    void validateConnection() {

        synchronized (this.connectionMonitor) {

            boolean valid = false;

            if (connection != null && connection.isOpen()) {
                try {

                    if (connection instanceof StatefulRedisConnection) {
                        ((StatefulRedisConnection) connection).sync().ping();
                    }

                    if (connection instanceof StatefulRedisClusterConnection) {
                        ((StatefulRedisConnection) connection).sync().ping();
                    }
                    valid = true;
                } catch (Exception e) {
                    log.debug("Validation failed", e);
                }
            }

            if (!valid) {

                if (connection != null) {
                    connectionProvider.release(connection);
                }

                log.warn("Validation of shared connection failed. Creating a new connection.");

                resetConnection();
                this.connection = getNativeConnection();
            }
        }
    }
  • This is the path used to obtain a connection when LettuceConnectionFactory's shareNativeConnection is enabled, which is the default.
  • If LettuceConnectionFactory's validateConnection is set to true (it defaults to false), validateConnection is run on every get.
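
The effect of the validateConnection switch on a shared connection can be sketched roughly as follows. This is a simplified model, not the Spring implementation: `SharedHolder`, the `Supplier` factory, and the `Predicate` health check are hypothetical, and the real class additionally releases the broken connection back to its provider.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical holder for one shared connection, loosely modeled on SharedConnection.
class SharedHolder<C> {
    private final Supplier<C> factory;        // creates a fresh connection
    private final Predicate<C> healthCheck;   // for example "a ping succeeds"
    private final boolean validateConnection; // mirrors the factory flag (false by default)
    private C connection;

    SharedHolder(Supplier<C> factory, Predicate<C> healthCheck, boolean validateConnection) {
        this.factory = factory;
        this.healthCheck = healthCheck;
        this.validateConnection = validateConnection;
    }

    synchronized C getConnection() {
        if (connection == null) {
            connection = factory.get();
        }
        // Only when validation is enabled is the shared connection checked on every get.
        if (validateConnection && !healthCheck.test(connection)) {
            connection = factory.get(); // replace the broken connection
        }
        return connection;
    }
}
```

With the flag off, a broken shared connection keeps being handed out; with it on, every get pays for one health check but a broken connection is replaced.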

DefaultLettucePool.LettuceFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java

    private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {

        private final RedisClient client;
        private int dbIndex;

        public LettuceFactory(RedisClient client, int dbIndex) {
            this.client = client;
            this.dbIndex = dbIndex;
        }

        public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
            if (pooledObject.getObject() instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex);
            }
        }

        public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
            try {
                ((StatefulConnection)obj.getObject()).close();
            } catch (Exception var3) {
                ;
            }
        }

        public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
            try {
                if (obj.getObject() instanceof StatefulRedisConnection) {
                    ((StatefulRedisConnection)obj.getObject()).sync().ping();
                }
                return true;
            } catch (Exception var3) {
                return false;
            }
        }

        public StatefulConnection<byte[], byte[]> create() throws Exception {
            return this.client.connect(LettuceConnection.CODEC);
        }

        public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
            return new DefaultPooledObject(obj);
        }
    }
  • The deprecated DefaultLettucePool contains a LettuceFactory whose validateObject issues a ping, which makes it the more accurate check.

jedis

JedisConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java

    public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {

        //......

        private Pool<Jedis> createPool() {

            if (isRedisSentinelAware()) {
                return createRedisSentinelPool(this.sentinelConfig);
            }
            return createRedisPool();
        }

        /**
         * Creates {@link JedisSentinelPool}.
         *
         * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
         * @return the {@link Pool} to use. Never {@literal null}.
         * @since 1.4
         */
        protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {

            GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
            return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
                    poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
        }

        /**
         * Creates {@link JedisPool}.
         *
         * @return the {@link Pool} to use. Never {@literal null}.
         * @since 1.4
         */
        protected Pool<Jedis> createRedisPool() {

            return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(),
                    getPassword(), getDatabase(), getClientName(), isUseSsl(),
                    clientConfiguration.getSslSocketFactory().orElse(null), //
                    clientConfiguration.getSslParameters().orElse(null), //
                    clientConfiguration.getHostnameVerifier().orElse(null));
        }

        //......
    }
  • Both JedisPool and JedisSentinelPool use JedisFactory internally.

JedisFactory.validateObject

jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java

    class JedisFactory implements PooledObjectFactory<Jedis> {

        private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
        private final int connectionTimeout;
        private final int soTimeout;
        private final String password;
        private final int database;
        private final String clientName;
        private final boolean ssl;
        private final SSLSocketFactory sslSocketFactory;
        private SSLParameters sslParameters;
        private HostnameVerifier hostnameVerifier;

        //......

        @Override
        public boolean validateObject(PooledObject<Jedis> pooledJedis) {
            final BinaryJedis jedis = pooledJedis.getObject();
            try {
                HostAndPort hostAndPort = this.hostAndPort.get();

                String connectionHost = jedis.getClient().getHost();
                int connectionPort = jedis.getClient().getPort();

                return hostAndPort.getHost().equals(connectionHost)
                        && hostAndPort.getPort() == connectionPort && jedis.isConnected()
                        && jedis.ping().equals("PONG");
            } catch (final Exception e) {
                return false;
            }
        }
    }
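  • JedisFactory implements the PooledObjectFactory interface. Its validateObject method checks isConnected and also issues a ping.
  • ping throws an exception as soon as it times out, so validation fails. The timeout caused by docker pause is therefore detected, and the connection is evicted from the pool.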
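
How a failed validation leads to eviction can be sketched with a tiny validate-on-borrow pool. This is an illustrative model only: `ValidatingPool` and its methods are hypothetical, the `Predicate` stands in for a `validateObject` implementation such as the one in JedisFactory, and commons-pool2 itself only runs this check when options such as testOnBorrow or testWhileIdle are enabled in the pool config.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical mini pool that validates idle objects before handing them out.
class ValidatingPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final Predicate<T> validator; // stands in for validateObject
    private int evicted;

    ValidatingPool(Supplier<T> factory, Predicate<T> validator) {
        this.factory = factory;
        this.validator = validator;
    }

    synchronized T borrow() {
        while (!idle.isEmpty()) {
            T candidate = idle.pop();
            if (validator.test(candidate)) {
                return candidate;
            }
            evicted++; // failed validation: drop it instead of handing it out
        }
        return factory.get(); // no usable idle object, create a new one
    }

    synchronized void giveBack(T obj) {
        idle.push(obj);
    }

    synchronized int evictedCount() {
        return evicted;
    }
}
```

With a validator that only reads a local flag, a connection to a paused server would pass this loop; with a ping-based validator it is dropped and replaced.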

Summary

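  • spring-data-redis 2.0 and later deprecate the old LettucePool in favor of LettucePoolingClientConfiguration.
  • This introduces a problem: the old pool validated connections with a ping, whereas the new pool relies on the active flag, which cannot detect a docker pause.
  • For Lettuce, shareNativeConnection defaults to true and validateConnection to false. After the first borrow from the pool, the underlying connection is reused indefinitely and never returned. To borrow from the pool and return to it on every connection acquisition, set shareNativeConnection to false.
  • The Jedis pool implementation checks isConnected and also issues a ping in validateObject, so it detects the timeout caused by docker pause and evicts the connection from the pool.
  • For Lettuce, there are two ways to detect the docker pause failure. One is to fix validateObject in ConnectionPoolSupport's RedisPooledObjectFactory so that it pings in addition to checking isOpen. The other is to leave pooling disabled and set LettuceConnectionFactory's validateConnection to true.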
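
The first option (check isOpen, then ping) could look roughly like the sketch below. It is not a patch for Lettuce: `PingValidator` is a hypothetical helper, and it assumes the ping is available as a `CompletableFuture<String>` so that a timeout can be applied with plain JDK classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical helper combining the cheap flag check with a bounded ping round trip.
class PingValidator {

    static boolean validate(BooleanSupplier isOpen,
                            Supplier<CompletableFuture<String>> ping,
                            long timeoutMillis) {
        if (!isOpen.getAsBoolean()) {
            return false; // locally known to be closed, no need to ping
        }
        try {
            // Wait for the reply, but never longer than the given timeout.
            return "PONG".equals(ping.get().get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (Exception e) {
            return false; // timeout or failed command
        }
    }
}
```

The trade-off is one extra round trip per validation, so the timeout should be short enough that a paused server does not stall every borrow.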

Reposted from: https://my.oschina.net/go4it/blog/2051206
