Using RocketMQ: One Producer, Two Consumers

STEP1: Start RocketMQ with Docker

# Set this to the IP address of the host machine
ip=192.168.0.99

# Broker configuration file
echo "brokerIP1=$ip
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH" > broker.conf

docker rm -f rmqserver rmqbroker rmqconsole

docker run -d --network=host --name rmqserver foxiswho/rocketmq:server-4.5.1

docker run -d --network=host\
 --name rmqbroker --add-host namesrv:$ip\
 -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt"\
 -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\
 -v `pwd`/broker.conf:/etc/rocketmq/broker.conf\
 foxiswho/rocketmq:broker-4.5.1

docker run -d --name rmqconsole -p 8180:8080\
 -e "JAVA_OPTS=-Drocketmq.namesrv.addr=$ip:9876\
 -Dcom.rocketmq.sendMessageWithVIPChannel=false"\
 -t styletang/rocketmq-console-ng
 

Once the containers are running, the RocketMQ console is available at localhost:8180.
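
Before moving on, it can help to confirm that the three containers are actually listening. The following is a minimal sketch using only the JDK. The class name `PortCheck` is hypothetical, and port 10911 is assumed to be the broker's default listen port (the broker.conf above does not override it); 9876 and 8180 come from the commands above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ: checks that the ports used in STEP1 accept TCP connections.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // connection refused, timed out, or host not reachable
        }
    }

    // Formats one line of the report.
    static String report(String name, int port, boolean open) {
        return name + " (port " + port + "): " + (open ? "reachable" : "NOT reachable");
    }

    public static void main(String[] args) {
        // Pass the same IP that was set in the script; defaults to the value used in this article.
        String host = args.length > 0 ? args[0] : "192.168.0.99";
        String[] names = {"name server", "broker", "console"};
        int[] ports = {9876, 10911, 8180}; // 10911 is an assumed default, see the note above
        for (int i = 0; i < ports.length; i++) {
            System.out.println(report(names[i], ports[i], isOpen(host, ports[i], 1000)));
        }
    }
}
```

A port reported as not reachable usually means the corresponding container exited; `docker logs` on that container is the next place to look.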

STEP2: Producer

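Generate and download a Spring Boot project from https://start.spring.io/ (include the Spring Web dependency, which the controller in this step requires).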

Add the RocketMQ dependency to pom.xml:

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-spring-boot-starter</artifactId>
   <version>2.2.0</version>
</dependency>

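Add the following configuration to application.properties. Note that rocketmq.producer.topic is not a property defined by the starter; it is a custom key that the code in this article reads through placeholders.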

rocketmq.nameServer=192.168.0.99:9876
rocketmq.producer.group=newProducer
rocketmq.producer.topic=newTopic

Write a controller that sends messages:

package com.example.demo.controller;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Value("${rocketmq.producer.topic}")
    String topic;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/")
    public String hello(String msg) {
        rocketMQTemplate.convertAndSend(topic, msg);
        return "Message sent: " + msg;
    }
}

STEP3: Consumers

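Write two consumers to receive the messages. Note that their consumerGroup values differ: the first reuses the value of rocketmq.producer.group (newProducer), and the second uses another-group. Each consumer group receives its own copy of every message on the topic, whereas consumers that share a group would, in the default clustering mode, split the messages between them.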

package com.example.demo.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class SpringConsumer implements RocketMQListener<String>
{
    @Override
    public void onMessage(String msg)
    {
        System.out.println("SpringConsumer received message: " + msg);
    }
}

package com.example.demo.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "another-group")
public class SpringConsumer2 implements RocketMQListener<String>
{
    @Override
    public void onMessage(String msg)
    {
        System.out.println("SpringConsumer2 received message: " + msg);
    }
}
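
To see why the group names matter, here is a toy in-memory model of the delivery rule, written with plain JDK collections. It is not the RocketMQ API: the class `GroupDeliveryDemo` and its methods are hypothetical, and the per-message round-robin within a group is a simplification (the real broker assigns queues to consumers). It only illustrates that every group gets each message once, while members of one group share the messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of one topic; hypothetical code for illustration, not RocketMQ itself.
class GroupDeliveryDemo {
    // consumer group name -> consumers registered in that group
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // consumer group name -> number of messages delivered to that group so far
    private final Map<String, Integer> delivered = new LinkedHashMap<>();
    // delivery log, one entry per delivery, formatted as "consumer<-message"
    final List<String> log = new ArrayList<>();

    void subscribe(String group, String consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
    }

    // Every group receives the message exactly once; inside a group,
    // members take turns (simplified round-robin).
    void publish(String msg) {
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            int index = delivered.merge(entry.getKey(), 1, Integer::sum) - 1;
            log.add(members.get(index % members.size()) + "<-" + msg);
        }
    }

    public static void main(String[] args) {
        GroupDeliveryDemo topic = new GroupDeliveryDemo();
        topic.subscribe("newProducer", "SpringConsumer");    // group names as used in this article
        topic.subscribe("another-group", "SpringConsumer2");
        topic.publish("hello world");
        topic.log.forEach(System.out::println);              // one line per consumer group
    }
}
```

If both consumers were registered under the same group in this model, each message would appear in the log only once, which mirrors what would happen if the two listeners above shared a consumerGroup.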

STEP4: Test

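Visit http://localhost:8080/?msg=hello%20world. The application's console output shows the message received twice, once by SpringConsumer and once by SpringConsumer2.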
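
The same request can also be sent from code instead of a browser. The sketch below uses the JDK's built-in HTTP client (Java 11 or later). The class name `SendTestMessage` and the helper `buildUrl` are hypothetical, and the base URL assumes the application runs on its default port 8080 as in the URL above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical test client for the controller from STEP2.
class SendTestMessage {

    // Builds the request URL, percent-encoding the message.
    // URLEncoder encodes a space as '+', so it is rewritten to %20 to match the URL above.
    static String buildUrl(String base, String msg) {
        String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8).replace("+", "%20");
        return base + "?msg=" + encoded;
    }

    public static void main(String[] args) throws InterruptedException {
        String url = buildUrl("http://localhost:8080/", "hello world");
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        try {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // The Spring Boot application is probably not running on localhost:8080.
            System.out.println("Request failed: " + e);
        }
    }
}
```

After a successful request, the two consumer log lines should appear in the Spring Boot application's own console, not in the output of this client.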