1. 程式人生 > 實用技巧 >訊息中介軟體RabbitMQ

訊息中介軟體RabbitMQ

1 RabbitMQ簡介

1.1訊息佇列中介軟體簡介

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題實現高效能,高可用,可伸縮和最終一致性[架構] 使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
以下介紹訊息佇列在實際應用中常用的使用場景:非同步處理,應用解耦,流量削鋒和訊息通訊四個場景

1.2什麼是RabbitMQ

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP :Advanced Message Queue,高階訊息佇列協議。它是應用層協議的一個開放標準,為面向訊息的中介軟體設計,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受產品、開發語言等條件的限制。

RabbitMQ 最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。具體特點包括:
1.可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。
2.靈活的路由(Flexible Routing)

在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ已經提供了一些內建的 Exchange 來實現。針對更復雜的路由功能,可以將多個Exchange 繫結在一起,也通過外掛機制實現自己的 Exchange 。
3.訊息叢集(Clustering)
多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。

4.高可用(Highly Available Queues)
佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下佇列仍然可用。
5.多種協議(Multi-protocol)
RabbitMQ 支援多種訊息佇列協議,比如 STOMP、MQTT 等等。
6.多語言客戶端(Many Clients)
RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。
7.管理介面(Management UI)
RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息 Broker 的許多方面。
8.跟蹤機制(Tracing)
如果訊息異常,RabbitMQ 提供了訊息跟蹤機制,使用者可以找出發生了什麼。
9.外掛機制(Plugin System)
RabbitMQ 提供了許多外掛,來從多方面進行擴充套件,也可以編寫自己的外掛。

1.3架構圖與主要概念

1.3.1架構圖

1.3.2主要概念

RabbitMQ Server:也叫broker server,它是一種傳輸服務。 他的角色就是維護一條從Producer到Consumer的路線,保證資料能夠按照指定的方式進行傳輸。
Producer:訊息生產者,如圖A、B、C,資料的傳送方。訊息生產者連線RabbitMQ伺服器然後將訊息投遞到Exchange。
Consumer:訊息消費者,如圖1、2、3,資料的接收方。訊息消費者訂閱佇列,RabbitMQ將Queue中的訊息傳送到訊息消費者。
Exchange:生產者將訊息傳送到Exchange(交換器),由Exchange將訊息路由到一個或多個Queue中(或者丟棄)。Exchange並不儲存訊息。RabbitMQ中的Exchange有direct、fanout、topic、headers四種類型,每種型別對應不同的路由規則。
Queue:(佇列)是RabbitMQ的內部物件,用於儲存訊息。訊息消費者就是通過訂閱佇列來獲取訊息的,RabbitMQ中的訊息都只能儲存在Queue中,生產者生產訊息並最終投遞到Queue中,消費者可以從Queue中獲取訊息並消費。多個消費者可以訂閱同一個
Queue,這時Queue中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的訊息並處理。
RoutingKey:生產者在將訊息傳送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。在Exchange Type與binding key固定的情況下(在正常使用時一
般這些內容都是固定配置好的),我們的生產者就可以在傳送訊息給Exchange時,通過指定routing key來決定訊息流向哪裡。RabbitMQ為routing key設定的長度限制為255bytes。

Connection:(連線):Producer和Consumer都是通過TCP連線到RabbitMQ Server的。以後我們可以看到,程式的起始處就是建立這個TCP連線。
Channels:(通道):它建立在上述的TCP連線中。資料流動都是在Channel中進行的。也就是說,一般情況是程式起始建立TCP連線,第二步就是建立這個Channel。
VirtualHost:許可權控制的基本單位,一個VirtualHost裡面有若干Exchange和MessageQueue,以及指定被哪些user使用

2 走進RabbitMQ

2.1 RabbitMQ安裝與啟動

2.1.1 windows環境下的安裝

(1)下載並安裝Eralng

