怎樣使用RabbitMQ傳遞物件
1.首先我們需要配置rabbit所需要的依賴
org.springframework.boot
spring-boot-starter-amqp
2.在springboot中的application.yml裡面配置下面的這些資料
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirms: true # 訊息傳送到交換機確認機制,是否確認回撥
publisher-returns: true
devtools:
restart:
enabled: false
server:
port: 8088
3.訊息交換機配置 可以配置多個
public class ExchangeConfig {
/** * 1.定義direct exchange,繫結queueTest * 2.durable="true" rabbitmq重啟的時候不需要建立新的交換機 * 3.direct交換器相對來說比較簡單,匹配規則為:如果路由鍵匹配,訊息就被投送到相關的佇列 * fanout交換器中沒有路由鍵的概念,他會把訊息傳送到所有繫結在此交換器上面的佇列中。 * topic交換器你採用模糊匹配路由鍵的原則進行轉發訊息到佇列中 */ @Bean public DirectExchange directExchange(){ DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE,true,false); return directExchange; }
下面是配置多個佇列
public class QueueConfig {
@Bean public Queue firstQueue() { /** durable="true" 持久化 rabbitmq重啟的時候不需要建立新的佇列 auto-delete 表示訊息佇列沒有在使用時將被自動刪除 預設是false exclusive 表示該訊息佇列是否只在當前connection生效,預設是false */ return new Queue(RabbitMqConfig.QUEUE_NAME1,true,false,false); } @Bean public Queue secondQueue() { return new Queue(RabbitMqConfig.QUEUE_NAME2,true,false,false); }
}
下面主要是佇列、交換機和工廠的連線
public class RabbitMqConfig {
/** 訊息交換機的名字*/
public static final String EXCHANGE = "exchangeTest";
/*對列名稱*/
public static final String QUEUE_NAME1 = "first-queue";
public static final String QUEUE_NAME2 = "second-queue";
/*
*
* key: queue在該direct-exchange中的key值,當訊息傳送給direct-exchange中指定key為設定值時,
* 訊息將會轉發給queue引數指定的訊息佇列
*/
/** 佇列key1*/
public static final String ROUTINGKEY1 = "queue_one_key1";
/** 佇列key2*/
public static final String ROUTINGKEY2 = "queue_one_key2";
@Autowired
private QueueConfig queueConfig;
@Autowired
private ExchangeConfig exchangeConfig;
/**
* 連線工廠
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
* 將訊息佇列1和交換機進行繫結,指定佇列key1
*/
@Bean
public Binding binding_one() {
return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1);
}
/**
* 將訊息佇列2和交換機進行繫結,指定佇列key2
*/
@Bean
public Binding binding_two() {
return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2);
}
/**
* queue listener 觀察 監聽模式
* 當有訊息到達時會通知監聽在對應的佇列上的監聽物件
* @return
*/
/*@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());
simpleMessageListenerContainer.setExposeListenerChannel(true);
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
simpleMessageListenerContainer.setConcurrentConsumers(1);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
return simpleMessageListenerContainer;
}*/
/**
* 自定義rabbit template用於資料的接收和傳送
* 可以設定訊息確認機制和回撥
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// template.setMessageConverter(); 可以自定義訊息轉換器 預設使用的JDK的,所以訊息物件需要實現Serializable
/**若使用confirm-callback或return-callback,
* 必須要配置publisherConfirms或publisherReturns為true
* 每個rabbitTemplate只能有一個confirm-callback和return-callback
*/
template.setConfirmCallback(msgSendConfirmCallBack());
/**
* 使用return-callback時必須設定mandatory為true,或者在配置中設定mandatory-expression的值為true,
* 可針對每次請求的訊息去確定’mandatory’的boolean值,
* 只能在提供’return -callback’時使用,與mandatory互斥
*/
template.setReturnCallback(msgSendReturnCallback());
template.setMandatory(true);
return template;
}
/* 關於 msgSendConfirmCallBack 和 msgSendReturnCallback 的回撥說明:
1.如果訊息沒有到exchange,則confirm回撥,ack=false
2.如果訊息到達exchange,則confirm回撥,ack=true
3.exchange到queue成功,則不回撥return
4.exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了)
*/
/**
* 訊息確認機制
* Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些訊息被broker處理,
* 哪些可能因為broker宕掉或者網路失敗的情況而重新發布。
* 確認並且保證訊息被送達,提供了兩種方式:釋出確認和事務。(兩者不可同時使用)
* 在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。
* @return
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack(){
return new MsgSendConfirmCallBack();
}
@Bean
public MsgSendReturnCallback msgSendReturnCallback(){
return new MsgSendReturnCallback();
}
}
當生產者傳送訊息我們需要回調接收到的訊息
訊息傳送到交換機確認機制
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("MsgSendConfirmCallBack , 回撥的id:" + correlationData);
if (ack) {
System.out.println("訊息接收成功");
} else {
System.out.println("訊息接收失敗");
}
}
}
生產者傳送的訊息,需要注意的是,如果傳送的是物件需要使用到mapper.writeValueAsString進行系列化
(序列化就是將一個物件的狀態(各個屬性量)儲存起來,然後在適當的時候再獲得。序列化分為兩大部分:序列化和反序列化。序列化是這個過程的第一部分,將資料分解成位元組流,以便儲存在檔案中或在網路上傳輸。反序列化就是開啟位元組流並重構物件。物件序列化不僅要將基本資料型別轉換成位元組表示,有時還要恢復資料。恢復資料要求有恢復資料的物件例項。)
public class FirstSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 傳送訊息
* @param uuid
* @param message 訊息
*/
public void send(Student uuid, Object message) throws IOException, TimeoutException{
/*CorrelationData correlationId = new CorrelationData(uuid);*/
/**
* RabbitMqConfig.EXCHANGE 指定訊息交換機
* RabbitMqConfig.ROUTINGKEY2 指定佇列key2
*/
ObjectMapper mapper=new ObjectMapper();
String messaged=mapper.writeValueAsString(uuid);
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY1,
messaged.getBytes(), new CorrelationData(1+""));
}
}
消費者接收到的訊息
public class FirstConsumer {
/**
* queues 指定從哪個佇列(queue)訂閱訊息
* @param message
*/
@RabbitListener(queues = {"first-queue"})
public void handleMessage(Message message)throws IOException, TimeoutException,ShutdownSignalException{
ObjectMapper mapper=new ObjectMapper();
String messaged=new String(message.getBody());
Student student=mapper.readValue(messaged.getBytes("utf-8"),Student.class);
System.out.println("FirstConsumer {} handleMessage :"+student.getUsername()+student.getPassword());
}
}
傳送一個使用者物件的資料
public class SendController {
@Autowired
private FirstSender firstSender;
@GetMapping("/send")
public String send(String message) throws Exception,TimeoutException{
String uuid = UUID.randomUUID().toString();
Student student=new Student();
student.setUsername("fsfd");
student.setPassword("xixix");
firstSender.send(student,message);
return uuid;
}
}