1. 程式人生 > >Spring boot整合Rabbit MQ使用初體驗

Spring boot整合Rabbit MQ使用初體驗

Spring boot整合Rabbit MQ使用初體驗

1.rabbit mq基本特性

首先介紹一下rabbitMQ的幾個特性

Asynchronous Messaging
Supports multiple messaging protocols, message queuing, delivery acknowledgement, flexible routing to queues, multiple exchange type.

非同步訊息

支援多種訊息傳遞協議,訊息排隊,傳遞確認,靈活路由規則,多種交換型別。這些應該是rabbitmq最核心的特性了。

Developer Experience

Deploy with BOSH, Chef, Docker and Puppet. Develop cross-language messaging with favorite programming languages such as: Java, .NET, PHP, Python, JavaScript, Ruby, Go, and many others.

部署體驗?

與BOSH,Chef,Docker和Puppet一起部署。使用喜歡的程式語言來開發跨語言訊息傳遞,例如Java,.NET,PHP,Python,JavaScript,Ruby,Go等。

Distributed Deployment

Deploy as clusters for high availability and throughput; federate across multiple availability zones and regions.

分散式部署

部署為叢集以實現高可用性和吞吐量;跨多個可用區域和區域聯合。

Enterprise & Cloud Ready

Pluggable authentication, authorisation, supports TLS and LDAP. Lightweight and easy to deploy in public and private clouds.

企業和雲就緒

可插拔身份驗證,授權,支援TLS和LDAP。輕巧且易於在公共和私有云中進行部署。

Tools & Plugins

Diverse array of tools and plugins supporting continuous integration, operational metrics, and integration to other enterprise systems. Flexible plug-in approach for extending RabbitMQ functionality.

工具&外掛

工具和外掛的種類繁多,支援持續整合,運營指標以及與其他企業系統的整合。靈活的外掛方法,用於擴充套件RabbitMQ功能。

Management & Monitoring

HTTP-API, command line tool, and UI for managing and monitoring RabbitMQ.

管理和監控

HTTP-API支援,命令列工具,管理和監控介面。

2.rabbit mq核心概念

①Message

訊息,訊息就是資料的載體,由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵,也就是訊息是如何分發給佇列的),priority(相對於其它訊息的優先權),delivery-mode(指定是否需要持久化儲存)

②Publisher

訊息的生產者,向交換機發布訊息的客戶端應用程式。

③Exchange

交換機用來接收生產者傳送的訊息並將這些訊息按照路由規則或者交換機型別路由到指定的佇列。交換機有4種類型:direct(預設),fanout,topic,以及headers,這四種類型支援不同的路由策略。

④Queue

訊息佇列,用於儲存訊息直到傳送給消費者,是訊息的容器。一個訊息可以存入一個或多個佇列,一直到消費者消費這個訊息,才會從佇列中刪除。

⑤Binding

繫結,指定交換機和佇列的繫結規則,可以理解為一個過濾器,當路由鍵符合這個繫結規則時,就會將訊息傳送給佇列。交換機和佇列之間的繫結可以是多對多的關係

⑥Connection

一個TCP連線

⑦Channel

通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內的虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線。

⑧Consumer

訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。

⑨Virtual Host

虛擬主機,表示一批交換機、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。

⑩Broker

表示訊息佇列伺服器實體。

更詳細的說明請參考官方文件:https://www.rabbitmq.com/tutorials/amqp-concepts.html

3.交換機型別和訊息路由

  • Direct Exchange

訊息中的路由鍵(routing key)如果和 Binding中的 binding key 一致,交換器就將訊息發到對應的佇列中。路由鍵與佇列名完全匹配,如果一個佇列繫結到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的訊息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。

  • Fanout Exchange

每個發到 fanout 型別交換器的訊息都會分到所有繫結的佇列上去。fanout 交換器不處理路由鍵【路由鍵被忽略】,只是簡單的將佇列繫結到交換器上,每個傳送到交換器的訊息都會被轉發到與該交換器繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。fanout 型別轉發訊息是最快的。

  • Topic Exchange

topic 交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到一個模式上。它將路由鍵和繫結鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個萬用字元:符號“#”和符號“”。

注意#匹配0個或者多個單詞,*匹配一個單詞

4.開始使用

我們先看spring boot的官方文件是怎麼說的吧。

首先,新增這些配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /test

rabbitmq預設使用者名稱密碼為guest:guest

