1. 程式人生 > >怎樣使用RabbitMQ傳遞物件

怎樣使用RabbitMQ傳遞物件

1.首先我們需要配置rabbit所需要的依賴

org.springframework.boot
spring-boot-starter-amqp

2.在springboot中的application.yml裡面配置下面的這些資料
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirms: true # 訊息傳送到交換機確認機制,是否確認回撥
publisher-returns: true
devtools:
restart:
enabled: false
server:
port: 8088

3.訊息交換機配置 可以配置多個
public class ExchangeConfig {

/**
 *   1.定義direct exchange,繫結queueTest
 *   2.durable="true" rabbitmq重啟的時候不需要建立新的交換機
 *   3.direct交換器相對來說比較簡單,匹配規則為:如果路由鍵匹配,訊息就被投送到相關的佇列
 *     fanout交換器中沒有路由鍵的概念,他會把訊息傳送到所有繫結在此交換器上面的佇列中。
 *     topic交換器你採用模糊匹配路由鍵的原則進行轉發訊息到佇列中
 */
@Bean
public DirectExchange directExchange(){
    DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE,true,false);
    return directExchange;
}

下面是配置多個佇列
public class QueueConfig {

@Bean
public Queue firstQueue() {
    /**
     durable="true" 持久化 rabbitmq重啟的時候不需要建立新的佇列
     auto-delete 表示訊息佇列沒有在使用時將被自動刪除 預設是false
     exclusive  表示該訊息佇列是否只在當前connection生效,預設是false
     */
    return new Queue(RabbitMqConfig.QUEUE_NAME1,true,false,false);
}

@Bean
public Queue secondQueue() {
    return new Queue(RabbitMqConfig.QUEUE_NAME2,true,false,false);
}

}
下面主要是佇列、交換機和工廠的連線
public class RabbitMqConfig {

/** 訊息交換機的名字*/
public static final String EXCHANGE = "exchangeTest";
/*對列名稱*/
public static final String QUEUE_NAME1 = "first-queue";
public static final String QUEUE_NAME2 = "second-queue";

/*
*
* key: queue在該direct-exchange中的key值,當訊息傳送給direct-exchange中指定key為設定值時,
*   訊息將會轉發給queue引數指定的訊息佇列
*/
/** 佇列key1*/
public static final String ROUTINGKEY1 = "queue_one_key1";
/** 佇列key2*/
public static final String ROUTINGKEY2 = "queue_one_key2";

@Autowired
private QueueConfig queueConfig;
@Autowired
private ExchangeConfig exchangeConfig;

/**
 * 連線工廠
 */
@Autowired
private ConnectionFactory connectionFactory;

/**
 * 將訊息佇列1和交換機進行繫結,指定佇列key1
 */
@Bean
public Binding binding_one() {
    return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1);
}

/**
 * 將訊息佇列2和交換機進行繫結,指定佇列key2
 */
@Bean
public Binding binding_two() {
    return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2);
}

/**
 * queue listener  觀察 監聽模式
 * 當有訊息到達時會通知監聽在對應的佇列上的監聽物件
 * @return
 */
/*@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());
    simpleMessageListenerContainer.setExposeListenerChannel(true);
    simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
    simpleMessageListenerContainer.setConcurrentConsumers(1);
    simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
    return simpleMessageListenerContainer;
}*/

/**
 * 自定義rabbit template用於資料的接收和傳送
 * 可以設定訊息確認機制和回撥
 * @return
 */
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    // template.setMessageConverter(); 可以自定義訊息轉換器  預設使用的JDK的,所以訊息物件需要實現Serializable

    /**若使用confirm-callback或return-callback,
     * 必須要配置publisherConfirms或publisherReturns為true
     * 每個rabbitTemplate只能有一個confirm-callback和return-callback
     */
    template.setConfirmCallback(msgSendConfirmCallBack());

    /**
     * 使用return-callback時必須設定mandatory為true,或者在配置中設定mandatory-expression的值為true,
     * 可針對每次請求的訊息去確定’mandatory’的boolean值,
     * 只能在提供’return -callback’時使用,與mandatory互斥
     */
    template.setReturnCallback(msgSendReturnCallback());
    template.setMandatory(true);
    return template;
}

/*  關於 msgSendConfirmCallBack 和 msgSendReturnCallback 的回撥說明:
    1.如果訊息沒有到exchange,則confirm回撥,ack=false
    2.如果訊息到達exchange,則confirm回撥,ack=true
    3.exchange到queue成功,則不回撥return
    4.exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了)
*/

/**
 * 訊息確認機制
 * Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些訊息被broker處理,
 * 哪些可能因為broker宕掉或者網路失敗的情況而重新發布。
 * 確認並且保證訊息被送達,提供了兩種方式:釋出確認和事務。(兩者不可同時使用)
 * 在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。
 * @return
 */
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack(){
    return new MsgSendConfirmCallBack();
}

@Bean
public MsgSendReturnCallback msgSendReturnCallback(){
    return new MsgSendReturnCallback();
}

}

當生產者傳送訊息我們需要回調接收到的訊息
訊息傳送到交換機確認機制
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    System.out.println("MsgSendConfirmCallBack  , 回撥的id:" + correlationData);
    if (ack) {
        System.out.println("訊息接收成功");
    } else {
        System.out.println("訊息接收失敗");
    }
}

}

生產者傳送的訊息,需要注意的是,如果傳送的是物件需要使用到mapper.writeValueAsString進行系列化
(序列化就是將一個物件的狀態(各個屬性量)儲存起來,然後在適當的時候再獲得。序列化分為兩大部分:序列化和反序列化。序列化是這個過程的第一部分,將資料分解成位元組流,以便儲存在檔案中或在網路上傳輸。反序列化就是開啟位元組流並重構物件。物件序列化不僅要將基本資料型別轉換成位元組表示,有時還要恢復資料。恢復資料要求有恢復資料的物件例項。)
public class FirstSender {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * 傳送訊息
 * @param uuid
 * @param message  訊息
 */
public void send(Student uuid, Object message) throws IOException, TimeoutException{
    /*CorrelationData correlationId = new CorrelationData(uuid);*/
    /**
     * RabbitMqConfig.EXCHANGE  指定訊息交換機
     * RabbitMqConfig.ROUTINGKEY2  指定佇列key2
     */
    ObjectMapper mapper=new ObjectMapper();
    String messaged=mapper.writeValueAsString(uuid);

    rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY1,
            messaged.getBytes(), new CorrelationData(1+""));
}

}
消費者接收到的訊息
public class FirstConsumer {

/**
 * queues  指定從哪個佇列(queue)訂閱訊息
 * @param message
 */
@RabbitListener(queues = {"first-queue"})
public void handleMessage(Message message)throws IOException, TimeoutException,ShutdownSignalException{
    ObjectMapper mapper=new ObjectMapper();
    String messaged=new String(message.getBody());
    Student student=mapper.readValue(messaged.getBytes("utf-8"),Student.class);
    System.out.println("FirstConsumer {} handleMessage :"+student.getUsername()+student.getPassword());
}

}

傳送一個使用者物件的資料
public class SendController {

@Autowired
private FirstSender firstSender;

@GetMapping("/send")
public String send(String message) throws  Exception,TimeoutException{
    String uuid = UUID.randomUUID().toString();
    Student student=new Student();
    student.setUsername("fsfd");
    student.setPassword("xixix");
    firstSender.send(student,message);
    return uuid;
}

}