1. 程式人生 > 其它 >RabbitMQ實現JSON、Map格式資料的傳送與接收

RabbitMQ實現JSON、Map格式資料的傳送與接收

技術標籤:RabbitMQ我の原創rabbitmq

RabbitMQ 是目前非常熱門的一款訊息中介軟體,不管是網際網路行業還是傳統行業都在大量地使用。RabbitMQ 憑藉其高可靠、易擴充套件、高可用及豐富的功能特性收到越來越多企業的青睞。在實現的專案開發中,經常使用Json、Map格式資料。下面將介紹RabbitMQ實現Json、Map格式資料的傳送與接收。

(1)建立SpringBoot 專案,並整合 RabbitMQ框架

在pom.xml配置資訊檔案中,新增相關依賴檔案:

<!-- AMQP客戶端 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在application.yml 配置檔案中配置 RabbitMQ 服務:

spring:
  # 專案名稱
  application:
    name: rabbitmq-provider
  # RabbitMQ服務配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 訊息確認(ACK)
    publisher-confirm-type: correlated #確認訊息已傳送到交換機(Exchange)
    publisher-returns: true #確認訊息已傳送到佇列(Queue)

(2)RabbitMQ 配置類

在專案中,建立配置類,配置訊息確認,Json轉換器,佇列名稱等,並將佇列交由 IoC 管理。程式碼如下:

package com.pjb.config;

import com.pjb.receiver.AckReceiver;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置類
 * @author pan_junbiao
 **/
@Configuration
public class RabbitMqConfig
{
    public static final String DIRECT_QUEUE_NAME = "direct_queue_name"; //佇列名稱
    public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name"; //交換器名稱
    public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由鍵

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
    {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        
        //設定Json轉換器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        
        return rabbitTemplate;
    }

    /**
     * Json轉換器
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 佇列
     */
    @Bean
    public Queue directQueue()
    {
        return new Queue(DIRECT_QUEUE_NAME, true, false, false, null);
    }

    /**
     * Direct交換器
     */
    @Bean
    public DirectExchange directExchange()
    {
        return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
    }

    /**
     * 繫結
     */
    @Bean
    Binding bindingDirect(DirectExchange directExchange, Queue directQueue)
    {
        //將佇列和交換機繫結, 並設定用於匹配鍵:routingKey
        return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
    }

    /********************配置客戶端訊息確認Ack********************/
    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private AckReceiver ackReceiver;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer()
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);

        // RabbitMQ預設是自動確認,這裡改為手動確認訊息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //設定一個佇列
        container.setQueueNames(DIRECT_QUEUE_NAME);

        container.setMessageListener(ackReceiver);

        return container;
    }
}

1、JSON格式資料的傳送與接收

(1)建立實體類(entity層)

在 com.pjb.entity 包中,建立UserInfo類(使用者資訊實體類)。

package com.pjb.entity;

/**
 * 使用者資訊實體類
 * @author pan_junbiao
 **/
public class UserInfo
{
    private int userId; //使用者編號
    private String userName; //使用者姓名
    private String blogUrl; //部落格地址
    private String blogRemark; //部落格資訊

    //省略getter與setter方法...
}

(2)建立傳送者(sender層)

在 com.pjb.sender 包中,建立傳送者,利用rabbitTemplate.convertAndSend() 方法傳送訊息。

package com.pjb.sender;

import com.pjb.config.RabbitMqConfig;
import com.pjb.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * 傳送JSON資料
 * @author pan_junbiao
 **/
@SpringBootTest
public class JsonSender
{
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sender() throws AmqpException
    {
        //建立使用者資訊
        UserInfo userInfo = new UserInfo();
        userInfo.setUserId(1);
        userInfo.setUserName("pan_junbiao的部落格");
        userInfo.setBlogUrl("https://blog.csdn.net/pan_junbiao");
        userInfo.setBlogRemark("您好,歡迎訪問 pan_junbiao的部落格");
        
        /**
         * 傳送訊息,引數說明:
         * String exchange:交換器名稱。
         * String routingKey:路由鍵。
         * Object object:傳送內容。
         */
        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userInfo);
        System.out.println("訊息傳送成功!");
    }
}

(3)建立接收者(receiver層)

方式一:使用傳統的@RabbitListener、@RabbitHandler註解實現訊息的接收。

在 com.pjb.receiver 包中,建立建立接收者,使用傳統的@RabbitListener、@RabbitHandler註解實現訊息的接收。在方法的引數中, RabbitMQ會自動將JSON引數轉換為實體物件類。

注意,傳送者和接收者的 Queue 名稱必須一致,否則不能接收訊息。

package com.pjb.receiver;

import com.pjb.entity.UserInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.pjb.config.RabbitMqConfig;

import java.util.Map;

/**
 * 接收者
 * @author pan_junbiao
 **/
