1. 程式人生 > >SpringBoot整合RabbitMQ之發送接收消息實戰

SpringBoot整合RabbitMQ之發送接收消息實戰

container 會同 prope spring 註解 流行 pin public lin

實戰前言
前幾篇文章中,我們介紹了SpringBoot整合RabbitMQ的配置以及實戰了Spring的事件驅動模型,這兩篇文章對於我們後續實戰RabbitMQ其他知識要點將起到奠基的作用的。特別是Spring的事件驅動模型,當我們全篇實戰完畢RabbitMQ並大概了解一下RabbitMQ相關組件的源碼時,會發現其中的ApplicationEvent、ApplicationListener、ApplicationEventPublisher跟RabbitMQ的Message、Listener、RabbitTemplate有“異曲同工之妙”,當然啦,其中更多有關聯關系的是它們的底層源碼,感興趣的童鞋可以研究一番!

實戰概要
從本篇文章將開始采用SpringBoot整合RabbitMQ的方式來實戰相關知識要點、企業級應用業務模塊以及微服務項目一些典型的問題。
本篇文章將介紹實戰RabbitMQ在SpringBoot項目中的基本應用,即如何創建隊列、交換機、路由及其綁定以及如何發送接收消息!

實戰歷程
前幾篇文章我們已經實現了如何采用IDEA開發工具實現SpringBoot整合RabbitMQ的配置,其中有一個相當重要的配置類 RabbitmqConfig.java ,我們將在這裏創建隊列、交換機、路由及其綁定,下面我們就創建一個簡單的消息模型吧:DirectExchange+RoutingKey 。以下為創建隊列、交換機、路由及其綁定的相關信息。

1、首先是application.properties配置文件中配置的信息

mq.env=local
basic.info.mq.exchange.name=${mq.env}:basic:info:mq:exchange
basic.info.mq.routing.key.name=${mq.env}:basic:info:mq:routing:key
basic.info.mq.queue.name=${mq.env}:basic:info:mq:queue

2、RabbitmqConfig創建隊列、交換機、路由及其綁定

    //TODO:基本消息模型構建
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange(env.getProperty("basic.info.mq.exchange.name"), true,false);
    }

    @Bean(name = "basicQueue")
    public Queue basicQueue(){
        return new Queue(env.getProperty("basic.info.mq.queue.name"), true);
    }

    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("basic.info.mq.routing.key.name"));
    }

3、當我們在上面創建好隊列、交換機、路由及其綁定後,我們可以先把整個項目跑起來,然後打開http://localhost:15672/ 訪問RabbitMQ後端控制臺,點擊 Queues、Exchanges 欄目,即可看到我們創建好的隊列、交換機。如下所示
技術分享圖片
技術分享圖片

4、現在可以說是萬事具備,只欠東風。創建好了隊列,自然是要來使用的,下面我就采用這條隊列來發送接收“簡單的字符串信息” 以及 “對象實體信息”!在這裏,我們在Controller執行發送邏輯,其中充當發送消息的組件是RabbitTemplate,充當消息的組件為Message。如下所示RabbitController.java

@RestController
    public class RabbitController {
    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
    private static final String Prefix="rabbit";

    @Autowired
    private Environment env;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 發送簡單消息
     * @param message
     * @return
     */
    @RequestMapping(value = Prefix+"/simple/message/send",method = RequestMethod.GET)
    public BaseResponse sendSimpleMessage(@RequestParam String message){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            log.info("待發送的消息: {} ",message);

            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));

            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("發送簡單消息發生異常: ",e.fillInStackTrace());
        }
        return response;
    }

    /**
     * 發送對象消息
     * @param user
     * @return
     */
    @RequestMapping(value = Prefix+"/object/message/send",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public BaseResponse sendObjectMessage(@RequestBody User user){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            log.info("待發送的消息: {} ",user);

            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(user)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("發送對象消息發生異常: ",e.fillInStackTrace());
        }
        return response;
    }}

5、在上面可以看到我們的發送代碼邏輯其實並不復雜,其思路主要是來源於第一階段的介紹消息模型中的其中一種,如下圖所示。即我們是將消息發送到exchange,然後由於exchange與某個routingKey綁定路由到某個隊列queue,故而當消息到達exchange後,將自然而然的被路由到指定的queue中,等待被監聽消費。
技術分享圖片

