通過例項理解 RabbitMQ 的基本概念
先說下自己開發的例項。
最近在使用 Spring Cloud Config 做分散式配置中心(基於 SVN/Git),當所有服務啟動後,SVN/Git 中的配置檔案更改後,客戶端服務讀取的還是舊的配置,並不能實時讀取(配置資訊會快取在客戶端),Spring Boot 提供了一種方式進行更新(通過spring-boot-starter-actuator
監控模組),然後 Post 訪問客戶端服務的/refresh
介面(也可以命令執行curl -X POST http://worker2:8115/refresh
),這樣客戶端會重新從配置中心獲取新的配置資訊,請求命令可以寫在 Git 的 Webhooks 指令碼中(修改提交 Push 後執行)。
如果客戶端服務比較少的話,這樣的解決方式沒問題,如果客戶端服務多的話,執行的請求指令碼就會非常多,而且單個服務的解決方式,也不利於後期的維護(點對點的方式),那該怎麼解決上面的問題呢?答案就是通過 Spring Cloud Bus。
Spring Cloud Bus 翻譯為訊息匯流排,使用輕量級的訊息代理來構建一個共用的訊息主題讓系統中所有微服務例項都能連線上來,由於該主題中產生的訊息會被所有例項監聽和消費,所以我們稱它為訊息匯流排。在總線上的各個例項都可以方便地廣播一些需要讓其他連線在該主題上的例項都知道的訊息,例如配置資訊的變更或者其他一些管理操作等。
架構示意圖(引用來源):
下面,我們需要利用 Spring Cloud Bus 來改造 Spring Cloud Config 的服務端和客戶端,其實非常簡單。
新增下面的依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
然後在bootstrap.yml
中新增下面配置:
spring:
rabbitmq:
host: manager1
port: 5672
username: admin
password: admin123
management:
security:
enabled: false
上面的配置資訊都是新增的,並且都需要配置在服務端和客戶端,通過上面的示例圖可以看到,配置資訊更新後請求的是服務端,那麼客戶端我們就不需要配置management.security.enabled
(也不需要配置spring-boot-starter-actuator
監控模組)。
服務端和客戶端的任何 Java 程式碼都不需要編寫,重新啟動服務,當配置資訊更新後,通過 Git 的 Webhooks 執行請求指令碼:curl -X POST http://manager1:port/bus/refresh
,服務端接受到請求之後,會通過 Spring Cloud Bus 通知所有的客戶端(通過 RabbitMQ),重新從配置中心獲取配置資訊,達到實時更新配置的目的。
上面的例項描述就到這裡。
RabbitMQ 的基本概念
RabbitMQ,是一個使用 erlang 編寫的 AMQP(高階訊息佇列協議)的服務實現,簡單來說,就是一個功能強大的訊息佇列服務。
RabbitMQ 最基本模型:
RabbitMQ 的基本概念:
- Producer:訊息生產者。
- Consumer:訊息消費者。
- Connection(連線):Producer 和 Consumer 通過TCP 連線到 RabbitMQ Server。
- Channel(通道):基於 Connection 建立,資料流動都是在 Channel 中進行。
- Exchange(交換器):生產者將訊息傳送到 Exchange(交換器),由 Exchange 將訊息路由到一個或多個 Queue 中(或者丟棄);Exchange 並不儲存訊息;Exchange Types 常用有 Fanout、Direct、Topic 三種類型,每種型別對應不同的路由規則。
- Queue(佇列):是 RabbitMQ 的內部物件,用於儲存訊息;訊息消費者就是通過訂閱佇列來獲取訊息的,RabbitMQ 中的訊息都只能儲存在 Queue 中,生產者生產訊息並最終投遞到 Queue 中,消費者可以從 Queue 中獲取訊息並消費;多個消費者可以訂閱同一個 Queue,這時 Queue 中的訊息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的訊息並處理。
- Binding(繫結):是 Exchange(交換器)將訊息路由給 Queue 所需遵循的規則。
- Routing Key(路由鍵):訊息傳送給 Exchange(交換器)時,訊息將擁有一個路由鍵(預設為空), Exchange(交換器)根據這個路由鍵將訊息傳送到匹配的佇列中。
- Binding Key(繫結鍵):指定當前 Exchange(交換器)下,什麼樣的 Routing Key(路由鍵)會被下派到當前繫結的 Queue 中。
另外,再說下 Exchange Types(交換器型別)的三種常用型別:
- Direct:完全匹配,訊息路由到那些 Routing Key 與 Binding Key 完全匹配的 Queue 中。比如 Routing Key 為
cleint-key
,只會轉發cleint-key
,不會轉發cleint-key.1
,也不會轉發cleint-key.1.2
。 - Topic:模式匹配,Exchange 會把訊息傳送到一個或者多個滿足萬用字元規則的 routing-key 的 Queue。其中
*
表號匹配一個 word,#
匹配多個 word 和路徑,路徑之間通過.
隔開。如滿足a.*.c
的 routing-key 有a.hello.c
;滿足#.hello
的 routing-key 有a.b.c.helo
。 - Fanout:忽略匹配,把所有傳送到該 Exchange 的訊息路由到所有與它繫結 的Queue 中。
下面通過一段程式碼,理解一下訊息釋出的流程(程式碼引用):
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='first', type='topic')
channel.queue_declare(queue='A')
channel.queue_declare(queue='B')
channel.queue_bind(exchange='first', queue='A', routing_key='a.*.*')
channel.queue_bind(exchange='first', queue='B', routing_key='a.#')
channel.basic_publish(exchange='first',
routing_key='a',
body='Hello World!')
channel.basic_publish(exchange='first',
routing_key='a.b.c',
body='Hello World!')
大致步驟:
- 先獲取一個 Connection(連線)。
- 從 Connection(連線)上獲取一個 Channel(通道)。
- 宣告一個 Exchange(交換器),只會建立一次。
- 宣告兩個 Queue,只會建立一次。
- 把 Queue 繫結到 Exchange(交換器)上.
- 向指定的 Exchange(交換器)傳送一條訊息.
因為基於 Exchage Topic 模式,在上面發出的兩條訊息當中,訊息a
只會被a.#
匹配到,而a.b.c
會被兩個都匹配到。所以,最終的結果會是佇列 A 中有一條訊息,佇列 B 中有兩條訊息。
從佇列取出訊息程式碼:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='A')
def callback(ch, method, properties, body):
print body
channel.basic_consume(callback, queue='A', no_ack=True)
channel.start_consuming()
服務消費者取出訊息,需要重新建立 Connection(連線)和 Exchange(交換器),但 Queue 並不會建立,只需要從 Channel 中獲取對應的 Queue 訊息即可。
通過例項理解 RabbitMQ 基本概念
上面例項服務部署的情況是:三臺管理伺服器(config-server-git/config-server-svn)和一臺工作伺服器(config-client-git/config-server-svn),因為做了叢集,服務的具體情況:
- config-server-git:3 個服務。
- config-server-svn:3 個服務。
- config-client-git:1 個服務。
- config-client-svn:1 個服務。
所以,總的部署服務有 8 個。
我們通過 RabbitMQ Server 管理介面中的內容,說下 Connection(連線)、Channel(通道)、Exchange(交換器)和 Queue(佇列)的具體使用情況(根據數量理解)。
1. Connection(連線)
為什麼 Connection(連線)數量為 16 個?因為部署的 8 個服務,各自發布和接受訊息(即作為小心釋出者,也作為訊息接受者),計算公式:16 = 8 * 2
。
2. Channel(通道)
為什麼 Channel(通道)數量為 16 個?因為 Connection(連線)數量為 16 個,Channel(通道)是在 Connection(連線)基礎上建立的。
3. Exchange(交換器)
為什麼 Exchange(交換器)數量為 1 個?因為都是使用的同一個 Exchange(交換器),名字為springCloudBus
,Exchange Type 為topic
,Routing Key 為#
。
4. Queue(佇列)
為什麼 Queue(佇列)數量為 8 個?因為部署的 8 個服務,各自發布和接受的 Queue 是同一個,一個服務對應一個 Queue。
參考資料:
相關推薦
通過例項理解 RabbitMQ 的基本概念
先說下自己開發的例項。 最近在使用 Spring Cloud Config 做分散式配置中心(基於 SVN/Git),當所有服務啟動後,SVN/Git 中的配置檔案更改後,客戶端服務讀取的還是舊的配置,並不能實時讀取(配置資訊會快取在客戶端),Spring Boot 提供了一種方式進行更新(通過spring
rabbitMQ 基本概念
取消 發現 會有 消息 nbsp 標識 發送 概念 dir RabbitMQ 整體上是一個生產者與消費者模型,主要負責接收、存儲和轉發消息。可以把消 息傳遞的過程想象成:當你將一個包裹送到郵局,郵局會暫存並最終將郵件通過郵遞員送到收件人的手上, RabbitMQ 就好比由
通過例項理解委託、lambda的演變
經過一段時間的學習,想寫個部落格記錄一下委託、lambda的學習理解過程,方便自己可以回憶知識點。如果有地方有問題,請各位大佬指點一下~~~~~ using System; using System.Collections.Generic; using System.Linq; using Syste
【spring 14】通過例項理解AOP原理
Aspect Oriented Programming 面向切面程式設計。解耦是程式設計師編碼開發過程中一直追求的。AOP也是為了解耦所誕生。 具體思想是:定義一個切面,在切面的縱向定義處理方法,處理完成之後,回到橫向業務流。 AOP 在Spring框架中被作為核心組成部
從一個奇怪的錯誤出發理解 Vue 基本概念
有人在學習 Vue 過程中遇到一個奇怪的問題,併為之迷惑不已——為什麼這麼簡單的一個專案都會出錯。 這是一個簡單到幾乎不能再簡單的 Vue 專案,在 index.html 的 body 中有一個 id 為 app 的 div 根元素,其中包含一個 my-com
RabbitMQ基本概念和使用
RabbitMQ是一個訊息代理,核心原理:傳送訊息,接收訊息。 RabbitMQ主要用於元件之間的解耦,訊息傳送者無需知道訊息使用者的存在,反之亦然。 單向解耦 雙向解耦(如:RPC) 例如一個
Kettle學習之路(3)理解一些基本概念
Kettle包括了在ETL開發和部署階段用到的多個程式,每個程式都有獨立功能。 Spoon:整合開發環境。提供了一個圖形化使用者介面,用於建立/編輯作業或者轉換,也可以用於執行/除錯作業
訊息中介軟體——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協議!
前言 本章學習,我們可以瞭解到以下知識點: 網際網路大廠為什麼選擇RabbitMQ? RabbiMQ的高效能之道是如何做到的? 什麼是AMQP高階協議? AMQP核心概念是什麼? RabbitMQ整體架構模型是什麼樣子的? RabbitMQ訊息是如何流轉的? 1. 初識RabbitMQ Rabbi
通過例項理解Java網路IO模型
網路IO模型及分類 網路IO模型是一個經常被提到的問題,不同的書或者部落格說法可能都不一樣,所以沒必要死摳字眼,關鍵在於理解。 Socket連線 不管是什麼模型,所使用的socket連線都是一樣的。 以下是一個典型的應用伺服器上的連線情況。客戶的各種裝置通過Http協議與Tomcat程序互動,Tomcat需要
rabbitmq系列-基本概念理解
消息中間件 proto 連接數 接收 chang protoc 匹配 分享 img 1.簡介 RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue protocol)的開源實現。AMQP高級消息隊列,說白了就是一個開源的消息中間件
promise 的基本概念 和如何解決js中的異步編程問題 對 promis 的 then all ctch 的分析 和 await async 的理解
委托 callback 分析 傳參 成功 visible 定時 data- 得到 * promise承諾 * 解決js中異步編程的問題 * * 異步-同步 * 阻塞-無阻塞 * * 同步和異步的區別?
進程基本概念理解
進程概念一、進程和程序1.進程的基本概念 所謂進程是由正文段用戶數據段以及系統數據段共同組成的一個執行環境,是一個動態實體。2.程序的基本概念 程序只是一個普通文件,是一個機器代碼指令和數據的集合,這些指令和數據存儲在磁盤上的一個可執行映像中,所以,程序是一個靜態的實體。3.進程的組成部分(1)正文段:存
機器學習基本概念理解
機器學習基本概念理解數據集:關系型數據庫中有很多表,表裏面有很多記錄,很多記錄就可以認為是數據集屬性(特征):一個表中有很多條記錄,每條記錄的表有很多屬性,如tb_stu(stu_id,stuname,stu_sex)s表中有3條屬性屬性值:屬性的取值,如stu_id可以等於1,2.3....n。stu_se
SCOM的基本概念的理解&警報的處理
scom警報處理警報在介紹了基本的SCOM安裝以及管理包的作用以及實現後,感覺缺少一些基本知識概念的介紹,好像地基沒有打穩的感覺,現在就給大家介紹一下基本的概念,以及平時運維工作中遇到這些概念或者警報出現的原理以及基本的處理方法。常說的四大金剛,即在警報方面的四個主要的概念:發現(Discover):在SCO
Oracle sequence的基本概念與理解
side frame create varchar2 inf sel *** 順序 -s 1.如何查看sequence的定義 2.dba_sequences相關字的定義 3.如何修改sequence *************************************
RabbitMQ消息隊列基本概念
內容 type 圖1 一個 stock 根據 nec 配置 idt 從圖中可以看出RabbitMQ主要由Exchange和Queue兩部分組成,然後通過RoutingKey關聯起來,消息投遞到Exchange然後通過Queue接收。RabbitMQ消息隊列基本概念Rabb
RabbitMQ第三課 基本概念和exchange
通信通道 路由規則 消息發送 display param 產生 parent 需要 rabbitmq Rabbitmq使用必須理解的一些概念(轉自:http://www.linuxidc.com/Linux/2013-11/92591.htm)channel:通道,amqp
初學編程基本概念理解
col ddr 個數 block set person bin address 理解 1.數據上下文 高人如此解釋 數據上下文就是不止它自身可以訪問,其子元素皆可訪問。比如有這麽一個數據類型: public class Person { public i
CEPH的基本概念及通過Ceph-Deploy快速部署CEPH集群
linux內核 health 初始化 nali 部署過程 本地 journal 內核 端到端 基礎概念 **OSD**:Object Storage Device,主要用於存儲數據,處理數據,,恢復,回不,平衡數據,並提供數據給monitor。 **Monitor**:Ce
學海無涯——人工智慧應用例項之寫作軟體的基本概念
公司技術培訓中提到了Auto Narrative Writing,即自動寫作。 目前,本人也僅是使用VBA生成SQL statement(Insert語句),原理是字串拼接。 而這個自動寫作,顯然是更改級別的“字串拼接”了。先學習基礎吧。 ----------------------