RocketMQ使用——實現1個生產者2個消費者
阿新 • • 發佈:2021-05-15
STEP1:使用docker啟動rocketmq
# 需填寫本機IP ip=192.168.0.99 # broker的配置檔案 echo "brokerIP1=$ip brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH" > broker.conf docker rm -f rmqserver rmqbroker rmqconsole docker run -d --network=host --name rmqserver foxiswho/rocketmq:server-4.5.1 docker run -d --network=host\ --name rmqbroker --add-host namesrv:$ip\ -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt"\ -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\ -v `pwd`/broker.conf:/etc/rocketmq/broker.conf\ foxiswho/rocketmq:broker-4.5.1 docker run -d --name rmqconsole -p 8180:8080\ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=$ip:9876\ -Dcom.rocketmq.sendMessageWithVIPChannel=false"\ -t styletang/rocketmq-console-ng
執行後,可以通過localhost:8180訪問rocketmq的控制檯
STEP2:生產者
https://start.spring.io/下載一個Spring Boot專案
新增RocketMQ的依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
新增配置到application.properties
rocketmq.nameServer=192.168.0.99:9876
rocketmq.producer.group=newProducer
rocketmq.producer.topic=newTopic
編寫controller傳送訊息
package com.example.demo.controller; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Value("${rocketmq.producer.topic}") String topic; @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/") public String hello(String msg) { rocketMQTemplate.convertAndSend(topic, msg); return "訊息已傳送:" + msg; } }
STEP3:消費者
編寫兩個消費者接受訊息,注意,他們的consumerGroup不同
package com.example.demo.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class SpringConsumer implements RocketMQListener<String>
{
@Override
public void onMessage(String msg)
{
System.out.println("SpringConsumer收到訊息:" + msg);
}
}
package com.example.demo.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "another-group")
public class SpringConsumer2 implements RocketMQListener<String>
{
@Override
public void onMessage(String msg)
{
System.out.println("SpringConsumer2收到訊息:" + msg);
}
}
STEP4:測試
訪問http://localhost:8080/?msg=hello%20world可以看到控制檯輸出收到兩條訊息