(2)下載並安裝rabbitmq
注意不要安裝在包含中文和空格的目錄下!安裝後window服務中就存在rabbitMQ了,並且是啟動狀態。
(3)安裝管理介面(外掛)
進入rabbitMQ安裝目錄的sbin目錄,輸入命令

rabbitmq‐plugins enable rabbitmq_management

(4)重新啟動服務
(5)開啟瀏覽器,位址列輸入http://127.0.0.1:15672 ,即可看到管理介面的登陸頁

輸入使用者名稱和密碼,都為guest 進入主介面:

最上側的導航以此是:概覽、連線、通道、交換器、佇列、使用者管理


2.1.2 docker環境下的安裝

(1)下載映象:

docker pull rabbitmq:management

(2)建立容器,rabbitmq需要有對映以下埠: 5671 5672 4369 15671 1567225672

  • 15672 (if management plugin is enabled)
  • 15671 management監聽埠
  • 5672, 5671 (AMQP 0-9-1 without and with TLS)
  • 4369 (epmd) epmd 代表 Erlang 埠對映守護程序
  • 25672 (Erlang distribution)
docker run ‐di ‐‐name=tensquare_rabbitmq ‐p 5671:5617 ‐p 5672:5672 ‐p 4369:4369 ‐p 15671:15671 ‐p 15672:15672 ‐p 25672:25672 rabbitmq:management

瀏覽器訪問 http://192.168.184.134:15672/#/

2.2 直接模式(Direct)

2.2.1 什麼是Direct模式

我們需要將訊息發給唯一一個節點時使用這種模式,這是最簡單的一種形式。

任何傳送到Direct Exchange的訊息都會被轉發到RouteKey中指定的Queue。
1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字串,下文稱其為default Exchange)。
2.這種模式下不需要將Exchange進行任何繫結(binding)操作
3.訊息傳遞時需要一個“RouteKey”,可以簡單的理解為要傳送到的佇列名字。
4.如果vhost中不存在RouteKey中指定的佇列名,則該訊息會被拋棄。

2.2.2 建立佇列

做下面的例子前,我們先建立一個叫itcast的佇列。

Durability:是否做持久化 Durable(持久) transient(臨時)Auto delete : 是否自動刪除


2.2.3 程式碼實現-訊息生產者

(1)建立工程rabbitmq_demo,引入amqp起步依賴 ,pom.xml如下:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring‐boot‐starter‐parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath />
    </parent>
    
    <properties>
        <project.build.sourceEncoding>UTF‐8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF‐8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring‐boot‐starter‐amqp</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring‐boot‐starter‐test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

(2)編寫配置檔案application.yml

spring:
rabbitmq:
host:192.168.184.134

(3)編寫啟動類

@SpringBootApplication
publicclassApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(Application.class);
}
}

(4)編寫測試類

@RunWith(SpringRunner.class)
@SpringBootTest(classes=Application.class)
publicclassMqTest{
@Autowired
privateRabbitTemplaterabbitTemplate;
@Test
publicvoidtestSend(){
rabbitTemplate.convertAndSend("itcast","我要紅包");
}
}

2.2.4 程式碼實現-訊息消費者

(1)編寫訊息消費者類

@Component
@RabbitListener(queues="itcast")
publicclassCustomer1{
@RabbitHandler
publicvoidshowMessage(Stringmessage){
System.out.println("itcast接收到訊息:"+message);
}
}

(2)執行啟動類,可以在控制檯看到剛才傳送的訊息

2.3 分列模式(Fanout)

2.3.1 什麼是分列(Fanout)模式

當我們需要將訊息一次發給多個佇列時,需要使用這種模式。如下圖:

任何傳送到Fanout Exchange的訊息都會被轉發到與該Exchange繫結(Binding)的所有Queue上。
1.可以理解為路由表的模式
2.這種模式不需要RouteKey
3.這種模式需要提前將Exchange與Queue進行繫結,一個Exchange可以繫結多個Queue,一個Queue可以同多個Exchange進行繫結。
4.如果接受到訊息的Exchange沒有與任何Queue繫結,則訊息會被拋棄。


2.3.2 交換器繫結佇列

(1)在queue中新增佇列itheima 和kudingyu
(2)新建交換器chuanzhi

(3)將itcast 和itheima兩個佇列繫結到交換器chuanzhi

點選chuanzhi進入交換器管理介面

點選Bindings新增繫結 itheima和kudingyu

繫結後效果如下:

2.3.3 程式碼實現-訊息生產者

@Test 
publicvoidtestSendFanout(){ 
    rabbitTemplate.convertAndSend("chuanzhi","","分列模式走起"); 
}

2.3.4 程式碼實現-訊息消費者


建立訊息監聽類,用於監聽itheima的訊息

@Component
@RabbitListener(queues="itheima")
publicclassCustomer2{
@RabbitHandler
publicvoidshowMessage(Stringmessage){
System.out.println("itheima接收到訊息:"+message);
}
}

建立訊息監聽類,用於監聽kudingyu的訊息

@Component
@RabbitListener(queues="kudingyu")
publicclassCustomer3{
@RabbitHandler
publicvoidshowMessage(Stringmessage){
System.out.println("kudingyu接收到訊息:"+message);
}
}

2.4 主題模式(Topic)

2.4.1 什麼是主題模式

任何傳送到Topic Exchange的訊息都會被轉發到所有關心RouteKey中指定話題的Queue上

如上圖所示,此類交換器使得來自不同的源頭的訊息可以到達一個對列,其實說的更明白一點就是模糊匹配的意思,例如:上圖中紅色對列的routekey為usa.#,#代表匹配任意字元,但是要想訊息能到達此對列,usa.必須匹配後面的#好可以隨意。圖中usa.news
usa.weather,都能找到紅色佇列,符號 # 匹配一個或多個詞,符號 * 匹配不多不少一個詞。因此 usa.# 能夠匹配到 usa.news.XXX ,但是 usa.* 只會匹配到 usa.XXX 。
注:
交換器說到底是一個名稱與佇列繫結的列表。當訊息釋出到交換器時,實際上是由你所連線的通道,將訊息路由鍵同交換器上繫結的列表進行比較,最後路由訊息。
任何傳送到Topic Exchange的訊息都會被轉發到所有關心RouteKey中指定話題的Queue上

1.這種模式較為複雜,簡單來說,就是每個佇列都有其關心的主題,所有的訊息都帶有一個“標題”(RouteKey),Exchange會將訊息轉發到所有關注主題能與RouteKey模糊匹配的佇列。

2.這種模式需要RouteKey,也許要提前繫結Exchange與Queue。

3.在進行繫結時,要提供一個該佇列關心的主題,如“#.log.#”表示該佇列關心所有涉及log的訊息(一個RouteKey為”MQ.log.error”的訊息會被轉發到該佇列)。

4.“#”表示0個或若干個關鍵字,“”表示一個關鍵字。如“log.”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。

5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此訊息


2.4.2 建立佇列與繫結

(1)新建一個交換器 ,型別選擇topic

(2)點選新建的交換器topictest

新增匹配規則,新增後列表如下:

2.4.3 程式碼實現

編寫測試類方法:

@Test
publicvoidtestSendTopic1(){
rabbitTemplate.convertAndSend("topictest","goods.aaa","主題模式");
}

輸出結果:itcast接收到訊息:主題模式

@Test
publicvoidtestSendTopic2(){

    rabbitTemplate.convertAndSend("topictest","article.content.log","主題模式");
}

輸出結果:itheima接收到訊息:主題模式

@Test
publicvoidtestSendTopic3(){
rabbitTemplate.convertAndSend("topictest","goods.log","主題模式");
}

輸出結果:

itheima接收到訊息:主題模式
itcast接收到訊息:主題模式
kudingyu接收到訊息:主題模式