先上配置類,RabbitTemplate使用自動配置好的,自動注入進來就可以了,我們還需要配置一個RabbitAdmin物件,RabbitAdmin有兩個構造方法

/**
* Construct an instance using the provided {@link ConnectionFactory}.
* @param connectionFactory the connection factory - must not be null.
*/
public RabbitAdmin(ConnectionFactory connectionFactory) {
    Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
    this.connectionFactory = connectionFactory;
    this.rabbitTemplate = new RabbitTemplate(connectionFactory);
}

/**
* Construct an instance using the provided {@link RabbitTemplate}. Use this
* constructor when, for example, you want the admin operations to be performed within
* the scope of the provided template's {@code invoke()} method.
* @param rabbitTemplate the template - must not be null and must have a connection
* factory.
* @since 2.0
*/
public RabbitAdmin(RabbitTemplate rabbitTemplate) {
    Assert.notNull(rabbitTemplate, "RabbitTemplate must not be null");
    Assert.notNull(rabbitTemplate.getConnectionFactory(), "RabbitTemplate's ConnectionFactory must not be null");
    this.connectionFactory = rabbitTemplate.getConnectionFactory();
    this.rabbitTemplate = rabbitTemplate;
}

但實際看他們的建構函式,發現如果我們不需要自己定製RabbitTemplate,直接使用第一個構造方法即可。

@Configuration
public class RabbitMqConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(rabbitTemplate);
    }
}

類似這樣,就配置好了。

接下來寫一個測試類,測試宣告交換機,佇列,以及傳送訊息和接收訊息等操作。

首先是宣告交換機型別,四種交換機對應的構造方法如下

//引數列表分別是:1.交換器名稱,2.是否持久化,3.是否自動刪除【指的是當最後一個與它繫結的佇列刪除時,是否自動刪除該交換機】
TopicExchange topicExchange = new TopicExchange("default.topic", true, false);
DirectExchange directExchange = new DirectExchange("default.direct", true, false);
FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false);
HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false);
rabbitAdmin.declareExchange(topicExchange);
rabbitAdmin.declareExchange(directExchange);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareExchange(headersExchange);

然後是宣告佇列

//1.佇列名稱,2.宣告一個持久佇列,3.宣告一個獨立佇列,4.是否自動刪除佇列
Queue queue1 = new Queue("queue1", true, false, false);
Queue queue2 = new Queue("queue2", true, false, false);
Queue queue3 = new Queue("queue3", true, false, false);
Queue queue4 = new Queue("queue4", true, false, false);
rabbitAdmin.declareQueue(queue1);
rabbitAdmin.declareQueue(queue2);
rabbitAdmin.declareQueue(queue3);
rabbitAdmin.declareQueue(queue4);

然後把佇列和交換機相互繫結

//1.queue:繫結的佇列,2.topicExchange:繫結到那個交換器,3.test.send.topic:繫結的路由名稱[routing key]
rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue2).to(fanoutExchange));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*"));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange).with("mq.direct"));

因為fanout型別的交換機忽略routing key屬性,所以不需要設定。

完整測試程式碼如下

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class RabbitMqTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void testDeclare() {
        //引數列表分別是:1.交換器名稱,2.是否持久化,3.是否自動刪除【指的是當最後一個與它繫結的佇列刪除時,是否自動刪除該交換機】
        TopicExchange topicExchange = new TopicExchange("default.topic", true, false);
        DirectExchange directExchange = new DirectExchange("default.direct", true, false);
        FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false);
        HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false);
        rabbitAdmin.declareExchange(topicExchange);
        rabbitAdmin.declareExchange(directExchange);
        rabbitAdmin.declareExchange(fanoutExchange);
        rabbitAdmin.declareExchange(headersExchange);

        //1.佇列名稱,2.宣告一個持久佇列,3.宣告一個獨立佇列,4.是否自動刪除佇列
        Queue queue1 = new Queue("queue1", true, false, false);
        Queue queue2 = new Queue("queue2", true, false, false);
        Queue queue3 = new Queue("queue3", true, false, false);
        Queue queue4 = new Queue("queue4", true, false, false);
        rabbitAdmin.declareQueue(queue1);
        rabbitAdmin.declareQueue(queue2);
        rabbitAdmin.declareQueue(queue3);
        rabbitAdmin.declareQueue(queue4);

        //1.queue:繫結的佇列,2.topicExchange:繫結到那個交換器,3.test.send.topic:繫結的路由名稱[routing key]
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange));
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue2).to(fanoutExchange));
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*"));
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange).with("mq.direct"));
    }
}

