SpringBoot利用Docker整合RabbitMQ
SpringBoot利用Docker整合RabbitMQ
一、RabbitMQ介紹
大多應用中,可通過訊息服務中介軟體來提升系統非同步通訊、擴充套件解耦能力。
訊息服務中兩個重要概念:訊息代理(message broker)和目的地(destination);
當訊息傳送者傳送訊息以後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地。
訊息佇列主要有兩種形式的目的地,即:
- 佇列(queue):點對點訊息通訊(point-to-point)
- 主題(topic):釋出(publish)/訂閱(subscribe)訊息通訊
非同步處理圖:
點對點式:
- 訊息傳送者傳送訊息,訊息代理將其放入一個佇列中,訊息接收者從佇列中獲取訊息內容,訊息讀取後被移出佇列
- 訊息只有唯一的傳送者和接受者,但並不是說只能有一個接收者,即可以有多個接收者,但只要一個接收者可以接收到訊息
釋出訂閱式:
傳送者(釋出者)傳送訊息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那麼就會在訊息到達時同時收到訊息
常見的有兩種訊息代理,即JMS和AMQP
JMS基於JVM訊息代理的規範,ActiveMQ、HornetMQ是JMS實現
AMQP是高階訊息佇列協議,也是一個訊息代理的規範,相容JMS,RabbitMQ是AMQP的實現
JMS與AMQP的比較
Spring支援
- spring-jms提供了對JMS的支援
- spring-rabbit提供了對AMQP的支援
- 需要ConnectionFactory的實現來連線訊息代理
- 提供JmsTemplate、RabbitTemplate來發送訊息
- @JmsListener(JMS)、@RabbitListener(AMQP)註解在方法上監聽訊息代理髮布的訊息
- @EnableJms、@EnableRabbit開啟支援
SpringBoot自動配置
- JmsAutoConfiguration
- RabbitAutoConfiguration
RabbitMQ中的核心概念
Message:訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。
Publisher:訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。
Exchange:交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。
Exchange有4種類型:direct(預設),fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別。
Queue:訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
Binding:繫結,用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由繫結構成的路由表。
Exchange 和Queue的繫結可以是多對多的關係,一個交換器可以繫結多個佇列,一個佇列也可以被多個交換器繫結。
Connection:網路連線,比如一個TCP連線。
Channel:通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內的虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬
TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線。Consumer:訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。
Virtual Host:虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個vhost 本質上就是一個迷你 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。一個訊息佇列伺服器上可以有多個虛擬主機,一個虛擬主機中也可以有多個交換器。
Broker:表示訊息佇列伺服器實體
關係圖:
RabbitMQ執行機制
AMQP 中訊息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產者把訊息釋出到 Exchange 上,訊息最終到達佇列並被消費者接收,而 Binding 決定交換器的訊息應該傳送到那個佇列。
- Exchange型別:
Exchange分發訊息時根據型別的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers ,headers 型別現在幾乎不使用。
- direct型別:點對點型別,訊息中的路由鍵(routingkey)如果和 Binding中的 bindingkey 完全一致,交換器就將訊息發到對應的佇列中。
- fanout型別:廣播型別,每個發到 fanout 型別交換器的訊息都會分到所有繫結的佇列上去。fanout 交換器不處理路由鍵,只是簡單的將佇列繫結到交換器上,每個傳送到交換器的訊息都會被轉發到與該交換器繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。fanout 型別轉發訊息是最快的。
- topic型別:訊息訂閱型別,topic交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到一個模式上。它將路由鍵和繫結鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個萬用字元:符號“#”和符號“*”,井號匹配0個或多個單詞,星號匹配一個單詞。
二、Docker操作
docker是安裝在虛擬機器中的CentenOS中的,docker的安裝可以參考菜鳥教程。
啟動docker
systemctl start docker
用docker拉取rabbitMQ映象
# 之所以選擇3-management,是因為management版本的帶有web介面 [root@localhost ~]docker pull rabbitmq:3-management REPOSITORY TAG IMAGE ID CREATED SIZE rabbitmq 3-management df80af9ca0c9 4 weeks ago 149MB
檢視拉取下來的rabbitMQ映象
[root@localhost ~]docker images
建立並執行rabbitMQ容器
# -d是後臺啟動;-p是埠對映,第一個埠是linux伺服器的埠,第二個埠是docker容器的埠,將linux伺服器的埠與建立的rabbitmq容器的埠進行對映,5672是rabbitmq預設的埠,15672是rabbitmq web服務的埠;--name是給容器取名字;df80af9ca0c9是rabbitmq映象的id [[email protected] ~]docker run -d -p 5672:5672 -p 15672:15672 --name myRabbitMQ df80af9ca0c9
檢視docker中正在執行中的容器
[root@localhost ~]# docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 2082efdf44c0 df80af9ca0c9 "docker-entrypoint.s…" 5 minutes ago Up 5 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672-> 5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp myRabbitMQ
在window中開啟瀏覽器輸入http://192.168.0.156:15672訪問,192.168.0.156是linux伺服器的ip,可以使用ip addr命令檢視,輸入使用者名稱:guest,密碼:guest,就可以進入rabbitmq的web管理介面,如圖所示:
三、Spring Boot整合RabbitMQ
1. 通過web介面向RabbitMQ中加入交換器和佇列,以及交換器與佇列之間的繫結規則
1)、新增交換器
點選Exchanges進入交換器管理頁面,新增交換器
輸入交換器的名字name,交換器的型別type,是否持久化預設是durable持久化(下次執行RabbitMQ時資料還存在),點選add exchnge新增,分別新增direct(單播)、fanout(廣播)、topic(訂閱)三種類型的交換器進行測試
2)、新增訊息佇列
點選Queues進入訊息佇列管理頁面,新增訊息佇列
輸入訊息佇列的名字name,是否持久化預設是durable持久化,點選add exchnge新增
3)、將交換器與訊息佇列繫結
點選Exchanges進入交換器管理頁面,點選名字為exchange.direct的交換器,對該交換器進行管理
my01.queues是訊息佇列的名字,Routing key是路由鍵,點選Bind按鈕進行繫結操作
4)、建立SpringBoot專案,引入RabbitMQ依賴
- 在application.properties配置檔案中加入RabbitMQ的配置
# rabbitmq配置,註釋的配置都是必須配置的,但我們使用預設值 spring.rabbitmq.host=192.168.0.114 #spring.rabbitmq.username=guest #spring.rabbitmq.password=guest #spring.rabbitmq.port=5672 #spring.rabbitmq.virtual-host=/
測試
@RunWith(SpringRunner.class) @SpringBootTest public class SpringBootAmqpApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void sendMessage04Test() { Map<String, Object> map = new HashMap<>(); map.put("message", "這是我們的第一個訊息"); map.put("data", Arrays.asList("hello world", 123, "china")); /** * 單播,direct,其他模式也是一樣的 * 可以通過convertAndSend傳送訊息 * 第一個引數是交換器的名字; * 第二個引數是路由鍵; * 第三個引數是訊息物件,訊息物件預設使用javajdk自帶的序列化方式進行序列化,我們可以寫自己的MessageConverter,讓訊息序列化成json串 */ rabbitTemplate.convertAndSend("exchange.direct", "my01.queues", map); } }
檢視RabbitMQ中的訊息
訊息物件預設使用javajdk自帶的序列化方式進行序列化,我們可以寫自己的MessageConverter,讓訊息序列化成json串
序列化訊息實體
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyRabiitConfig { /** * 將訊息序列化成json串 * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
序列化測試
重新執行測試類,不需要修改程式碼
2.通過程式建立交換器和佇列,以及交換器與佇列之間的繫結規則
@RunWith(SpringRunner.class) @SpringBootTest public class SpringBootAmqpApplicationTests { @Autowired private RabbitAdmin rabbitAdmin; @Test public void create() { // 建立一個交換器 rabbitAdmin.declareExchange(new TopicExchange("hsg.exchange.topic")); // 建立一個訊息佇列 rabbitAdmin.declareQueue(new Queue("hsg.queue")); /** * 建立繫結 * 第一個引數:目的地,可以是繫結的訊息佇列的名字 * 第二個引數:目的地型別,QUEUE、EXCHANGE * 第三個引數:交換器的名字 * 第四個引數:路由鍵,傳送訊息時的路由鍵與繫結時的路由鍵匹配,從而決定將訊息傳送給哪些訊息佇列 * 第五個引數:arguments的Map */ rabbitAdmin.declareBinding(new Binding("hsg.queue", Binding.DestinationType.QUEUE, "hsg.exchange.topic", "hsg.*", null)); } }
3.監聽訊息佇列中的訊息
開啟RabbitMQ,在啟動類上面加上@EnableRabbit註解
@SpringBootApplication @EnableRabbit public class SpringBootAmqpApplication {
監聽指定的訊息佇列,在監聽方法上面加上@RabbitListener註解
@Service public class BookService { private final static Logger log = LoggerFactory.getLogger(BookService.class); /** * queues是訊息佇列的名字,是一個數組,可以監聽多個訊息佇列 * 引數是訊息的型別 * @param bookEntity */ @RabbitListener(queues={"my01.queues"}) public void receive(BookEntity bookEntity) { log.info("接收到的訊息:{}", bookEntity); } }
測試
向與訊息佇列名為“my01.queues”繫結的交換器中傳送訊息,訊息的路由鍵要能匹配到”my01.queues”訊息佇列,當訊息到達”my01.queues”訊息佇列中時,監聽”my01.queues”訊息佇列的方法就會獲取到訊息並執行方法