設計模式【訊息事件篇】
阿新 • • 發佈:2022-04-07
1.基於cmq訊息事件處理模式,進行訊息處理
①抽象訊息介面
/**
 * Contract for a component that processes benefit task messages.
 *
 * <p>{@code BenefitMsgHandlerManager} sorts registered handlers by
 * {@link #getOrder()} and dispatches a message to the first handler whose
 * {@link #checkMsgType(BenefitTaskMsg)} returns {@code true}.
 */
public interface BenefitMsgHandler {

    /**
     * Returns the name of this handler.
     *
     * @return the handler name
     */
    String getName();

    /**
     * Returns the execution order of this handler.
     *
     * @return the order value used as the sort key
     */
    Integer getOrder();

    /**
     * Processes a message.
     *
     * @param benefitTaskMsg the message data
     */
    void handleMsg(BenefitTaskMsg benefitTaskMsg);

    /**
     * Checks whether the given message is one this handler is meant to process.
     *
     * @param benefitTaskMsg the message data
     * @return {@code true} if this handler accepts the message
     */
    boolean checkMsgType(BenefitTaskMsg benefitTaskMsg);
}
②抽象訊息處理器基類
public abstract class AbstractBenefitMsgHandler implements BenefitMsgHandler { @Autowired protected BenefitMsgHandlerManager benefitMsgHandlerManager;protected String name; protected Integer order = 100; @PostConstruct protected void init() { this.benefitMsgHandlerManager.registerHandler(this); this.name = this.getClass().getName(); } @Override public String getName() { return this.name; } @Overridepublic Integer getOrder() { return this.order; } /** * 處理訊息 * * @param benefitTaskMsg 訊息 */ @Override public abstract void handleMsg(BenefitTaskMsg benefitTaskMsg); /** * 校驗訊息型別 * * @param benefitTaskMsg 訊息實體 * @return */ @Override public abstract boolean checkMsgType(BenefitTaskMsg benefitTaskMsg); }
③訊息處理器管理器(負責註冊與分發)
@Slf4j @Component public class BenefitMsgHandlerManager { /** * 第三方對接任務處理器 */ private CopyOnWriteArrayList<BenefitMsgHandler> benefitMsgHandlers = new CopyOnWriteArrayList<>(); /** * 註冊第三方對接任務處理器 * * @param benefitMsgHandler */ public void registerHandler(BenefitMsgHandler benefitMsgHandler) { //1.註冊 this.benefitMsgHandlers.add(benefitMsgHandler); //2.排序 if (!CollectionUtils.isEmpty(benefitMsgHandlers)) { benefitMsgHandlers.sort(Comparator.comparing(BenefitMsgHandler::getOrder)); } } /** * 第三方對接任務處理 * * @param benefitTaskMsg */ public void handleMsg(BenefitTaskMsg benefitTaskMsg) { //訊息處理器為空直接返回 if (CollectionUtils.isEmpty(benefitMsgHandlers)) { log.info("handleMsg failed , benefit msg handler is empty! "); return; } //處理訊息 for (BenefitMsgHandler benefitMsgHandler : benefitMsgHandlers) { try { if (benefitMsgHandler.checkMsgType(benefitTaskMsg)) { log.info("handle benefit msg, param={}, handler={}", JSONObject.toJSONString(benefitTaskMsg), benefitMsgHandler.getName()); benefitMsgHandler.handleMsg(benefitTaskMsg); return; } } catch (Exception e) { log.error("handle benefit msg, param={}, task={}", JSONObject.toJSONString(benefitTaskMsg), benefitMsgHandler.getName(), e); } } } }
④ 特定訊息處理器--獎勵發放
@Slf4j @Component public class BenefitMsgHandler4CompleteReward extends AbstractBenefitMsgHandler { @Autowired private RewardHandlerManager rewardHandlerManager; @Override public void handleMsg(BenefitTaskMsg benefitTaskMsg) { log.info("BenefitMsgHandler4CompleteReward begin: benefitTaskMsg={}", benefitTaskMsg); BenefitTaskRecord benefitTaskRecord = JSON.parseObject(benefitTaskMsg.getData(), BenefitTaskRecord.class); rewardHandlerManager.completeReward(benefitTaskRecord, false); } @Override public boolean checkMsgType(BenefitTaskMsg benefitTaskMsg) { //獎勵發放 if (StringUtils.equals(benefitTaskMsg.getMsgType(), BenefitMsgTypeEnum.COMPLETE_REWARD.getMsgType())) { return true; } return false; } }
⑤監聽訊息處理
@Slf4j @Component public class CmqListenerService { @Autowired private BenefitMsgHandlerManager benefitMsgHandlerManager; /** * 任務自產訊息處理 * * @param msg */ @CmqListener(queue = "${cmq.topic.queue.benefit.task}") public void benefitMsgHandlerManager(String msg) { log.info("benefitTaskEventHandle:msg={}", msg); try { BenefitTaskMsg benefitTaskMsg = JSON.parseObject(msg, BenefitTaskMsg.class); if (benefitTaskMsg == null) { log.error("benefitTaskEventHandle:msg is invalid"); } benefitMsgHandlerManager.handleMsg(benefitTaskMsg); } catch (Exception e) { log.error("benefitTaskEventHandle:exception:msg={}", msg, e); } } }