@Component
@RabbitListener(queues=RabbitMqConfig.DIRECT_QUEUE_NAME)
public class JsonReceiver
{
    @RabbitHandler
    public void process(UserInfo userInfo)
    {
        System.out.println("接收者收到JSON格式訊息:");
        System.out.println("使用者編號:" + userInfo.getUserId());
        System.out.println("使用者名稱稱:" + userInfo.getUserName());
        System.out.println("部落格地址:" + userInfo.getBlogUrl());
        System.out.println("部落格資訊:" + userInfo.getBlogRemark());
    }
}

方式二:使用RabbitMQ訊息確認機制(ACK)

如果專案中使用了RabbitMQ訊息確認機制(ACK),則獲取Json格式資料方法如下:

package com.pjb.receiver;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.pjb.entity.UserInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Ack接收者
 * @author pan_junbiao
 **/
@Component
public class AckReceiver implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
            //將JSON格式資料轉換為實體物件
            ObjectMapper mapper = new ObjectMapper();
            UserInfo userInfo = mapper.readValue(message.getBody(), UserInfo.class);

            System.out.println("接收者收到JSON格式訊息:");
            System.out.println("使用者編號:" + userInfo.getUserId());
            System.out.println("使用者名稱稱:" + userInfo.getUserName());
            System.out.println("部落格地址:" + userInfo.getBlogUrl());
            System.out.println("部落格資訊:" + userInfo.getBlogRemark());

            //確認訊息
            channel.basicAck(deliveryTag, true);
        }
        catch (Exception e)
        {
            e.printStackTrace();

            //拒絕訊息
            channel.basicReject(deliveryTag, true);
        }
    }
}

執行結果:

2、Map格式資料的傳送與接收

(1)建立傳送者(sender層)

在 com.pjb.sender 包中,建立傳送者,利用rabbitTemplate.convertAndSend() 方法傳送訊息。

package com.pjb.sender;

import com.pjb.config.RabbitMqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

/**
 * 傳送Map資料
 * @author pan_junbiao
 **/
@SpringBootTest
public class MapSender
{
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sender() throws AmqpException
    {
        //建立使用者資訊Map
        Map<String, Object> userMap = new HashMap<>();
        userMap.put("userId", "1");
        userMap.put("userName", "pan_junbiao的部落格");
        userMap.put("blogUrl", "https://blog.csdn.net/pan_junbiao");
        userMap.put("userRemark", "您好,歡迎訪問 pan_junbiao的部落格");

        /**
         * 傳送訊息,引數說明:
         * String exchange:交換器名稱。
         * String routingKey:路由鍵。
         * Object object:傳送內容。
         */
        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
        System.out.println("訊息傳送成功!");
    }
}

(2)建立接收者(receiver層)

方式一:使用傳統的@RabbitListener、@RabbitHandler註解實現訊息的接收。

在 com.pjb.receiver 包中,建立建立接收者,使用傳統的@RabbitListener、@RabbitHandler註解實現訊息的接收。在方法的引數中, RabbitMQ會自動封裝Map格式資料。

注意,傳送者和接收者的 Queue 名稱必須一致,否則不能接收訊息。

package com.pjb.receiver;

import com.pjb.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * 接收者
 * @author pan_junbiao
 **/
@Component
@RabbitListener(queues= RabbitMqConfig.DIRECT_QUEUE_NAME)
public class MapReceiver
{
    @RabbitHandler
    public void process(Map message)
    {
        System.out.println("接收者收到Map訊息:");
        System.out.println("使用者編號:" + message.get("userId"));
        System.out.println("使用者名稱稱:" + message.get("userName"));
        System.out.println("部落格地址:" + message.get("blogUrl"));
        System.out.println("部落格資訊:" + message.get("userRemark"));
    }
}

方式二:使用RabbitMQ訊息確認機制(ACK)

如果專案中使用了RabbitMQ訊息確認機制(ACK),則獲取Map格式資料方法如下:

package com.pjb.receiver;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Ack接收者
 * @author pan_junbiao
 **/
@Component
public class AckReceiver implements ChannelAwareMessageListener
{
    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
            //將JSON格式資料轉換為Map物件
            ObjectMapper mapper = new ObjectMapper();
            JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
            Map<String, Object> resultMap = mapper.readValue(message.getBody(),javaType);

            System.out.println("接收者收到Map格式訊息:");
            System.out.println("使用者編號:" + resultMap.get("userId"));
            System.out.println("使用者名稱稱:" + resultMap.get("userName"));
            System.out.println("部落格地址:" + resultMap.get("blogUrl"));
            System.out.println("部落格資訊:" + resultMap.get("userRemark"));

            //確認訊息
            channel.basicAck(deliveryTag, true);
        }
        catch (Exception e)
        {
            e.printStackTrace();

            //拒絕訊息
            channel.basicReject(deliveryTag, true);
        }
    }
}

執行結果: