Synchronizing local caches across cluster machines with Redis publish/subscribe

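Background:

In a cluster, when the local cache on one machine changes, the change has to be propagated to the other machines in the cluster.

Redis subscription setup: a method annotated with @EventListener(ContextRefreshedEvent.class) initializes the Redis subscription automatically when the project starts.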

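import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RedisSubInitializer {
	// MSG_CHANNEL is a String constant defined elsewhere in the project (not shown here).
	@Autowired
	private RedisConnection redisConnection;
	@Autowired
	private RedisPubsubListener redisPubsubListener;

	// @Async only takes effect when async support is enabled (@EnableAsync).
	@Async
	@EventListener(ContextRefreshedEvent.class)
	public void onApplicationEvent(ContextRefreshedEvent event) {
		// Subscribe once the application context has been refreshed.
		try {
			log.info("Subscribing to Redis channel, pubSub={}, channel={}", redisPubsubListener, MSG_CHANNEL);
			redisConnection.subscribe(redisPubsubListener, MSG_CHANNEL);
		} catch (Exception e) {
			log.error("Subscribe failed, channel={}", MSG_CHANNEL, e);
		}
	}
}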

Publishing events

Trigger the publish from the business code:

msgPublisher.publish(MSG_CHANNEL, messageVo);
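The article does not show where in the business code this call sits. One plausible ordering, sketched below, is to persist the change first, refresh the local cache, and only then publish, so that any machine reloading from the database on receipt sees the new value. ConfigService, the "cache:sync" channel name, the key-only payload, and the BiConsumer standing in for msgPublisher are all assumptions made for illustration, not part of the original project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical business service; all names here are illustrative assumptions. */
class ConfigService {
    static final String MSG_CHANNEL = "cache:sync"; // assumed channel name

    private final Map<String, String> database;          // stand-in for the real data store
    private final Map<String, String> localCache = new ConcurrentHashMap<>();
    private final BiConsumer<String, String> publisher;  // stand-in for msgPublisher::publish

    ConfigService(Map<String, String> database, BiConsumer<String, String> publisher) {
        this.database = database;
        this.publisher = publisher;
    }

    /** Updates a value and tells the rest of the cluster which key changed. */
    void update(String key, String value) {
        database.put(key, value);            // 1. persist first, so receivers reload the new value
        localCache.put(key, value);          // 2. refresh this machine's own cache
        publisher.accept(MSG_CHANNEL, key);  // 3. publish the changed key
    }

    /** Reads through the local cache, falling back to the data store on a miss. */
    String get(String key) {
        return localCache.computeIfAbsent(key, database::get);
    }
}
```

Publishing only the key keeps the message small and leaves the database as the single source of truth; sending the new value in the message would save a lookup but risks applying stale values if messages arrive out of order.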

Event publisher class

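import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MsgPublisher {
	@Autowired
	private RedisConnection redisConnection;

	/**
	 * Publishes a message to a channel.
	 * @param channel the channel name
	 * @param message the payload; a String is sent as-is, anything else is serialized to JSON
	 */
	public void publish(String channel, Object message) {
		try {
			if (message instanceof String) {
				redisConnection.publish(channel, (String) message);
			} else {
				// JSON is presumably fastjson's JSON class; its import is not shown in the original.
				redisConnection.publish(channel, JSON.toJSONString(message));
			}
			log.info("message publish, channel={}, message={}", channel, message);
		} catch (Exception e) {
			log.error("message pub error, channel={}, message={}", channel, message, e);
		}
	}
}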

Event listener class, which implements the org.springframework.data.redis.connection.MessageListener interface

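import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RedisPubsubListener implements MessageListener {
	@Autowired
	private RedisConnection redisConnection;

	@Override
	public void onMessage(Message message, byte[] pattern) {
		String channel = redisConnection.deserializekey(message.getChannel());
		String body = redisConnection.deserializekey(message.getBody());
		log.info("channel={}, message={}", channel, body);
		try {
			if (MSG_CHANNEL.equals(channel)) {
				// Handle the event: synchronize the cache by reloading from the database.
				toHandle(body);
			}
		} catch (Exception e) {
			log.error("channel={}, message={}", channel, body, e);
		}
	}

	// Placeholder: the original does not show toHandle; the cache reload logic goes here.
	private void toHandle(String body) {
	}
}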

RedisConnection class: extends org.redisson.spring.data.connection.RedissonConnection

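import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.redisson.spring.data.connection.RedissonConnection;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.RedisSerializer;

@Slf4j
public class RedisConnection extends RedissonConnection {
	// The constructor and getRedisKey(...) are not shown in the original listing.
	private RedisSerializer keySerializer;
	private RedisSerializer valueSerializer;
	private RedissonClient client;

	/** Decodes raw bytes received from Redis as a UTF-8 string. */
	public String deserializekey(byte[] bytes) {
		if (bytes == null) {
			return null;
		}
		return new String(bytes, StandardCharsets.UTF_8);
	}

	/** Publishes a message; returns 0 if the call fails. */
	public Long publish(String channel, String message) {
		try {
			// Note: the message body goes through the same key serialization as the channel name.
			return publish(getKey(channel), getKey(message));
		} catch (Exception e) {
			log.error("Redis publish ops error, channel={}, message={}", channel, message, e);
		}
		return 0L;
	}

	/** Subscribes the listener to the given channels. */
	public void subscribe(MessageListener listener, String... channels) {
		try {
			subscribe(listener, getKeys(channels));
		} catch (Exception e) {
			log.error("Redis subscribe ops error, listener={}, channels={}", listener, channels, e);
		}
	}

	private byte[] getKey(String key) {
		return keySerializer.serialize(getRedisKey(key));
	}

	private byte[][] getKeys(String... keys) {
		byte[][] bs = new byte[keys.length][];
		for (int i = 0; i < keys.length; i++) {
			bs[i] = getKey(keys[i]);
		}
		return bs;
	}
}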
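The listener only reacts when the decoded channel equals MSG_CHANNEL, so the bytes produced on the publishing side must decode back to exactly the same string on the receiving side. The sketch below shows that round trip with plain UTF-8. PubSubCodec and its method names are hypothetical; the real class serializes through keySerializer and getRedisKey, whose behavior the article does not show, so if either of them adds a prefix or uses another encoding the listener would see a different string.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; the class and method names are not from the original article. */
final class PubSubCodec {
    private PubSubCodec() {
    }

    /** Encodes a channel name or message body as UTF-8 bytes before publishing. */
    static byte[] encode(String text) {
        return text == null ? null : text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes bytes received by a listener; mirrors what deserializekey does. */
    static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

A quick check such as `PubSubCodec.decode(PubSubCodec.encode(channel)).equals(channel)` in a unit test can catch a mismatch between the two sides before it shows up as silently ignored messages.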

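Summary

Local cache synchronization can be implemented with the Redis publish/subscribe mechanism. Note that the publishing machine also consumes the messages it publishes itself, so the message handling must be idempotent.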
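One way to get that idempotency, sketched here under stated assumptions, is to have the handler reload the affected key from the database rather than apply a delta, so that processing the same message twice leaves the cache in the same state. The sketch also tags each message with the sender's node ID so a machine can skip its own messages, which assumes the publisher has already refreshed its own cache. CacheSyncHandler, the "nodeId:key" payload format, and the loader function are hypothetical and do not come from the original code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical handler; the "nodeId:key" payload format is an assumption for illustration. */
class CacheSyncHandler {
    private final String nodeId;                       // must not contain ':'
    private final Function<String, String> loader;     // stand-in for a database lookup
    private final Map<String, String> localCache = new ConcurrentHashMap<>();

    CacheSyncHandler(String nodeId, Function<String, String> loader) {
        this.nodeId = nodeId;
        this.loader = loader;
    }

    /** Builds the message body this node would publish for a changed key. */
    String payloadFor(String key) {
        return nodeId + ":" + key;
    }

    /** Handles a received body; returns true if the local cache was reloaded. */
    boolean handle(String body) {
        int sep = body.indexOf(':');
        if (sep < 0) {
            return false;                              // malformed message, ignore it
        }
        String origin = body.substring(0, sep);
        String key = body.substring(sep + 1);
        if (nodeId.equals(origin)) {
            return false;                              // our own message, cache already up to date
        }
        String fresh = loader.apply(key);              // reload instead of applying a delta
        if (fresh == null) {
            localCache.remove(key);                    // the entry was deleted at the source
        } else {
            localCache.put(key, fresh);
        }
        return true;
    }

    String cached(String key) {
        return localCache.get(key);
    }
}
```

Because the handler always converges on whatever the database currently holds, duplicate or redelivered messages are harmless; the self-message check is only an optimization that avoids a redundant database read.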