Spring AMQP 源碼分析 01 - Impatient
阿新 • • 發佈:2017-06-20
消息格式 部分 細節 pub core declare 成功 req 自動刪除
gordon.study.rabbitmq.springamqp.Impatient.java
org.springframework.amqp.rabbit.connection.CachingConnectionFactory 是 ConnectionFactory 的一個實現類。它的屬性 com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory 引用了一個真正的 RabbitMQ 連接工廠,一般會在構造函數中被創建出來。後面所有的實際連接都是通過這個 rabbitConnectionFactory 創建的,CachingConnectionFactory 只是管理這些連接。
不同於官網示例代碼,第16行定義了 URI,表明通過 guest 用戶訪問 localhost:5672 的 RabbitMQ 服務。原因是我在本機運行時通過 Spring AMQP 自動解析出來的 host 是機器名而不是 localhost,導致 guest 用戶沒有權限連接,會拋出異常,因此我需要主動將 host 的值設置為 localhost。除了示例代碼中用到的通過 URI 指定的方法外,還有以下幾種方法:
1. 通過可以指定 hostname 的構造函數。適用面窄,不能設置用戶名等其它連接工廠屬性
ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
2. 自己創建 com.rabbitmq.client.ConnectionFactory
com.rabbitmq.client.ConnectionFactory rabbitConnFactory = new com.rabbitmq.client.ConnectionFactory();
rabbitConnFactory.setHost("localhost");
rabbitConnFactory.setAutomaticRecoveryEnabled(false);
ConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnFactory);
3. 通過 AbstractConnectionFactory 獲取到內部 rabbitConnectionFactory
AbstractConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.getRabbitConnectionFactory().setHost("localhost");
4. 通過 AbstractConnectionFactory 封裝的方法間接操作 rabbitConnectionFactory
AbstractConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
### 準備
## 目標
了解 Spring AMQP 核心代碼## 前置知識
RabbitMQ 入門## 相關資源
Quick Tour for the impatient:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_introduction.html#quick-tour> Sample code:<https://github.com/gordonklg/study>,rabbitmq module## 測試代碼
### 分析
## ConnectionFactory分析
org.springframework.amqp.rabbit.connection.ConnectionFactory 是 Spring AMQP 定義的連接工廠接口,負責創建連接。註意 RabbitMQ client 也有一個同名的連接工廠,這兒用的是 Spring AMQP 定義的。org.springframework.amqp.rabbit.connection.CachingConnectionFactory
## AmqpAdmin分析
org.springframework.amqp.core.AmqpAdmin 接口定義了 AMQP 基礎管理操作,主要是對各種資源(交換機、隊列、綁定)的申明和刪除操作。 org.springframework.amqp.rabbit.core.RabbitAdmin 實現了 AmqpAdmin 接口。通過構造函數傳入前面創建的 ConnectionFactory 實例,設置到 RabbitAdmin 的 ConnectionFactory connectionFactory 屬性中,同時還在構造函數中創建了一個 RabbitTemplate 實例,設置到 RabbitTemplate rabbitTemplate 屬性中。 declareQueue 方法用來申明隊列。org.springframework.amqp.core.Queue 是 Spring AMQP 對隊列的封裝,其屬性與 RabbitMQ Java client 中定義的 Queue 的屬性基本一致,new Queue("spring"); 相當於 RabbitMQ Java client 中 channel.queueDeclare("spring", true, false, false, null); 指定的隊列特性,即隊列是持久化、非排他性、非自動刪除的。 declareQueue 調用 rabbitTemplate 的 execute(ChannelCallback) 方法,在 ChannelCallback 的回調方法 doInRabbit(com.rabbitmq.client.Channel) 中通過入參 channel 調用 RabbitMQ Java client 提供的 channel.queueDeclare 方法申明隊列。代碼中有個細節:所有 "amq." 開頭的隊列會被 Spring AMQP 框架忽略,不會觸發 channel.queueDeclare 方法調用。RabbitTemplate 細節下文分析。## AmqpTemplate分析
org.springframework.amqp.core.AmqpTemplate 接口定義了 AMQP 基礎操作,主要為同步的消息收發方法。 org.springframework.amqp.rabbit.core.RabbitTemplate 實現了 AmqpTemplate 接口。類似於 RabbitAdmin,RabbitTemplate 也需要引用外部的 ConnectionFactory 用於創建連接。 顧名思義,RabbitTemplate 就是 RabbitMQ 收發消息的模板方法,類似於 JdbcTemplate 的設計。所以,RabbitTemplate 要實現創建連接、獲取信道、收發消息(等實際操作)、消息格式轉換、關閉信道與連接等模板代碼。 例如,對於 convertAndSend(String routingKey, Object message)方法,首先通過 convertMessageIfNecessary 方法將 Object message 轉化為 org.springframework.amqp.core.Message 實例。Message 類是 Spring AMQP 對消息的封裝。convertMessageIfNecessary 方法通過獲取在 RabbitTemplate 構造函數中創建的 org.springframework.amqp.support.converter.SimpleMessageConverter,調用其 toMessage 方法完成 Message 實例的創建(即消息轉化)。 接下來,借助 execute(ChannelCallback action, ConnectionFactory connectionFactory) 方法,在 ChannelCallback 接口的匿名實現類的 doInRabbit(Channel) 方法中實現發送消息功能,代碼截圖如下: execute(ChannelCallback action, ConnectionFactory connection) 方法與 ChannelCallback 接口合作,成功地將消息發送(等實際操作)與獲取連接和信道這兩部分代碼隔離開。execute 方法中通過傳入的 ConnectionFactory 獲取連接和信道,ChannelCallback 接口的 doInRabbit(Channel channel) 方法作為回調函數,通過 channel 參數接受 execute 方法中獲取的信道,完成消息發送的具體業務。代碼實現很簡單,在 execute 方法獲取到信道 channel 後,調用 action.doInRabbit(channel); 即可。 調用棧信息: doSend 方法實現消息發送這個具體操作。本質是通過調用 RabbitMQ Java client 提供的 channel.basicPublish 方法發送消息。 業務操作完成後,execute 方法會回收連接和信道資源,整個消息發送模板功能完成。 receiveAndConvert 方法實現思路與 convertAndSend 基本一致,調用棧如下:Spring AMQP 源碼分析 01 - Impatient