SpringBoot整合Spring-data-redis實現集中式快取
從框架的角度來看,儲存在Redis中的資料只是位元組。雖然說Redis支援多種資料型別,但是那只是意味著儲存資料的方式,而不是它所代表的內容。由我們將這些資料轉化成字串或者是其他物件。我們通過org.springframework.data.redis.serializer. RedisSerializer將自定義的物件資料和儲存在Redis上的原始資料之間相互轉換,顧名思義,它處理的就是序列化的過程。
先看一下RedisSerializer介面
public interface RedisSerializer<T> { /** * 把一個物件序列化二進位制資料 */ byte[] serialize(T t) throws SerializationException; /** * 通過給定的二進位制資料反序列化成物件 */ T deserialize(byte[] bytes) throws SerializationException; }
注意這裡作者提示我們:Redis does not accept null keys or values but can return null replies (fornon existing keys). 大致意思Redis不接受key為null,但是對於那些不存在的key,會返回null。但是這裡可以採用官方提供的org.springframework.cache.support.NullValue作為null的佔位符.
NullValue原始碼如下:
public final class NullValue implements Serializable { static final Object INSTANCE = new NullValue(); private static final long serialVersionUID = 1L; private NullValue() { } private Object readResolve() { return INSTANCE; } }
下面是RedisSerializer介面的幾種實現方式:
首先在RedisTemplate中,我們可以看到afterPropertiesSet()方法
public void afterPropertiesSet() { super.afterPropertiesSet(); boolean defaultUsed = false; if (defaultSerializer == null) { defaultSerializer = new JdkSerializationRedisSerializer( classLoader != null ? classLoader : this.getClass().getClassLoader()); } if (enableDefaultSerializer) { if (keySerializer == null) { keySerializer = defaultSerializer; defaultUsed = true; } if (valueSerializer == null) { valueSerializer = defaultSerializer; defaultUsed = true; } if (hashKeySerializer == null) { hashKeySerializer = defaultSerializer; defaultUsed = true; } if (hashValueSerializer == null) { hashValueSerializer = defaultSerializer; defaultUsed = true; } } if (enableDefaultSerializer && defaultUsed) { Assert.notNull(defaultSerializer, "default serializer null and not all serializers initialized"); } if (scriptExecutor == null) { this.scriptExecutor = new DefaultScriptExecutor<K>(this); } initialized = true; }
這個方法時org.springframework.beans.factory .InitializingBean介面宣告的一個方法,這個介面主要就是做一些初始化動作,或者檢查已經設定好bean的屬性。或者在XML裡面加入一個init-method,這裡我們可以知道預設的RedisSerializer就是JdkSerializationRedisSerializer,而StringRedisTemplate的預設序列化全是public class StringRedisTemplate extendsRedisTemplate<String, String>
public StringRedisTemplate() {
RedisSerializer<String> stringSerializer = new StringRedisSerializer();
setKeySerializer(stringSerializer);
setValueSerializer(stringSerializer);
setHashKeySerializer(stringSerializer);
setHashValueSerializer(stringSerializer);
}
public class StringRedisSerializer implements RedisSerializer<String> {
private final Charset charset;
public StringRedisSerializer() {
this(Charset.forName("UTF8"));
}
public StringRedisSerializer(Charset charset) {
Assert.notNull(charset);
this.charset = charset;
}
public String deserialize(byte[] bytes) {
return (bytes == null ? null : new String(bytes, charset));
}
public byte[] serialize(String string) {
return (string == null ? null : string.getBytes(charset));
}
}
RedisCacheManager
RedisCacheManager的父類是AbstractTransactionSupportingCacheManager,有名字可以知道是對事務支援的一個CacheManager,預設是不會感知事務
privateboolean transactionAware = false;
對此官網的解釋是:
Set whether this CacheManager should exposetransaction-aware Cache objects.
Default is "false". Set this to"true" to synchronize cache put/evict
operations with ongoing Spring-managedtransactions, performing the actual cache
put/evict operation only in theafter-commit phase of a successful transaction.
大致意思是可感知事務的意思,put,evict,意味著會改變cache,所以put,evict操作必須一個事務(同步操作),其他執行緒必須等正在進行put,evict操作的執行緒執行完,才能緊接著操作。
再往上抽取的類就是AbstractCacheManager
取幾個比較經典的方法:
// Lazy cache initialization on access
@Override
public Cache getCache(String name) {
Cache cache = this.cacheMap.get(name);
if (cache != null) {
return cache;
}
else {
// Fully synchronize now for missing cache creation...
synchronized (this.cacheMap) {
cache = this.cacheMap.get(name);
if (cache == null) {
cache = getMissingCache(name);
if (cache != null) {
cache = decorateCache(cache);
this.cacheMap.put(name, cache);
updateCacheNames(name);
}
}
return cache;
}
}
}
這個cacheMap就是非常經典的併發容器ConcurrentHashMap,它Spring自帶管理cache的工具,每個Java開發人員都應該去讀一下它的實現思想。。。
private finalConcurrentMap<String, Cache> cacheMap = newConcurrentHashMap<String, Cache>(16);
不用說,我們直接可以想到肯定set管理cache的name。
private volatileSet<String> cacheNames = Collections.emptySet();
關於volatile,不用多說,原子性,可見性每個Java開發人員都應該理解的。。
這個synchronized,不用多說。。也是必須理解的- -,getMissingCache()這個方法預設返回null,決定權交給其實現者,可以根據name建立,也可以記錄日誌什麼的。
protected Cache getMissingCache(String name) {
return null;
}
decorateCache()這個方法顧名思義就是裝飾這個cache,預設直接返回,就是我們經典的裝飾模式,IO類庫的設計裡面也有這個裝飾模式。所以說常用的設計模式也必須要掌握啊。
protected Cache decorateCache(Cache cache) {
return cache;
}
這裡就在子類AbstractTransactionSupportingCacheManager,裡面去根據isTransactionAware欄位去判斷是否進行事務可感知來修飾這個cache。
@Override
protected Cache decorateCache(Cache cache) {
return (isTransactionAware() ? new TransactionAwareCacheDecorator(cache) : cache);
}
updateCacheNames()這個方法
private void updateCacheNames(String name) {
Set<String> cacheNames = new LinkedHashSet<String>(this.cacheNames.size() + 1);
cacheNames.addAll(this.cacheNames);
cacheNames.add(name);
this.cacheNames = Collections.unmodifiableSet(cacheNames);
}
一個有順序的set集合,最後用Collections包裝成一個不能修改的set檢視,LinkedHashSet也是非常有必要去了解一下底層原理的。。
配置RedisCacheManager非常簡單,首先RedisCacheManager依賴RedisTemplate,RedisTemplate又依賴於連線工廠,這裡就是我們的RedisConnectionFactory的實現類
JedisConnectionFactory,關於這個連線工廠:
Note: Though the database index isconfigurable, the JedisConnectionFactory only supports connecting to one Redisdatabase at a time.
Because Redis is single threaded, you areencouraged to set up multiple instances of Redis instead of using multipledatabases within a single process. This allows you to get better CPU/resourceutilization.
大意就是:雖然資料庫索引是可配置的,但JedisConnectionFactory只支援一次連線到一個Redis資料庫。由於Redis是單執行緒的,因此建議您設定多個Redis例項,而不是在一個程序中使用多個數據庫。這可以讓你獲得更好的CPU /資源利用率。
預設是下面配置:
· hostName=”localhost”
· port=6379
· timeout=2000 ms
· database=0
· usePool=true
先看下屬性
//Redis具體操作的類
@SuppressWarnings("rawtypes")//
private final RedisOperations redisOperations;
//是否使用字首修飾cache
private boolean usePrefix = false;
// usePrefix = true的時候使用預設的字首DefaultRedisCachePrefix,是:
private RedisCachePrefix cachePrefix = new DefaultRedisCachePrefix();
//遠端載入快取
private boolean loadRemoteCachesOnStartup = false;
//當super的快取不存在時,是否建立快取,false的話就不會去建立快取
private boolean dynamic = true;
// 0 - never expire 永不過期
private long defaultExpiration = 0;
// 針對專門的key設定快取過期時間
private Map<String, Long> expires = null;
//快取的名字集合
private Set<String> configuredCacheNames;
這裡官方強烈建議我們開啟使用字首。redisCacheManager.setUsePrefix(true),因為這裡預設為false。
/**
* @param cacheName must not be {@literal null} or empty.
* @param keyPrefix can be {@literal null}.
*/
public RedisCacheMetadata(String cacheName, byte[] keyPrefix) {
hasText(cacheName, "CacheName must not be null or empty!");
this.cacheName = cacheName;
this.keyPrefix = keyPrefix;
StringRedisSerializer stringSerializer = new StringRedisSerializer();
// name of the set holding the keys
this.setOfKnownKeys = usesKeyPrefix() ? new byte[] {} : stringSerializer.serialize(cacheName + "~keys");
this.cacheLockName = stringSerializer.serialize(cacheName + "~lock");
}
這裡我們通過追蹤原始碼可以看見構造RedisCaheMetdata的setOfKnownKeys時候會生成一個字尾為~keys的key,而這個key的在Redis中型別是zset,它是維護已知key的一個有序set,底層是LinkedHashSet。同時我們也會在官網中看到:
By default RedisCacheManager does notprefix keys for cache regions, which can lead to an unexpected growth of a ZSETused to maintain known keys. It’s highly recommended to enable the usage ofprefixes in order to avoid this unexpected growth and potential key clashesusing more than one cache region.
大致意思就是預設情況下,RedisCacheManager不會為快取區域建立字首,這樣會導致維護管理已知的那些key的那個zset會急劇增長(ps:這個zset的name就是上面說的setOfKnownKeys)。因此強烈建議開啟預設字首,以免這個zset意外增長以及使用多個緩衝區域帶來的潛在衝突。
關於這個cacheLockName,是cache名稱字尾為~lock的key,作為一把鎖存放在Redis伺服器上。而RedisCache其中clear方法用於清除當前cache塊中所有的元素,這裡會加鎖,而鎖的實現就是在伺服器上面放剛才key是cacheLockName的元素,最後清除鎖則是在clear方法執行完成後在finally中清除。 put與get方法執行時會檢視是否存在lock鎖,存在則會sleep 300毫秒。這個過程會一直繼續,直到redis伺服器上不存在鎖時才會進行相應的get與put操作,這裡存在一個問題,如果clear方法執行時間很長,這時當前執行clear操作的機子掛了,就導致lock元素一直存在於redis伺服器上。
之後就算這個機子重新啟動後,也無法正常使用cache。原因是:get與put方法在執行時,鎖lock始終存在於redis伺服器上,所以在使用時應當小心避免這種問題。下面可以追蹤下原始碼看下:
在RedisCache類中的靜態抽象內部類LockingRedisCacheCallback<T>中,我們可以看見在Cache的元資料中設定鎖。
@Override
public T doInRedis(RedisConnection connection) throws DataAccessException {
if (connection.exists(metadata.getCacheLockKey())) {
return null;
}
try {
connection.set(metadata.getCacheLockKey(), metadata.getCacheLockKey());
return doInLock(connection);
} finally {
connection.del(metadata.getCacheLockKey());
}
}
在RedisCache類中的靜態抽象內部類中,static abstract classAbstractRedisCacheCallback<T>
private long WAIT_FOR_LOCK_TIMEOUT = 300;
protected boolean waitForLock(RedisConnection connection) {
boolean retry;
boolean foundLock = false;
do {
retry = false;
if(connection.exists(cacheMetadata.getCacheLockKey())) {
foundLock = true;
try {
Thread.sleep(WAIT_FOR_LOCK_TIMEOUT);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
retry = true;
}
} while (retry);
return foundLock;
}
connection.exists(cacheMetadata.getCacheLockKey()就是判斷哪個鎖是否還在Redis中。下面我們可以簡單測試下:
@RequestMapping(value = "save/{key}.do", method = RequestMethod.POST)
@Cacheable(value = "cache",key = "#key",condition = "#key != ''")
public String save( @PathVariable String key) {
System.out.println("走資料庫");
System.out.println(cacheManager.getCacheNames());
return "succful";
}
當我們這樣設定的時候
redisCacheManager.setUsePrefix(false);
redisCacheManager.setDefaultExpiration(60*30);
這時候就會產生一個cache名+~keys的一個zset維護key的名字的一個集合
redisCacheManager.setUsePrefix(true);
redisCacheManager.setCachePrefix(new MyRedisCachePrefix());
這個MyRedisCachePrefix實現了MyRedisCachePrefix介面,預設是DefaultRedisCachePrefix()是:作為分隔符,這裡只是換成了#。同時這時候我們的這些快取都有了名稱空間cache加上我們自定義的#分隔符,防止了快取的衝突。
上面對於值的序列化都統一採用了Jackson2JsonRedisSerializer
template.setValueSerializer(jackson2JsonRedisSerializer);,對於key的序列化採用了
StringRedisSerializer。
對於序列的化採用是跟String互動的多就用StringRedisSerializer,儲存POLO類的Json資料時就用Jackson2JsonRedisSerializer。
對於data-redis封裝Cache來說,好處是非常明顯的,既可以很方便的快取物件,相比較於SpringCache、Ecache這些程序級別的快取來說,現在快取的記憶體的是使用redis的記憶體,不會消耗JVM的記憶體,提升了效能。當然這裡Redis不是必須的,換成其他的快取伺服器一樣可以,只要實現Spring的Cache類,並配置到XML裡面就行。和原生態的jedis相比,只要方法上加上註解,就可以實現快取,對於應用開發人員來說,使用快取變的簡單。同時也有利於對不同的業務快取進行分組統計、監控。
附錄:SpringBoot整合data-redis
Redis單節點:
@Bean
public RedisConnectionFactory redisConnectionFactory() {
JedisConnectionFactory cf = new JedisConnectionFactory();
cf.setHostName("10.188.182.140");
cf.setPort(6379);
cf.setPassword("root");
cf.afterPropertiesSet();
return cf;
}
或者在application.properites、application.yml檔案裡配置
Spring.redis.host: 172.26.223.153
Spring.redis.port: 6379
哨兵模式:
類註解:@RedisSentinelConfiguration或者@ PropertySource
配置檔案
· spring.redis.sentinel.master
:mymaster
· spring.redis.sentinel.nodes
: 127.0.0.1:6379
@Bean
public RedisConnectionFactory jedisConnectionFactory() {
RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration() .master("mymaster")
.sentinel("127.0.0.1", 26379) .sentinel("127.0.0.1", 26380);
return new JedisConnectionFactory(sentinelConfig);
}
叢集模式:
· spring.redis.cluster.nodes
:node1,node2…….
· spring.redis.cluster.max-redirects
: 叢集之間最大重定向次數
@Component
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class ClusterConfigurationProperties {
/*
* spring.redis.cluster.nodes[0] = 127.0.0.1:7379
* spring.redis.cluster.nodes[1] = 127.0.0.1:7380
* ...
*/
List<String> nodes;
/**
* Get initial collection of known cluster nodes in format {@code host:port}.
*
* @return
*/
public List<String> getNodes() {
return nodes;
}
public void setNodes(List<String> nodes) {
this.nodes = nodes;
}
}
@Configuration
public class AppConfig {
/**
* Type safe representation of application.properties
*/
@Autowired ClusterConfigurationProperties clusterProperties;
public @Bean RedisConnectionFactory connectionFactory() {
return new JedisConnectionFactory(
new RedisClusterConfiguration(clusterProperties.getNodes()));
}
}
開啟快取,注意預設@SpringBootApplication會掃描當前同級目錄及其子目錄的帶有@Configuration的類(僅僅支援1.2+版本的springboot,之前是有三個註解@Configuration、@ComponentScan、@EnableAtuoConfiguration),當啟動類上使用@ComponentScan註解的時候就只會掃描你自定義的基礎包。當有xml.檔案的時候,建議在@Configuration類上面
@ImportResource({"classpath:xxx.xml","classpath:yyy.xml"})匯入
又或者當你的你@Configuration配置類沒有預設在@SpringBootApplication掃描的路徑下,可以使用@Import({xxx.class,yyy.class})
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public CacheManager cacheManager(@SuppressWarnings("rawtypes") RedisTemplate redisTemplate) {
RedisCacheManager redisCacheManager = new RedisCacheManager(redisTemplate);
redisCacheManager.setDefaultExpiration(60*30);
redisCacheManager.setTransactionAware(true);
redisCacheManager.setUsePrefix(true);
redisCacheManager.setCachePrefix(new MyRedisCachePrefix());
return redisCacheManager;
}
@Bean
@SuppressWarnings({ "rawtypes", "unchecked" })
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
//使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
//使用StringRedisSerializer來序列化和反序列化redis的key值
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}