1. 程式人生 > 其它 > RocketMQ<二>SpringBoot整合RocketMQ

RocketMQ<二>SpringBoot整合RocketMQ

前提:對於SpringBoot來說,整合任何框架無非就是三個步驟:1、新增pom依賴;2、修改配置檔案;3、啟動類添加註解和配置。

一、pom依賴、配置檔案

pom依賴:

 <!-- SpringBoot整合RocketMQ  https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

配置檔案:

server:
  port: 1004
spring:
  application:
    name: lcn-user

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7900/eureka/
rocketmq:
  name-server: 192.168.10.17:9876
  producer:
    group: ${spring.application.name}
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
mq:
  user:
    topic: tpk02
    group:
      name: lcn-pay
    tag: tag02
    key: key02

二、訊息傳送、消費訊息

1、傳送訊息(同步、非同步、單向、順序、事務):

/**
 * @author D-L
 * @version 1.0.0
 * @ClassName UserService.java
 * @Description 訊息傳送
 * @createTime 2021-06-22 13:48:00
 */
@Service
@Slf4j
public class UserService {

    @Value("${mq.user.topic}")
    private String topic;
    @Value("${mq.user.tag}")
    
private String tag; @Value("${mq.user.key}") private String key; @Value("${mq.user.group.name}") private String group; @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 傳送同步訊息 * @return */ public String sendMsg01(){ threadPoolTaskExecutor.submit(new Runnable() { @Override public void run() { Message message = MessageBuilder.withPayload("傳送同步訊息").build(); String destination = String.format("%s:%s",topic ,tag); SendResult sendResult = rocketMQTemplate.syncSend(destination, message); if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){ log.info("訊息傳送成功【SendStatus】 :{}" ,sendResult); }else { log.info("訊息傳送失敗【SendStatus】 :{}" ,sendResult); } } }); return "ok"; } /** * 傳送非同步訊息 * @return */ public String sendMsg02(){ threadPoolTaskExecutor.submit(new Runnable() { @Override public void run() { Message message = MessageBuilder.withPayload("傳送非同步訊息").build(); String destination = String.format("%s:%s",topic ,tag); rocketMQTemplate.asyncSend(destination, message, new SendCallback() { /** * 傳送成功回撥函式 * @param sendResult */ @Override public void onSuccess(SendResult sendResult) { log.info("訊息傳送成功 【傳送結果】 :{}" ,sendResult); } /** * 傳送失敗回撥函式 * @param e */ @Override public void onException(Throwable e) { log.info("異常:{} ,訊息:{}" ,e ,message); //todo 可以對異常訊息進行其他操作,重新發送或者存入db } }); } }); return "ok"; } /** * 傳送單項訊息 * @return */ public String sendOneWayMsg(){ log.info("傳送單項訊息------------------------------------"); Message<String> message = MessageBuilder.withPayload("傳送單項訊息").build(); String destination = String.format("%s:%s", topic, tag); rocketMQTemplate.sendOneWay(destination , message); return "ok"; } /** * 傳送順序訊息 * @return */ public String sendOrderlyMsg(){ log.info("傳送順序訊息------------------------------------"); for (int i = 0; i < 100; i++) { int defaultTopicQueueNums = rocketMQTemplate.getProducer().getDefaultTopicQueueNums(); Message<String> message = MessageBuilder.withPayload("傳送順序訊息" + i + " 
【佇列數取模】:" + String.valueOf(i%defaultTopicQueueNums)).build(); String destination = String.format("%s:%s", topic, tag); //根據i對topic中queue的數量取模後的值 放入對應的佇列中 只能保證對應 SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, String.valueOf(i%defaultTopicQueueNums)); if(sendResult.getSendStatus().equals(SendStatus.SEND_OK)){ log.info("傳送順序訊息成功 【訊息內容】 :{}" ,sendResult); }else { log.info("傳送順序訊息失敗--------------------"); } } return "ok"; } /** * 傳送事務訊息 * @param msgStr * @return */ public String sendTransactionMessage(String msgStr){ log.info("【傳送訊息】-------------"); Future<TransactionSendResult> submit = threadPoolTaskExecutor.submit(new Callable<TransactionSendResult>() { @Override public TransactionSendResult call() { Message message = MessageBuilder.withPayload(msgStr).build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(group, topic, message, tag); log.info("【傳送狀態】:{}", result.getLocalTransactionState()); return result; } }); LocalTransactionState localTransactionState = null; try { localTransactionState = submit.get().getLocalTransactionState(); } catch (Exception e) { log.error("獲取訊息傳送狀態失敗------------------"); } return localTransactionState.toString(); } /** * * @return */ public String updateUserInfo(){ log.info("操作使用者資訊------------------------------------"); try { // int count = 1/0; }catch (Exception e) { log.info("操作使用者資訊失敗---------------"); return "fail"; } return "ok"; } }

2、事務訊息(本地事務執行、訊息回查):

/**
 * Transaction listener for the {@code lcn-user01} producer group: runs the local transaction and
 * answers broker check-backs from an in-memory state map. Created 2021-06-22 14:51:00.
 *
 * <p>NOTE(review): states are keyed by {@code message.hashCode()} and are never removed. Whether
 * the message passed to the check-back hashes to the same value as the one passed to
 * {@code executeLocalTransaction} is not visible here — confirm, or key by a stable transaction id.
 *
 * @author D-L
 * @version 1.0.0
 */
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "lcn-user01")
public class SyncProducerListener implements RocketMQLocalTransactionListener {

    /** Local transaction state per message hash code. */
    private final ConcurrentHashMap<Integer, RocketMQLocalTransactionState> map = new ConcurrentHashMap<>();

    @Autowired
    private UserService userService;

    /**
     * Executes the local transaction and records its outcome.
     *
     * @param message the transactional message
     * @param o       the extra argument supplied by the sender (unused here)
     * @return {@code COMMIT} when {@code updateUserInfo()} reports "ok" (case-insensitive),
     *     otherwise {@code ROLLBACK}
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        final int txKey = message.hashCode();
        // Mark as UNKNOWN first so a check-back arriving mid-execution gets a defined answer.
        map.put(txKey, RocketMQLocalTransactionState.UNKNOWN);

        String result = userService.updateUserInfo();

        final RocketMQLocalTransactionState state;
        // Constant-first comparison: a null result is treated as failure instead of throwing.
        if ("OK".equalsIgnoreCase(result)) {
            state = RocketMQLocalTransactionState.COMMIT;
        } else {
            log.warn("本地事務出錯,回滾事務訊息--------");
            state = RocketMQLocalTransactionState.ROLLBACK;
        }
        map.put(txKey, state);
        log.info("{}", state);
        return state;
    }

    /**
     * Answers a broker check-back with the recorded state.
     *
     * @param message the message being checked
     * @return the recorded state, or {@code UNKNOWN} when nothing is recorded for this message
     *     (never {@code null})
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("沒有獲得訊息ack  -----  進行訊息回查   訊息的Tag:{}", message);
        return map.getOrDefault(message.hashCode(), RocketMQLocalTransactionState.UNKNOWN);
    }
}

3、訊息消費:

/**
 * Message consumer: subscribes to the {@code mq.user.topic} topic with all tags ({@code "*"}),
 * in clustering mode with orderly consumption, and logs each message body.
 * Created 2021-06-22 22:51:00.
 *
 * @author D-L
 * @version 1.0.0
 */
@Component
@RocketMQMessageListener(topic = "${mq.user.topic}", consumerGroup = "${mq.user.group.name}",
        selectorExpression = "*", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.ORDERLY)
public class PaymentListener implements RocketMQListener<MessageExt> {
    private static final Logger log = LoggerFactory.getLogger(PaymentListener.class);

    /**
     * Decodes the message body as UTF-8 and logs it.
     *
     * @param messageExt the received message
     */
    @Override
    public void onMessage(MessageExt messageExt) {
        log.info("開始接收到訊息---------------------------------------");
        // Charset constant instead of the "UTF-8" name: no checked exception, so the old
        // unreachable UnsupportedEncodingException handler is gone. Fully qualified to avoid
        // depending on an import that may not exist in this file.
        String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        // To map the payload to a domain object: User user = JSON.parseObject(body, User.class);
        log.info("訊息內容-------------:{}", body);
        log.info("訊息消費完成-----------------------------------------");
    }
}