1. 程式人生 > 實用技巧 >RabbitMQ筆記整理

RabbitMQ筆記整理

1 RabbitMQ簡介

1.1 訊息佇列中介軟體簡介

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合非同步訊息流量 削鋒等問題實現高效能,高可用,可伸縮和最終一致性[架構] 使用較多的訊息佇列有
ActiveMQRabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

訊息佇列在實際應用中常用的使用場景:非同步處理,應用解耦,流量削鋒和訊息通訊

  • 非同步處理

​ 比如註冊、需要處理業務並且還需要發簡訊、發郵件驗證

  • 應用解耦

    比如發簡訊何發郵件都是由專門的專案或者微服務來操作,比封裝成工具類耦合性要低

  • 流量削鋒

​ 比如現實生活中的漏斗、專案高峰期提高併發

1.2 什麼是RabbitMQ

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

AMQP :Advanced Message Queue,高階訊息佇列協議。它是應用層協議的一個開放
標準,為面向訊息的中介軟體設計,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並
受產品、開發語言等條件的限制

RabbitMQ 最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件
性、高可用性等方面表現不俗。具體特點包括:

1.可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。

2.靈活的路由(Flexible Routing)

在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ
已經提供了一些內建的 Exchange 來實現。針對更復雜的路由功能,可以將多個
Exchange 繫結在一起,也通過外掛機制實現自己的 Exchange 。

3.訊息叢集(Clustering)
多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。

4.高可用(Highly Available Queues)
佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下佇列仍然可用。

5.多種協議(Multi-protocol)
RabbitMQ 支援多種訊息佇列協議,比如 STOMP、MQTT 等等。

6.多語言客戶端(Many Clients)
RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。

7.管理介面(Management UI)
RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息 Broker 的許多方
面。
8.跟蹤機制(Tracing)
如果訊息異常,RabbitMQ 提供了訊息跟蹤機制,使用者可以找出發生了什麼。

9.外掛機制(Plugin System)
RabbitMQ 提供了許多外掛,來從多方面進行擴充套件,也可以編寫自己的外掛。

1.3 架構圖與主要概念

1.3.1 架構圖

1.3.2 主要概念

RabbitMQ Server: 也叫broker server,它是一種傳輸服務。 他的角色就是維護一條
從Producer到Consumer的路線,保證資料能夠按照指定的方式進行傳輸。

Producer: 訊息生產者,如圖A、B、C,資料的傳送方。訊息生產者連線RabbitMQ服
務器然後將訊息投遞到Exchange。

Consumer:訊息消費者,如圖1、2、3,資料的接收方。訊息消費者訂閱佇列,
RabbitMQ將Queue中的訊息傳送到訊息消費者。

Exchange:生產者將訊息傳送到Exchange(交換器),由Exchange將訊息路由到一個
或多個Queue中(或者丟棄)。Exchange並不儲存訊息。RabbitMQ中的Exchange有
direct、fanout、topic、headers四種類型,每種型別對應不同的路由規則。

Queue:(佇列)是RabbitMQ的內部物件,用於儲存訊息。訊息消費者就是通過訂閱
佇列來獲取訊息的,RabbitMQ中的訊息都只能儲存在Queue中,生產者生產訊息並最終
投遞到Queue中,消費者可以從Queue中獲取訊息並消費。多個消費者可以訂閱同一個
Queue,這時Queue中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者
都收到所有的訊息並處理。

RoutingKey:生產者在將訊息傳送給Exchange的時候,一般會指定一個routing key,
來指定這個訊息的路由規則,而這個routing key需要與Exchange Type及binding key聯
合使用才能最終生效。在Exchange Type與binding key固定的情況下(在正常使用時一
般這些內容都是固定配置好的),我們的生產者就可以在傳送訊息給Exchange時,通過
指定routing key來決定訊息流向哪裡。RabbitMQ為routing key設定的長度限制為255
bytes。

Connection: (連線):Producer和Consumer都是通過TCP連線到RabbitMQ Server
的。以後我們可以看到,程式的起始處就是建立這個TCP連線。

Channels: (通道):它建立在上述的TCP連線中。資料流動都是在Channel中進行
的。也就是說,一般情況是程式起始建立TCP連線,第二步就是建立這個Channel。

VirtualHost:許可權控制的基本單位,一個VirtualHost裡面有若干Exchange和
MessageQueue,以及指定被哪些user使用

2 RabbitMQ安裝

2.1 windows安裝

2.2 docker安裝

安裝成功後訪問:ip:15672 看到下面頁面代表安裝成功

rabbitmq預設賬號:guest

預設密碼:guest

3 RabbitMQ模式

3.1 直接模式(direct)

3.1.1 概念

將訊息發給唯一一個節點時使用這種模式,這是最簡單的一種形式。

任何傳送到Direct Exchange的訊息都會被轉發到RouteKey中指定的Queue。
1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字串,下
文稱其為default Exchange)。
2.這種模式下不需要將Exchange進行任何繫結(binding)操作
3.訊息傳遞時需要一個“RouteKey”,可以簡單的理解為要傳送到的佇列名字。
4.如果vhost中不存在RouteKey中指定的佇列名,則該訊息會被拋棄。

3.1.2 程式碼實現

需求:將訊息發給q1佇列(預設就是直接模式)

首先在RabbitMQ中建立一個q1佇列

然後在rabbitmq中就可以看到剛剛建立的佇列

專案搭建

(1)建立工程rabbitMq,引入依賴:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