執行結果如下:

再看一下繫結情況:

Direct交換機

Fanout交換機

Topic交換機

全都測試成功,接下來就可以開始傳送訊息了。

傳送訊息有多個API可用,這裡選擇高亮的那個API,實際還有send方法可用,不過需要自己來構建訊息

@Test
public void testSendMessage() {
    //1.交換機,2.路由鍵,3.傳送的訊息體【這裡的訊息體會自動轉換為訊息,也可以自己構建訊息物件】
    rabbitTemplate.convertAndSend("default.topic","mq.whatever.this.is",new Student(1,"mmp","male",234));
}

測試結果如下:

一定要注意topic型別的交換機的路由鍵的匹配規則,#匹配0個或者多個單詞,*匹配一個單詞

那如果不想每次都是在測試類裡面建立交換機和佇列,可以怎麼做呢?可以在程式入口類裡面實現CommandLineRunner介面,程式碼如下,這樣的話,每次啟動都會宣告一次,當然重複宣告不會報錯,但會覆蓋之前的宣告,比如說之前宣告的時候定義的routing key可能就會被覆蓋。

@SpringBootApplication
@EnableRabbit
public class AmqpApplication implements CommandLineRunner {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    public static void main(String[] args) {
        SpringApplication.run(AmqpApplication.class, args);
    }


    @Override
    public void run(String... args) throws Exception {
        //引數列表分別是:1.交換器名稱,2.是否持久化,3.是否自動刪除【指的是當最後一個與它繫結的佇列刪除時,是否自動刪除該交換機】
        TopicExchange topicExchange = new TopicExchange("default.topic", true, false);
        DirectExchange directExchange = new DirectExchange("default.direct", true, false);
        FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false);
        HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false);
        rabbitAdmin.declareExchange(topicExchange);
        rabbitAdmin.declareExchange(directExchange);
        rabbitAdmin.declareExchange(fanoutExchange);
        rabbitAdmin.declareExchange(headersExchange);

        //1.佇列名稱,2.宣告一個持久佇列,3.宣告一個獨立佇列,4.是否自動刪除佇列
        Queue queue1 = new Queue("queue1", true, false, false);
        Queue queue2 = new Queue("queue2", true, false, false);
        Queue queue3 = new Queue("queue3", true, false, false);
        Queue queue4 = new Queue("queue4", true, false, false);
        rabbitAdmin.declareQueue(queue1);
        rabbitAdmin.declareQueue(queue2);
        rabbitAdmin.declareQueue(queue3);
        rabbitAdmin.declareQueue(queue4);

        //1.queue:繫結的佇列,2.topicExchange:繫結到那個交換器,3.test.send.topic:繫結的路由名稱[routing key]
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange));
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue2).to(fanoutExchange));
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*"));
        rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange).with("mq.direct"));
    }
}

但其實這樣做還是比較複雜的,而且完全沒有必要,更加簡單方便的做法是,把那些配置宣告的物件直接新增到IOC容器中,讓spring自動的去呼叫相應的宣告方法,真是縱享絲滑呀,類似下面這樣子:

@Bean
public Queue Queue() {
    return new Queue("hello");
}

繼續測試接收訊息,有一個註解很方便。

@Service
public class ReceiverService {
    @RabbitListener(queues = {"queue3"})
    public void receive(Student student) {
        System.out.println("接收到訊息並列印:"+student);
    }
}

測試結果如下:

接收到訊息並列印:student{id=1, name='mmp', gender='male', age=234}

也可以直接使用方法接收訊息

@Test
public void testReceive() {
    Student student = (Student) rabbitTemplate.receiveAndConvert("queue3");
    System.out.println(student);
}

測試結果如下:

student{id=1, name='mmp', gender='male', age=234}

如果想讓傳送的學生物件使用JSON格式怎麼辦呢?

需要定製一下:

@Configuration
public class RabbitMqConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
        return new RabbitAdmin(rabbitTemplate);
    }
}

測試一下:

原始碼地址:https://github.com/lingEric/springboot-integration-hello

更多官方tutorials請移步https://github.com/rabbitmq/rabbitmq-tutori