1. 程式人生 > >RabbitMq學習筆記002---RabbitMq在SpringBoot中的應用_配置_使用_並且設定優先順序

RabbitMq學習筆記002---RabbitMq在SpringBoot中的應用_配置_使用_並且設定優先順序

首先新建一個SpringBoot的工程,空的就可以:
可以用idea
也可以用eclipse,也可以用sts,這類工具都可以
E:\StsWorkSpace\spring-boot-rabbitmq-test

然後看配置:
首先在application.properties中寫入rabbitmq的配置
E:\StsWorkSpace\spring-boot-rabbitmq-test
\src\main\resources\application.properties

#spring.application.name=spring-boot-rabbitmq

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest


這裡注意:埠號是5672,本地在瀏覽器訪問rabbitmq伺服器的時候是:
http://localhost:15672/#/queues/%2F/direct 這個地址,但是配置的時候,用5672,用15672是會連線不上的

然後再寫個配置類:
E:\StsWorkSpace\spring-boot-rabbitmq-test\src\main\java\io
\credream\rabbitmq\config\RabbitDirectConfig.java
package io.credream.rabbitmq.config;
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 描述: 配置預設的交換機模式
 *
 * Direct Exchange是RabbitMQ預設的交換機模式,也是最簡單的模式,根據key全文匹配去尋找佇列。
 *
 * @author lidewei
 * @create 2017-10-25 0:09
 **/
@Configuration
public class RabbitDirectConfig {
//注意這裡是給交換機配置了10個優先順序,數字越大越優先,優先順序最大可以設定255,官方建議設定0到10
//https://www.rabbitmq.com/priority.html這裡有關於優先順序的設定,說明
    private static final int  MAX_PRIORITY=10;
//2.這裡新建一個queue,名字可以自己起名,注意這裡的hello就是routekey,可以通過
//它來找到這個佇列
    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }
//3.第二個queue
    @Bean
    public Queue directQueue() {
        return new Queue("direct");
    }

//4.這裡配置交換機,模式,預設用的directExchange,還有其他模式,複雜一些可以查閱
    //-------------------配置預設的交換機模式,可以不需要配置以下-----------------------------------
    @Bean
    DirectExchange directExchange() {
         Map<String,Object> args = new HashMap<String, Object>();  
         args.put("x-max-priority",MAX_PRIORITY); //佇列的屬性引數 有10個優先級別
             //5.這裡注意,就是通過這個方法來給交換機繫結優先順序的args,這個是引數列表,裡面有優先順序
    //可以看到他說 core as of version 3.5.0. ,3.5.0之後的版本才支援,優先順序設定
         return new DirectExchange("directExchange",true,false,args);
    }
//
  //6.這裡給交換機繫結一個佇列的key "direct",當訊息匹配到就會放到這個佇列中
    @Bean
    Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("direct");
    }
    //7.由於這裡咱們建立了兩個佇列,所以。都需要加入到交換機中,這裡做了兩次繫結
    @Bean
    Binding bindingExchangeHelloQueue(Queue helloQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(helloQueue).to(directExchange).with("hello");
    }
  // 推薦使用 helloQueue() 方法寫法,這種方式在 Direct Exchange 模式 多此一舉,沒必要這樣寫
    //---------------------------------------------------------------------------------------------
}

到這裡:
可以看到使用的流程是
a 先配置rabbitmq 配置檔案
b 寫配置類,生成佇列queue,然後,寫交換機,然後把queue,放到交換機中去

好,接下來,寫接收者:
E:\StsWorkSpace\spring-boot-rabbitmq-test\src\main\java\io\
credream\rabbitmq\direct\DirectReceiver.java
package io.credream.rabbitmq.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 描述: 接收者
 * @author: lidewei
 * @create: 2017/10/25 0:49
 */
@Component //2.註冊到spring中
@RabbitListener(queues = "direct") //要監聽哪個佇列,當然這裡可以傳入一個string陣列
//可以去看原始碼:String[] queues() default {};

public class DirectReceiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 DirectReceiver," + message);
        //1.這樣寫了以後,當對應的佇列中有訊息的時候,就會自動捕捉,接受
try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

然後:
SpringBoot啟動類
E:\StsWorkSpace\spring-boot-rabbitmq-test\src\main\java\io\credream\rabbitmq\run\Startup.java
package io.credream.rabbitmq.run;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * 描述: 啟動服務
 *
 * @author: lidewei
 * @create: 2017/10/23 14:14
 */
@SpringBootApplication
@ComponentScan(value = {"io.credream.rabbitmq"})
public class Startup {

    public static void main(String[] args) {
        SpringApplication.run(Startup.class, args);
    }
}


然後再去編寫測試類:
E:\StsWorkSpace\spring-boot-rabbitmq-test\src\test\java\io\
credream\rabbitmq\test\RabbitDirectTest.java
package io.credream.rabbitmq.test;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import io.credream.rabbitmq.run.Startup;

/**
 * 描述: 預設的交換機模式
 *
 * @author: yanpenglei
 * @create: 2017/10/25 1:03
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitDirectTest {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Test
    public void sendHelloTest() {

        String context = "此訊息在,預設的交換機模式佇列下,有 helloReceiver 可以收到";

        String routeKey = "hello";
        String exchange = "directExchange";
        context = "routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendHelloTest : " + context);
for(int i=0;i<100;i++) {
//      try {
//            Thread.sleep(10);
//        } catch (InterruptedException e) {
//            // TODO Auto-generated catch block
//            e.printStackTrace();
//        }
      this.rabbitTemplate.convertAndSend(exchange, routeKey, context, new MessagePostProcessor() {
            
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(10);
                return message;
            }
        });
}

        
    }

    @Test
    public void sendDirectTest() {

        String context = "此訊息在,預設的交換機模式佇列下,有 DirectReceiver 可以收到";

        String routeKey = "direct";

        String exchange = "directExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendDirectTest : " + context);

        // 推薦使用 sendHello() 方法寫法,這種方式在 Direct Exchange 多此一舉,沒必要這樣寫
        //this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
       //2.通過這種方式設定rabbitmq訊息優先順序
        for(int i=0;i<100;i++) {
//              try {
//                  Thread.sleep(10);
//              } catch (InterruptedException e) {
//                  // TODO Auto-generated catch block
//                  e.printStackTrace();
//              }
        this.rabbitTemplate.convertAndSend(exchange, routeKey, context, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setPriority(1);
            return message;
        }
    });
        }
    }

}


這裡寫了兩個測試類:當然,使用一個就可以
---------------------