Synchronizing machine-local caches across a cluster: Redis publish/subscribe
阿新 • Published: 2020-11-20
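Background:
In a cluster, when the local cache on one machine changes, the change has to be synchronized to the other machines in the cluster.
Redis subscription setup: a method annotated with @EventListener(ContextRefreshedEvent.class) initializes the Redis subscription automatically when the project starts.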
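import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

// MSG_CHANNEL is the channel-name constant used throughout this post; its definition is not shown.
// LOGGER assumes lombok.log.fieldName = LOGGER in lombok.config (Lombok's default field name is "log").
@Slf4j
@Component
public class RedisSubInitializer {
    @Autowired
    private RedisConnection redisConnection;
    @Autowired
    private RedisPubsubListener redisPubsubListener;

    // Runs once the application context has been refreshed.
    // @Async only takes effect when async support is enabled (for example with @EnableAsync).
    @Async
    @EventListener(ContextRefreshedEvent.class)
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Subscribe
        try {
            LOGGER.info("Redis subscribing to channel, pubSub={}, channel={}", redisPubsubListener, MSG_CHANNEL);
            redisConnection.subscribe(redisPubsubListener, MSG_CHANNEL);
        } catch (Exception e) {
            LOGGER.error("Subscription failed, channel={}", MSG_CHANNEL, e);
        }
    }
}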
Publishing the event
The business code triggers the publish:
msgPublisher.publish(MSG_CHANNEL, messageVo);
Event publisher class
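import com.alibaba.fastjson.JSON; // JSON.toJSONString is assumed to come from fastjson
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MsgPublisher {
    @Autowired
    private RedisConnection redisConnection;

    /**
     * Publishes a message.
     * @param channel channel to publish to
     * @param message payload; a String is sent as-is, anything else is serialized to JSON
     */
    public void publish(String channel, Object message) {
        try {
            if (message instanceof String) {
                redisConnection.publish(channel, (String) message);
            } else {
                redisConnection.publish(channel, JSON.toJSONString(message));
            }
            LOGGER.info("message publish, channel={}, message={}", channel, message);
        } catch (Exception e) {
            LOGGER.error("message pub error, channel={}, message={}", channel, message, e);
        }
    }
}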
Listener class, implementing the org.springframework.data.redis.connection.MessageListener interface
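import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RedisPubsubListener implements MessageListener {
    @Autowired
    private RedisConnection redisConnection;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = redisConnection.deserializekey(message.getChannel());
        String body = redisConnection.deserializekey(message.getBody());
        // Log the decoded body rather than the raw Message object
        LOGGER.info("channel={}, message={}", channel, body);
        try {
            if (MSG_CHANNEL.equals(channel)) {
                // Handle the event: synchronize the cache by reloading from the database
                toHandle(body);
            }
        } catch (Exception e) {
            LOGGER.error("channel={}, message={}", channel, body, e);
        }
    }

    // toHandle is not shown in the original post; this placeholder marks where the reload logic goes.
    private void toHandle(String body) {
        // reload the affected cache entries from the database
    }
}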
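The post does not show what toHandle does beyond "reload from the database". The following is a minimal sketch of one way it could look, using only the JDK. The class name LocalCacheReloader, the databaseLoader function, and the assumption that the message body is simply the key of the changed entry are all hypothetical and not taken from the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical local cache whose entries are refreshed when a sync message arrives. */
class LocalCacheReloader {
    private final Map<String, String> localCache = new ConcurrentHashMap<>();
    // Stands in for the real database query (assumption: value lookup by key).
    private final Function<String, String> databaseLoader;

    LocalCacheReloader(Function<String, String> databaseLoader) {
        this.databaseLoader = databaseLoader;
    }

    /** Assumes the message body is the key of the entry that changed. */
    void toHandle(String body) {
        if (body == null || body.isEmpty()) {
            return; // nothing to refresh
        }
        String fresh = databaseLoader.apply(body);
        if (fresh == null) {
            localCache.remove(body);     // entry no longer exists: drop the stale copy
        } else {
            localCache.put(body, fresh); // overwrite with the current value
        }
    }

    String get(String key) {
        return localCache.get(key);
    }
}
```

Because the handler always re-reads the current value instead of applying a delta, handling the same message twice leaves the cache in the same state.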
RedisConnection class: extends org.redisson.spring.data.connection.RedissonConnection
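import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.redisson.spring.data.connection.RedissonConnection;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.RedisSerializer;

// The constructor, the field initialization, and getRedisKey(...) are not shown in the original post.
@Slf4j
public class RedisConnection extends RedissonConnection {
    private RedisSerializer keySerializer;
    private RedisSerializer valueSerializer;
    private RedissonClient client;

    // Decodes raw bytes (channel name or message body) as UTF-8 text.
    public String deserializekey(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Publishes a text message; returns 0 if the publish fails.
    public Long publish(String channel, String message) {
        try {
            return publish(getKey(channel), getKey(message));
        } catch (Exception e) {
            LOGGER.error("Redis publish ops error, channel={}, message={}", channel, message, e);
        }
        return 0L;
    }

    // Subscribes the listener to the given channels; failures are logged, not rethrown.
    public void subscribe(MessageListener listener, String... channels) {
        try {
            subscribe(listener, getKeys(channels));
        } catch (Exception e) {
            LOGGER.error("Redis subscribe ops error, listener={}, channels={}", listener, channels, e);
        }
    }

    private byte[] getKey(String key) {
        return keySerializer.serialize(getRedisKey(key));
    }

    private byte[][] getKeys(String... keys) {
        byte[][] bs = new byte[keys.length][];
        for (int i = 0; i < keys.length; i++) {
            bs[i] = getKey(keys[i]);
        }
        return bs;
    }
}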
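Summary
Local cache synchronization can be implemented with the Redis publish/subscribe mechanism. Note, however, that the publishing machine also consumes the messages it publishes itself, so the handling must be idempotent.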
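One possible way to cope with self-delivered and repeated messages is sketched below with an in-memory stand-in for the Redis channel, so it runs without Redis or Spring. Everything here is an assumption for illustration: the classes InMemoryChannel and CacheNode, the node IDs, and the "nodeId|messageId|cacheKey" wire format are hypothetical and do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** In-memory stand-in for a Redis channel: every subscriber, including the sender, receives each message. */
class InMemoryChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

/** Hypothetical cluster node that tags outgoing messages and filters incoming ones. */
class CacheNode {
    private final String nodeId;
    private final InMemoryChannel channel;
    private final Set<String> seenMessageIds = ConcurrentHashMap.newKeySet();
    int reloadCount = 0; // counts how often the cache would have been reloaded

    CacheNode(String nodeId, InMemoryChannel channel) {
        this.nodeId = nodeId;
        this.channel = channel;
    }

    void start() {
        channel.subscribe(this::onMessage);
    }

    /** Assumed wire format: "nodeId|messageId|cacheKey". */
    void publishChange(String cacheKey) {
        channel.publish(nodeId + "|" + UUID.randomUUID() + "|" + cacheKey);
    }

    void onMessage(String message) {
        String[] parts = message.split("\\|", 3);
        if (parts.length != 3) {
            return; // malformed message
        }
        if (nodeId.equals(parts[0])) {
            return; // published by this node: the local cache is already up to date
        }
        if (!seenMessageIds.add(parts[1])) {
            return; // message ID already processed: ignore the duplicate
        }
        reloadCount++; // a real handler would reload parts[2] from the database here
    }
}

class CacheSyncDemo {
    public static void main(String[] args) {
        InMemoryChannel channel = new InMemoryChannel();
        CacheNode a = new CacheNode("node-a", channel);
        CacheNode b = new CacheNode("node-b", channel);
        a.start();
        b.start();
        a.publishChange("user:1");
        System.out.println("a=" + a.reloadCount + ", b=" + b.reloadCount); // prints "a=0, b=1"
    }
}
```

In this sketch the set of seen message IDs grows without bound, so a real deployment would need to expire old IDs. If the handler simply reloads the current value from the database, repeated handling is already harmless and the duplicate check only saves work.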