RabbitMQ實現JSON、Map格式資料的傳送與接收
RabbitMQ 是目前非常熱門的一款訊息中介軟體,不管是網際網路行業還是傳統行業都在大量地使用。RabbitMQ 憑藉其高可靠、易擴充套件、高可用及豐富的功能特性收到越來越多企業的青睞。在實現的專案開發中,經常使用Json、Map格式資料。下面將介紹RabbitMQ實現Json、Map格式資料的傳送與接收。
(1)建立SpringBoot 專案,並整合 RabbitMQ框架
在pom.xml配置資訊檔案中,新增相關依賴檔案:
<!-- AMQP客戶端 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在application.yml 配置檔案中配置 RabbitMQ 服務:
spring: # 專案名稱 application: name: rabbitmq-provider # RabbitMQ服務配置 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest # 訊息確認(ACK) publisher-confirm-type: correlated #確認訊息已傳送到交換機(Exchange) publisher-returns: true #確認訊息已傳送到佇列(Queue)
(2)RabbitMQ 配置類
在專案中,建立配置類,配置訊息確認,Json轉換器,佇列名稱等,並將佇列交由 IoC 管理。程式碼如下:
package com.pjb.config; import com.pjb.receiver.AckReceiver; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ配置類 * @author pan_junbiao **/ @Configuration public class RabbitMqConfig { public static final String DIRECT_QUEUE_NAME = "direct_queue_name"; //佇列名稱 public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name"; //交換器名稱 public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由鍵 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //設定Json轉換器 rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } /** * Json轉換器 */ @Bean public Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } /** * 佇列 */ @Bean public Queue directQueue() { return new Queue(DIRECT_QUEUE_NAME, true, false, false, null); } /** * Direct交換器 */ @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false); } /** * 繫結 */ @Bean Binding bindingDirect(DirectExchange directExchange, Queue directQueue) { //將佇列和交換機繫結, 並設定用於匹配鍵:routingKey return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY); } /********************配置客戶端訊息確認Ack********************/ @Autowired private CachingConnectionFactory connectionFactory; @Autowired private AckReceiver ackReceiver; @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); // RabbitMQ預設是自動確認,這裡改為手動確認訊息 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定一個佇列 container.setQueueNames(DIRECT_QUEUE_NAME); container.setMessageListener(ackReceiver); return container; } }
1、JSON格式資料的傳送與接收
(1)建立實體類(entity層)
在 com.pjb.entity 包中,建立UserInfo類(使用者資訊實體類)。
package com.pjb.entity;
/**
* 使用者資訊實體類
* @author pan_junbiao
**/
public class UserInfo
{
private int userId; //使用者編號
private String userName; //使用者姓名
private String blogUrl; //部落格地址
private String blogRemark; //部落格資訊
//省略getter與setter方法...
}
(2)建立傳送者(sender層)
在 com.pjb.sender 包中,建立傳送者,利用rabbitTemplate.convertAndSend() 方法傳送訊息。
package com.pjb.sender;
import com.pjb.config.RabbitMqConfig;
import com.pjb.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 傳送JSON資料
* @author pan_junbiao
**/
@SpringBootTest
public class JsonSender
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sender() throws AmqpException
{
//建立使用者資訊
UserInfo userInfo = new UserInfo();
userInfo.setUserId(1);
userInfo.setUserName("pan_junbiao的部落格");
userInfo.setBlogUrl("https://blog.csdn.net/pan_junbiao");
userInfo.setBlogRemark("您好,歡迎訪問 pan_junbiao的部落格");
/**
* 傳送訊息,引數說明:
* String exchange:交換器名稱。
* String routingKey:路由鍵。
* Object object:傳送內容。
*/
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userInfo);
System.out.println("訊息傳送成功!");
}
}
(3)建立接收者(receiver層)
方式一:使用傳統的@RabbitListener、@RabbitHandler註解實現訊息的接收。
在 com.pjb.receiver 包中,建立建立接收者,使用傳統的@RabbitListener、@RabbitHandler註解實現訊息的接收。在方法的引數中, RabbitMQ會自動將JSON引數轉換為實體物件類。
注意,傳送者和接收者的 Queue 名稱必須一致,否則不能接收訊息。
package com.pjb.receiver;
import com.pjb.entity.UserInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.pjb.config.RabbitMqConfig;
import java.util.Map;
/**
* 接收者
* @author pan_junbiao
**/
@Component
@RabbitListener(queues=RabbitMqConfig.DIRECT_QUEUE_NAME)
public class JsonReceiver
{
@RabbitHandler
public void process(UserInfo userInfo)
{
System.out.println("接收者收到JSON格式訊息:");
System.out.println("使用者編號:" + userInfo.getUserId());
System.out.println("使用者名稱稱:" + userInfo.getUserName());
System.out.println("部落格地址:" + userInfo.getBlogUrl());
System.out.println("部落格資訊:" + userInfo.getBlogRemark());
}
}
方式二:使用RabbitMQ訊息確認機制(ACK)
如果專案中使用了RabbitMQ訊息確認機制(ACK),則獲取Json格式資料方法如下:
package com.pjb.receiver;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pjb.entity.UserInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* Ack接收者
* @author pan_junbiao
**/
@Component
public class AckReceiver implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
//將JSON格式資料轉換為實體物件
ObjectMapper mapper = new ObjectMapper();
UserInfo userInfo = mapper.readValue(message.getBody(), UserInfo.class);
System.out.println("接收者收到JSON格式訊息:");
System.out.println("使用者編號:" + userInfo.getUserId());
System.out.println("使用者名稱稱:" + userInfo.getUserName());
System.out.println("部落格地址:" + userInfo.getBlogUrl());
System.out.println("部落格資訊:" + userInfo.getBlogRemark());
//確認訊息
channel.basicAck(deliveryTag, true);
}
catch (Exception e)
{
e.printStackTrace();
//拒絕訊息
channel.basicReject(deliveryTag, true);
}
}
}
執行結果:
2、Map格式資料的傳送與接收
(1)建立傳送者(sender層)
在 com.pjb.sender 包中,建立傳送者,利用rabbitTemplate.convertAndSend() 方法傳送訊息。
package com.pjb.sender;
import com.pjb.config.RabbitMqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
/**
* 傳送Map資料
* @author pan_junbiao
**/
@SpringBootTest
public class MapSender
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sender() throws AmqpException
{
//建立使用者資訊Map
Map<String, Object> userMap = new HashMap<>();
userMap.put("userId", "1");
userMap.put("userName", "pan_junbiao的部落格");
userMap.put("blogUrl", "https://blog.csdn.net/pan_junbiao");
userMap.put("userRemark", "您好,歡迎訪問 pan_junbiao的部落格");
/**
* 傳送訊息,引數說明:
* String exchange:交換器名稱。
* String routingKey:路由鍵。
* Object object:傳送內容。
*/
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
System.out.println("訊息傳送成功!");
}
}
(2)建立接收者(receiver層)
方式一:使用傳統的@RabbitListener、@RabbitHandler註解實現訊息的接收。
在 com.pjb.receiver 包中,建立建立接收者,使用傳統的@RabbitListener、@RabbitHandler註解實現訊息的接收。在方法的引數中, RabbitMQ會自動封裝Map格式資料。
注意,傳送者和接收者的 Queue 名稱必須一致,否則不能接收訊息。
package com.pjb.receiver;
import com.pjb.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 接收者
* @author pan_junbiao
**/
@Component
@RabbitListener(queues= RabbitMqConfig.DIRECT_QUEUE_NAME)
public class MapReceiver
{
@RabbitHandler
public void process(Map message)
{
System.out.println("接收者收到Map訊息:");
System.out.println("使用者編號:" + message.get("userId"));
System.out.println("使用者名稱稱:" + message.get("userName"));
System.out.println("部落格地址:" + message.get("blogUrl"));
System.out.println("部落格資訊:" + message.get("userRemark"));
}
}
方式二:使用RabbitMQ訊息確認機制(ACK)
如果專案中使用了RabbitMQ訊息確認機制(ACK),則獲取Map格式資料方法如下:
package com.pjb.receiver;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Ack接收者
* @author pan_junbiao
**/
@Component
public class AckReceiver implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
//將JSON格式資料轉換為Map物件
ObjectMapper mapper = new ObjectMapper();
JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
Map<String, Object> resultMap = mapper.readValue(message.getBody(),javaType);
System.out.println("接收者收到Map格式訊息:");
System.out.println("使用者編號:" + resultMap.get("userId"));
System.out.println("使用者名稱稱:" + resultMap.get("userName"));
System.out.println("部落格地址:" + resultMap.get("blogUrl"));
System.out.println("部落格資訊:" + resultMap.get("userRemark"));
//確認訊息
channel.basicAck(deliveryTag, true);
}
catch (Exception e)
{
e.printStackTrace();
//拒絕訊息
channel.basicReject(deliveryTag, true);
}
}
}
執行結果: