訊息中介軟體RabbitMQ
1 RabbitMQ簡介
1.1訊息佇列中介軟體簡介
訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題實現高效能,高可用,可伸縮和最終一致性[架構] 使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
以下介紹訊息佇列在實際應用中常用的使用場景:非同步處理,應用解耦,流量削鋒和訊息通訊四個場景
1.2什麼是RabbitMQ
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP :Advanced Message Queue,高階訊息佇列協議。它是應用層協議的一個開放標準,為面向訊息的中介軟體設計,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受產品、開發語言等條件的限制。
1.可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。
2.靈活的路由(Flexible Routing)
在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ已經提供了一些內建的 Exchange 來實現。針對更復雜的路由功能,可以將多個Exchange 繫結在一起,也通過外掛機制實現自己的 Exchange 。
3.訊息叢集(Clustering)
多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。
佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下佇列仍然可用。
5.多種協議(Multi-protocol)
RabbitMQ 支援多種訊息佇列協議,比如 STOMP、MQTT 等等。
6.多語言客戶端(Many Clients)
RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。
7.管理介面(Management UI)
RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息 Broker 的許多方面。
8.跟蹤機制(Tracing)
如果訊息異常,RabbitMQ 提供了訊息跟蹤機制,使用者可以找出發生了什麼。
RabbitMQ 提供了許多外掛,來從多方面進行擴充套件,也可以編寫自己的外掛。
1.3架構圖與主要概念
1.3.1架構圖
1.3.2主要概念
RabbitMQ Server:也叫broker server,它是一種傳輸服務。 他的角色就是維護一條從Producer到Consumer的路線,保證資料能夠按照指定的方式進行傳輸。
Producer:訊息生產者,如圖A、B、C,資料的傳送方。訊息生產者連線RabbitMQ伺服器然後將訊息投遞到Exchange。
Consumer:訊息消費者,如圖1、2、3,資料的接收方。訊息消費者訂閱佇列,RabbitMQ將Queue中的訊息傳送到訊息消費者。
Exchange:生產者將訊息傳送到Exchange(交換器),由Exchange將訊息路由到一個或多個Queue中(或者丟棄)。Exchange並不儲存訊息。RabbitMQ中的Exchange有direct、fanout、topic、headers四種類型,每種型別對應不同的路由規則。
Queue:(佇列)是RabbitMQ的內部物件,用於儲存訊息。訊息消費者就是通過訂閱佇列來獲取訊息的,RabbitMQ中的訊息都只能儲存在Queue中,生產者生產訊息並最終投遞到Queue中,消費者可以從Queue中獲取訊息並消費。多個消費者可以訂閱同一個
Queue,這時Queue中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的訊息並處理。
RoutingKey:生產者在將訊息傳送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。在Exchange Type與binding key固定的情況下(在正常使用時一
般這些內容都是固定配置好的),我們的生產者就可以在傳送訊息給Exchange時,通過指定routing key來決定訊息流向哪裡。RabbitMQ為routing key設定的長度限制為255bytes。
Connection:(連線):Producer和Consumer都是通過TCP連線到RabbitMQ Server的。以後我們可以看到,程式的起始處就是建立這個TCP連線。
Channels:(通道):它建立在上述的TCP連線中。資料流動都是在Channel中進行的。也就是說,一般情況是程式起始建立TCP連線,第二步就是建立這個Channel。
VirtualHost:許可權控制的基本單位,一個VirtualHost裡面有若干Exchange和MessageQueue,以及指定被哪些user使用
2 走進RabbitMQ
2.1 RabbitMQ安裝與啟動
2.1.1 windows環境下的安裝
(1)下載並安裝Eralng
(2)下載並安裝rabbitmq
注意不要安裝在包含中文和空格的目錄下!安裝後window服務中就存在rabbitMQ了,並且是啟動狀態。
(3)安裝管理介面(外掛)
進入rabbitMQ安裝目錄的sbin目錄,輸入命令
rabbitmq‐plugins enable rabbitmq_management
(4)重新啟動服務
(5)開啟瀏覽器,位址列輸入http://127.0.0.1:15672 ,即可看到管理介面的登陸頁
輸入使用者名稱和密碼,都為guest 進入主介面:
最上側的導航以此是:概覽、連線、通道、交換器、佇列、使用者管理
2.1.2 docker環境下的安裝
(1)下載映象:
docker pull rabbitmq:management
(2)建立容器,rabbitmq需要有對映以下埠: 5671 5672 4369 15671 1567225672
- 15672 (if management plugin is enabled)
- 15671 management監聽埠
- 5672, 5671 (AMQP 0-9-1 without and with TLS)
- 4369 (epmd) epmd 代表 Erlang 埠對映守護程序
- 25672 (Erlang distribution)
docker run ‐di ‐‐name=tensquare_rabbitmq ‐p 5671:5617 ‐p 5672:5672 ‐p 4369:4369 ‐p 15671:15671 ‐p 15672:15672 ‐p 25672:25672 rabbitmq:management
瀏覽器訪問 http://192.168.184.134:15672/#/
2.2 直接模式(Direct)
2.2.1 什麼是Direct模式
我們需要將訊息發給唯一一個節點時使用這種模式,這是最簡單的一種形式。
任何傳送到Direct Exchange的訊息都會被轉發到RouteKey中指定的Queue。
1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字串,下文稱其為default Exchange)。
2.這種模式下不需要將Exchange進行任何繫結(binding)操作
3.訊息傳遞時需要一個“RouteKey”,可以簡單的理解為要傳送到的佇列名字。
4.如果vhost中不存在RouteKey中指定的佇列名,則該訊息會被拋棄。
2.2.2 建立佇列
做下面的例子前,我們先建立一個叫itcast的佇列。
Durability:是否做持久化 Durable(持久) transient(臨時)Auto delete : 是否自動刪除
2.2.3 程式碼實現-訊息生產者
(1)建立工程rabbitmq_demo,引入amqp起步依賴 ,pom.xml如下:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐parent</artifactId> <version>2.0.1.RELEASE</version> <relativePath /> </parent> <properties> <project.build.sourceEncoding>UTF‐8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF‐8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring‐boot‐starter‐test</artifactId> <scope>test</scope> </dependency> </dependencies>
(2)編寫配置檔案application.yml
spring:
rabbitmq:
host:192.168.184.134
(3)編寫啟動類
@SpringBootApplication publicclassApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(Application.class); } }
(4)編寫測試類
@RunWith(SpringRunner.class) @SpringBootTest(classes=Application.class) publicclassMqTest{ @Autowired privateRabbitTemplaterabbitTemplate; @Test publicvoidtestSend(){ rabbitTemplate.convertAndSend("itcast","我要紅包"); } }
2.2.4 程式碼實現-訊息消費者
(1)編寫訊息消費者類
@Component @RabbitListener(queues="itcast") publicclassCustomer1{ @RabbitHandler publicvoidshowMessage(Stringmessage){ System.out.println("itcast接收到訊息:"+message); } }
(2)執行啟動類,可以在控制檯看到剛才傳送的訊息
2.3 分列模式(Fanout)
2.3.1 什麼是分列(Fanout)模式
當我們需要將訊息一次發給多個佇列時,需要使用這種模式。如下圖:
任何傳送到Fanout Exchange的訊息都會被轉發到與該Exchange繫結(Binding)的所有Queue上。
1.可以理解為路由表的模式
2.這種模式不需要RouteKey
3.這種模式需要提前將Exchange與Queue進行繫結,一個Exchange可以繫結多個Queue,一個Queue可以同多個Exchange進行繫結。
4.如果接受到訊息的Exchange沒有與任何Queue繫結,則訊息會被拋棄。
2.3.2 交換器繫結佇列
(1)在queue中新增佇列itheima 和kudingyu
(2)新建交換器chuanzhi
(3)將itcast 和itheima兩個佇列繫結到交換器chuanzhi
點選chuanzhi進入交換器管理介面
點選Bindings新增繫結 itheima和kudingyu
繫結後效果如下:
2.3.3 程式碼實現-訊息生產者
@Test publicvoidtestSendFanout(){ rabbitTemplate.convertAndSend("chuanzhi","","分列模式走起"); }
2.3.4 程式碼實現-訊息消費者
建立訊息監聽類,用於監聽itheima的訊息
@Component @RabbitListener(queues="itheima") publicclassCustomer2{ @RabbitHandler publicvoidshowMessage(Stringmessage){ System.out.println("itheima接收到訊息:"+message); } }
建立訊息監聽類,用於監聽kudingyu的訊息
@Component @RabbitListener(queues="kudingyu") publicclassCustomer3{ @RabbitHandler publicvoidshowMessage(Stringmessage){ System.out.println("kudingyu接收到訊息:"+message); } }
2.4 主題模式(Topic)
2.4.1 什麼是主題模式
任何傳送到Topic Exchange的訊息都會被轉發到所有關心RouteKey中指定話題的Queue上
如上圖所示,此類交換器使得來自不同的源頭的訊息可以到達一個對列,其實說的更明白一點就是模糊匹配的意思,例如:上圖中紅色對列的routekey為usa.#,#代表匹配任意字元,但是要想訊息能到達此對列,usa.必須匹配後面的#好可以隨意。圖中usa.news
usa.weather,都能找到紅色佇列,符號 # 匹配一個或多個詞,符號 * 匹配不多不少一個詞。因此 usa.# 能夠匹配到 usa.news.XXX ,但是 usa.* 只會匹配到 usa.XXX 。
注:
交換器說到底是一個名稱與佇列繫結的列表。當訊息釋出到交換器時,實際上是由你所連線的通道,將訊息路由鍵同交換器上繫結的列表進行比較,最後路由訊息。
任何傳送到Topic Exchange的訊息都會被轉發到所有關心RouteKey中指定話題的Queue上
1.這種模式較為複雜,簡單來說,就是每個佇列都有其關心的主題,所有的訊息都帶有一個“標題”(RouteKey),Exchange會將訊息轉發到所有關注主題能與RouteKey模糊匹配的佇列。
2.這種模式需要RouteKey,也許要提前繫結Exchange與Queue。
3.在進行繫結時,要提供一個該佇列關心的主題,如“#.log.#”表示該佇列關心所有涉及log的訊息(一個RouteKey為”MQ.log.error”的訊息會被轉發到該佇列)。
4.“#”表示0個或若干個關鍵字,“”表示一個關鍵字。如“log.”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。
5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此訊息
2.4.2 建立佇列與繫結
(1)新建一個交換器 ,型別選擇topic
(2)點選新建的交換器topictest
新增匹配規則,新增後列表如下:
2.4.3 程式碼實現
編寫測試類方法:
@Test publicvoidtestSendTopic1(){ rabbitTemplate.convertAndSend("topictest","goods.aaa","主題模式"); }
輸出結果:itcast接收到訊息:主題模式
@Test publicvoidtestSendTopic2(){ rabbitTemplate.convertAndSend("topictest","article.content.log","主題模式"); }
輸出結果:itheima接收到訊息:主題模式
@Test publicvoidtestSendTopic3(){ rabbitTemplate.convertAndSend("topictest","goods.log","主題模式"); }
輸出結果:
itheima接收到訊息:主題模式
itcast接收到訊息:主題模式
kudingyu接收到訊息:主題模式