6、下面我們需要創建一個listener用於監聽消費此隊列中的消息。代碼邏輯如下CommonListener.java

@Component
    public class CommonListener {

    private static final Logger log= LoggerFactory.getLogger(CommonListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 監聽消費消息
     * @param message
     */
    @RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload byte[] message){
        try {
            //TODO:接收String
            String result=new String(message,"UTF-8");
            log.info("接收String消息: {} ",result);
        }catch (Exception e){
            log.error("監聽消費消息 發生異常: ",e.fillInStackTrace());
        }
    }}

7、緊接著,我們將整個項目跑起來,然後首先訪問 “http://127.0.0.1:9092/mq/rabbit/simple/message/send?message=簡單消息模型2” ,然後即可看到listener接收到改消息,可以在控制臺打印輸出!
技術分享圖片

8、然後我們改造一下 CommonListener的監聽消費代碼邏輯用於監聽消費對象實體的信息,如下

    /**
     * 監聽消費消息
     * @param message
     */
    @RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload byte[] message){
        try {
            //TODO:接收對象
            User user=objectMapper.readValue(message, User.class);
            log.info("接收對象消息: {} ",user);
        }catch (Exception e){
            log.error("監聽消費消息 發生異常: ",e.fillInStackTrace());
        }
    }}

然後再將整個項目跑起來,在postman如下發送一個對象實體信息
技術分享圖片

可以在listener打斷點啥的監聽期執行流程,然後即可看到listener監聽消費到了隊列中該對象實體消息,如下
技術分享圖片

至此我們的隊列、交換機、路由的創建沒啥問題了,而且我們在控制臺觀察會發現,CommonListener.java中的@RabbitListener 註解的方法確實以 不同於 主線程 的異步線程來執行的!正如上圖所看到的 那個就是線程Thread的ID。

9、上面的對象實體User的實體字段信息包含下面三個字段

    public class User implements Serializable{
    private Integer id;
    private String userName;
    private String name;

    public User(Integer id, String userName, String name) {
        this.id = id;
        this.userName = userName;
        this.name = name;
    }

    public User() {
    }

    public Integer getId() {
        return id;
    }

    //TODO:省略了getter/setter方法--也可以在類上面 @Data 加入 Lombok註解 - 當然啦前提是要加入Lombok依賴即可

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", userName=‘" + userName + ‘\‘‘ +
                ", name=‘" + name + ‘\‘‘ +
                ‘}‘;
    }}

10、至此,我們的SpringBoot整合RabbitMQ實戰發送接收消息 已經介紹完畢!在上面我們的RabbitTemplate充當了發送消息的組件,這個組件在SpringBoot搭建的微服務項目中至關重要,我們還可以在其中設置其他的相關屬性,包括我們之前設置好的 “消息回調”、“消息確認”等追溯的屬性!當然啦,在後面我們會發現 RabbitTemplate 發送的消息的方式有n多種,特別是在消息確認方面,寫法也是有多種,在後面我們會慢慢以代碼體現出來!但不管怎麽寫,我們還是要牢牢把握住上面最基本的寫法以及上面介紹的消息模型圖:因為那就是發送接收消息的核心所在!

實戰總結
SpringBoot搭建的微服務項目目前也是越來越流行,而消息中間件RabbitMQ應用也是越來越廣泛,本文我們介紹了如何在SpringBoot搭建的項目中利用SpringBoot提供的起步依賴、自動裝配等先天優勢來創建隊列、交換機、路由及其綁定並實現消息的發送監聽接收消費,其實這已經初步實現了業務服務模塊之間的解耦。在下節我們將用此來實戰企業級應用、微服務項目中常見的業務模塊:異步寫用戶操作日誌;異步發送郵件!

實戰結語
最近博主將SpringBoot整合RabbitMQ這一系列的文章進行了精要抽取並在gitchat上進行了分享,感興趣的童鞋可以關註並加入我發起的文章交流會!掃一掃下面的二維碼即可哦
技術分享圖片

另外,相關的文章也會同步發布在×××公眾號哦,感興趣的童鞋也可以關註關註。有相關問題可以加我QQ:1974544863進行交流或者加群進行討論:java開源技術交流-583522159
技術分享圖片

SpringBoot整合RabbitMQ之發送接收消息實戰