redis佇列先進先出需要注意什麼_「每日一問」用程式碼分析,Redis為什麼可以做訊息佇列...
技術標籤:redis佇列先進先出需要注意什麼
MQ
現在的應用基本上都是採用分散式 系統架構進行設計,而很多分散式系統必備的一個基礎元件就是訊息佇列。
如果大家不想再引入一個其他的訊息元件例如:kafka、RabbitMQ、RocketMQ。恰好你的專案中使用了Redis,可以嘗試利用Redis編寫一個輕量級的訊息元件。
為什麼Redis可以作為訊息佇列
在回答這個問題前,首先我們考慮下,作為一個訊息佇列,應該有哪些特徵?我個人認為應該滿足以下三點要求:
訊息的有序性
問題描述:雖然消費者是非同步處理訊息,但是大部分的情況下消費者仍然需要按照生產者傳送訊息的順序來處理訊息,避免後傳送的訊息被先處理了。對於要求訊息保序的場景來說,一旦出現這種訊息被亂序處理的情況,就可能會導致業務邏輯被錯誤執行,從而給業務方造成損失。
解決方案:redis提供的資料結構list和sorted set都是有序儲存的結構,都可以保證存入資料的有序性。但是個人更傾向於使用list,一是因為list的有序性不需要人為的干預,sorted set需要設定score來維護順序;二是因為list支援阻塞是獲取brpop,避免了輪詢佇列造成的CPU消耗。
重複消費的處理
問題描述:消費者從訊息佇列讀取訊息時,有時會因為網路堵塞而出現訊息重傳的情況。此時,消費者可能會收到多條重複的訊息。對於重複的訊息,消費者如果多次處理的話,就可能造成一個業務邏輯被多次執行,如果業務邏輯正好是要修改資料,那就會出現資料被多次修改的問題了。
解決方案:針對這一問題,生產者在生產訊息時,生成一個全域性唯一的訊息id,消費者在消費時,手動記錄已經消費的訊息id。避免訊息的重複消費。
訊息的可靠性
問題描述:消費者在處理訊息的時候,還可能出現因為故障或宕機導致訊息沒有處理完成的情況。此時,訊息佇列需要能提供訊息可靠性的保證,也就是說,當消費者重啟後,可以重新讀取訊息再次進行處理,否則,就會出現訊息漏處理的問題了。
解決方案:redis支援資料的持久化,因此可以保證資料的可靠性,具體可以參考我之前寫的文章「每日一問」Redis宕機後,如何恢復資料?
簡易程式碼實現
1.建立三個註解分別表示訊息監聽者、訊息消費者、訊息處理者,具體程式碼如下:
訊息消費者容器/*** 訊息消費者容器,被該註解標註的類表示其內部的被@see MessageListener 標註的public方法* 為一個訊息監聽器*/@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@ [email protected] @interface MessageConsumer { @AliasFor(annotation = Component.class) String value() default "";}
訊息處理器
/*** 訊息處理器標誌,被該註解標註的類,會處理間聽到的訊息,根據訊息的型別選擇處理器*/@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@[email protected] @interface MessageHandler { MessageListener.Mode value() default MessageListener.Mode.TOPIC;}
訊息監聽器
/*** 訊息監聽器,只可用在*/@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface MessageListener { String value() default ""; String topic() default ""; // 監聽的訊息主題 String channel() default ""; //監聽的訊息通道,適用於釋出訂閱模式 Mode mode() default Mode.TOPIC; // 監聽器模式廣播模式、主題模式 enum Mode { TOPIC(), PUBSUB() }}
2.建立訊息處理器
抽象訊息處理器@Slf4jpublic abstract class AbstractMessageHandler implements ApplicationContextAware { protected ApplicationContext applicationContext; protected RedisTemplate redisTemplate; protected void invokeMethod(Method method, Message message, Object bean) { try { method.invoke(bean, message); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } protected Message getMessage(byte[] bytes) { String s = new String(bytes, CharsetUtil.CHARSET_UTF_8); return JSONUtil.toBean(s, Message.class); } public AbstractMessageHandler(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } protected RedisConnection getConnection() { RedisConnection connection = redisTemplate.getRequiredConnectionFactory().getConnection(); return connection; } public abstract void invokeMessage(Method method); @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } protected void consumer(Method method, Set consumers, Object bean, byte[] message) { Message msg = getMessage(message); if (consumers.add(msg.getId())) { invokeMethod(method, msg, bean); } else { log.error("has consumed message {}", msg); } }}
topic模式訊息處理器
@MessageHandler(value = MessageListener.Mode.TOPIC)public class TopicMessageHandler extends AbstractMessageHandler { public TopicMessageHandler(RedisTemplate redisTemplate) { super(redisTemplate); } @Override public void invokeMessage(Method method) { Set consumers = new HashSet<>(); MessageListener annotation = method.getAnnotation(MessageListener.class); String topic = getTopic(annotation); RedisConnection connection = redisTemplate.getRequiredConnectionFactory().getConnection(); Class declaringClass = method.getDeclaringClass(); Object bean = applicationContext.getBean(declaringClass); while (true) { List bytes = connection.bRPop(1, topic.getBytes()); if (CollectionUtil.isNotEmpty(bytes)) { if (null != bytes.get(1)) { consumer(method, consumers, bean, bytes.get(1)); } } } } private String getTopic(MessageListener annotation) { String value = annotation.value(); String topic = annotation.topic(); return StrUtil.isBlank(topic) ? value : topic; }}
pubsub模式訊息處理器
@MessageHandler(value = MessageListener.Mode.PUBSUB)public class PubsubMessageHandler extends AbstractMessageHandler { public PubsubMessageHandler(RedisTemplate redisTemplate) { super(redisTemplate); } @Override public void invokeMessage(Method method) { Set consumers = new HashSet<>(); MessageListener listener = method.getAnnotation(MessageListener.class); String channel = getChannel(listener); RedisConnection connection = getConnection(); connection.subscribe((message, pattern) -> { Class> declaringClass = method.getDeclaringClass(); Object bean = applicationContext.getBean(declaringClass); byte[] body = message.getBody(); consumer(method, consumers, bean, body); }, channel.getBytes()); } private String getChannel(MessageListener annotation) { String value = annotation.value(); String channel = annotation.channel(); return StrUtil.isBlank(channel) ? value : channel; }}
3.建立消費訊息啟動器
@[email protected] class MessageConsumerStater implements ApplicationRunner, ApplicationContextAware { private ApplicationContext applicationContext; @Override public void run(ApplicationArguments args) { Map invokers = getInvokers(); applicationContext.getBeansWithAnnotation(MessageConsumer.class).values().parallelStream().forEach(consumer -> { Method[] methods = consumer.getClass().getMethods(); if (ArrayUtil.isNotEmpty(methods)) { Arrays.stream(methods).parallel().forEach(method -> invokeMessage(method, invokers)); } }); } // 啟動訊息監聽器 private void startMessageListener(Method method, Map invokers) { MessageListener listener = method.getAnnotation(MessageListener.class); if (null == listener) { return; } MessageListener.Mode mode = listener.mode(); AbstractMessageHandler invoker = invokers.get(mode); if (invoker == null) { log.error("invoker is null"); return; } invoker.invokeMessage(method); } private Map getInvokers() { Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(MessageHandler.class); Map collect = beansWithAnnotation.values().stream().collect(Collectors .toMap(k -> k.getClass().getAnnotation(MessageHandler.class).value(), k -> (AbstractMessageHandler)k)); return collect; } public MessageConsumerStater() { } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }}
4.消費訊息器
@MessageConsumerpublic class MqConsumer { @MessageListener(topic = "topic1", mode = MessageListener.Mode.TOPIC) public void testTopic1(Message message) { System.out.println ("topic===> " + message); } @MessageListener(topic = "topic2", mode = MessageListener.Mode.TOPIC) public void testTopic2(Message message) { System.out.println ("topic===> " + message); } @MessageListener(topic = "topic3", mode = MessageListener.Mode.TOPIC) public void testTopic3(Message message) { System.out.println ("topic===> " + message); } @MessageListener(channel = "pubsub", mode = MessageListener.Mode.PUBSUB) public void testPubsub1(Message message) { System.out.println ("pubsub===> " + message); } @MessageListener(channel = "pubsub", mode = MessageListener.Mode.PUBSUB) public void testPubsub2(Message message) { System.out.println ("pubsub===> " + message); }}
5.輔助實體類
@[email protected]@[email protected] class Message implements Serializable { private String id; private T content;}
為了方便大家學習交流,原始碼我這邊上傳到了碼雲倉庫,歡迎大家學習交流。https://gitee.com/smn322/redismq/