(2)編寫配置檔案application.yml

spring:
  rabbitmq:
    host: localhost

(3)編寫啟動類:

@SpringBootApplication
public class RabbitMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQApplication.class,args);
    }
}

(4)編寫訊息生產者測試類

@SpringBootTest(classes = RabbitMQApplication.class)
@RunWith(SpringRunner.class)
public class MqTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsg(){
        //第一個引數為佇列名稱,第二個引數為要傳送的訊息物件,這裡傳的是一個字串
        rabbitTemplate.convertAndSend("q1","hello rabbit");
    }
}

執行上面程式碼就可以向訊息佇列傳送一個訊息,可以看到rabbitmq中的訊息變成1了。

(5)編寫訊息消費者程式碼

@Component
@RabbitListener(queues = {"q1"})//從哪個佇列取訊息
public class Q1Listener {

    @RabbitHandler//取到訊息後的處理方法
    public void receiverMsg(String msg){
        System.out.println(msg);
    }
}

執行上面程式碼控制檯會列印訊息,並且佇列中的訊息被消費了。

3.2 分列模式(fanout)

3.2.1 概念

將訊息一次發給多個佇列時,需要使用這種模式。如下圖:

任何傳送到Fanout Exchange的訊息都會被轉發到與該Exchange繫結(Binding)的所有
Queue上。
1.可以理解為路由表的模式
2.這種模式不需要RouteKey
3.這種模式需要提前將Exchange與Queue進行繫結,一個Exchange可以繫結多個
Queue,一個Queue可以同多個Exchange進行繫結。
4.如果接受到訊息的Exchange沒有與任何Queue繫結,則訊息會被拋棄。

3.2.2 程式碼實現

需求:將訊息發給q1和q2佇列

(1)在RabbitMQ中建立q2,q3佇列

(2)在RabbitMQ中建立路由

(3)在RabbitMQ中配置路由規則

(4)編寫生產者測試程式碼

@Test
public void sendMsgFanout(){
    //第一個引數為路由名稱,第三個引數為訊息物件
    rabbitTemplate.convertAndSend("q1&q2","","hello rabbit fanout");
}

(5)編寫消費者listener(q2,q3)

q2

@Component
@RabbitListener(queues = {"q2"})
public class Q2Listener {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("從q2中取訊息:"+msg);
    }
}

q3

@Component
@RabbitListener(queues = {"q3"})
public class Q3Listener {

    @RabbitHandler
    public void receiverMsg(String msg){
        System.out.println("從q3中取訊息:"+msg);
    }
}

3.3 主題模式(Topic)

3.3.1 概念

分列模式的升級版,可以給每個佇列指定key

任何傳送到Topic Exchange的訊息都會被轉發到所有關心RouteKey中指定話題的Queue

如上圖所示
此類交換器使得來自不同的源頭的訊息可以到達一個對列,其實說的更明白一點就是模
糊匹配的意思,例如:上圖中紅色對列的routekey為usa.#,#代表匹配任意字元,但是
要想訊息能到達此對列,usa.必須匹配後面的#好可以隨意。圖中usa.news
usa.weather,都能找到紅色佇列,符號# 匹配一個或多個詞,符號* 匹配不多不少一個
詞。因此usa.# 能夠匹配到usa.news.XXX ,但是usa.* 只會匹配到usa.XXX 。
注:
交換器說到底是一個名稱與佇列繫結的列表。當訊息釋出到交換器時,實際上是由你所
連線的通道,將訊息路由鍵同交換器上繫結的列表進行比較,最後路由訊息。
任何傳送到Topic Exchange的訊息都會被轉發到所有關心RouteKey中指定話題的
Queue上
1.這種模式較為複雜,簡單來說,就是每個佇列都有其關心的主題,所有的訊息都帶有一
個“標題”(RouteKey),Exchange會將訊息轉發到所有關注主題能與RouteKey模糊匹配的
佇列。
2.這種模式需要RouteKey,也許要提前繫結Exchange與Queue。

3.在進行繫結時,要提供一個該佇列關心的主題,如“#.log.#”表示該佇列關心所有涉及
log的訊息(一個RouteKey為”MQ.log.error”的訊息會被轉發到該佇列)。

4.“#”表示0個或若干個關鍵字,“”表示一個關鍵字。如“log.”能與“log.warn”匹配,無法
與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。
5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此訊息

3.3.2 程式碼實現

(1)建立交換器

(2)配置路由key

說明:

  • 交換器會將訊息轉發到所有關注主題能與RouteKey模糊匹配的佇列,如果交換器沒有發現能夠與RouteKey匹配的佇列,則會拋棄此訊息。

  • #表示多個字元 *表示一個字元

(4)編寫生產者程式碼

@Test
public void sendMsgTopic(){
    /**
         * rabbitTemplate.convertAndSend(args1,args2,args3);
         * 第一個引數為交換器名稱
         * 第二個引數為路由匹配規則
         * 第三個引數為訊息物件
         */
    //只發送給q1
    //rabbitTemplate.convertAndSend("topic_ex","haha.abc"," rabbit fanout");
    //只發送給q2
    //rabbitTemplate.convertAndSend("topic_ex","abc.hehe"," rabbit fanout");
    //傳送給q1、q2、q3
    //rabbitTemplate.convertAndSend("topic_ex","haha.hehe"," rabbit fanout");
    //只發送給q1、q2
    rabbitTemplate.convertAndSend("topic_ex","haha.heihei.hehe","rabbit fanout");
}