SpringCloud前置知識+RabbitMQ
一、微服務架構介紹
1.單體架構
單體架構也被稱為單體應用,它是將所有的功能模組全部耦合在一個專案中
1.1 單體架構特點
1.最終會被打包成一個獨立的單元(一個唯一 的jar包或war包)
2.會以一個程式的方式來執行
1.2 單體架構的優點與缺點
優點
- 專案易於管理
- 部署簡單
缺點
- 測試成本高
- 可伸縮性差
- 可靠性差
- 迭代困難
- 跨語言程度差
- 團隊協作難
2.微服務架構
2.1 什麼是微服務
微服務是一種架構風格。一個大型的複雜軟體應用,由一個或多個微服務組成。系統中 的各個微服務可被獨立部署,各個微服務之間是鬆耦合的。每個微服務僅關注於完成一件任 務並很好的完成該任務。
2.2 常見的架構風格
- 客戶端與服務端的
- 基於元件模型的架構(EJB)
- 分層架構(MVC)
- 面向服務架構(SOA)
2.3 微服務的特點
- 系統是由多個服務構成
- 每個服務可以單獨獨立部署
- 每個服務之間是鬆耦合的,服務內部高內聚,服務外部低耦合,高內聚就是一個專案專注的完成一個功能
2.4 微服務的優點與缺點
優點:
- 容易測試
- 可伸縮性強
- 可靠性強
- 跨語言程度更加靈活
- 團隊協作容易
- 系統迭代容易
缺點:
- 運維成本過高,部署的數量較多
- 介面相容多版本
- 分散式系統的複雜性
- 分散式事務
二、MVC、RPC、SOA、微服務架構之間的區別
1.MVC架構
其實MVC就是傳統的單體架構
代表技術:SpringMVC、Spring、MyBatis等等。
2.RPC架構
RPC(Remote Procedure Call):遠端過程呼叫。他一種通過網路從遠端計算機程式上請求 服務,而不需要了解底層網路技術的協議。
代表技術:Thrift、Hessian 等等
3. SOA架構
SOA(Service oriented Architecture):面向服務架構
ESB(Enterparise Servce Bus):企業服務匯流排,服務中介。主要是提供了一個服務於服務之間的互動。 ESB 包含的功能如:負載均衡,流量控制,加密處理,服務的監控,異常處理,監控 告急等等。
代表技術:Mule、WSO2
4.微服務架構
微服務就是一個輕量級的服務治理方案。
代表技術:SpringCloud、dubbo 等等
三、微服務的設計原則
- AKF拆分原則
- 前後端分離原則
- 無狀態服務
- RestFul通訊風格
1. AKF拆分原則
業界對於可擴充套件的系統架構設計有一個樸素的理念,就是: **通過加機器就可以解決容量和可用性問題。(如果一臺不行那就兩臺) **
這一理念在“雲端計算”概念瘋狂流行的今天,得到了廣泛的認可!對於一個規模 迅速增長的系統而言,容量和效能問題當然是首當其衝的。但是隨著時間的向前, 系統規模的增長,除了面對效能與容量的問題外,還需要面對功能與模組數量上 的增長帶來的系統複雜性問題以及業務的變化帶來的提供差異化服務問題。而許多系統,在架構設計時並未充分考慮到這些問題,導致系統的重構成為常態,從 而影響業務交付能力,還浪費人力財力!對此,《可擴充套件的藝術》一書提出了一 個更加系統的可擴充套件模型—— AKF 可擴充套件立方 (Scalability Cube)。這個立方 體中沿著三個座標軸設定分別為:X、Y、Z。
- Y 軸(功能) —— 關注應用中功能劃分,基於不同的業務拆分
- X 軸(水平擴充套件) —— 關注水平擴充套件,也就是”加機器解決問題”
- Z 軸(資料分割槽) —— 關注服務和資料的優先順序劃分,如按地域劃分
1.1 Y軸(功能 )
Y 軸擴充套件會將龐大的整體應用拆分為多個服務。每個服務實現一組相關的功 能,如訂單管理、客戶管理等。在工程上常見的方案是服務化架構(SOA) 。比 如對於一個電子商務平臺,我們可以拆分成不同的服務,組成下面這樣的架構:
但通過觀察上圖容易發現,當服務數量增多時,服務呼叫關係變得複雜。為系統新增一個新功能,要呼叫的服務數也變得不可控,由此引發了服務管理上的混亂。所以,一般情況下,需要採用服務註冊的機制形成服務閘道器來進行服務治理。系統的架構將變成下圖所示:
1.2 X軸(水平擴充套件)
X 軸擴充套件與我們前面樸素理念是一致的,通過絕對平等地複製服務與資料, 以解決容量和可用性的問題。其實就是將微服務執行多個例項,做叢集加負載均衡的模式。
為了提升單個服務的可用性和容量, 對每一個服務進行 X 軸擴充套件劃分。
1.3 Z軸(資料分割槽)
Z 軸擴充套件通常是指基於請求者或使用者獨特的需求,進行系統劃分,並使得劃分出來的子系統是相互隔離但又是完整的。以生產汽車的工廠來舉例:福特公司為了發展在中國的業務,或者利用中國的廉價勞動力,在中國建立一個完整的子 工廠,與美國工廠一樣,負責完整的汽車生產。這就是一種Z軸擴充套件。
工程領域常見的 Z 軸擴充套件有以下兩種方案:
-
單元化架構
在分散式服務設計領域,一個單元(Cell)就是滿足某個分割槽所有業務操作 的自包含閉環。如上面我們說到的Y軸擴充套件的 SOA 架構,客戶端對服務端節點 的選擇一般是隨機的,但是,如果在此加上Z軸擴充套件,那服務節點的選擇將不再是隨機的了,而是每個單元自成一體。如下圖:
-
資料分割槽
為了效能資料安全上的考慮,我們將一個完整的資料集按一定的維度劃分出 不同的子集。 一個分割槽(Shard),就是是整體資料集的一個子集。比如用尾號 來劃分使用者,那同樣尾號的那部分使用者就可以認為是一個分割槽。資料分割槽為一般 包括以下幾種資料劃分的方式:
- 資料型別(如:業務型別)
- 資料範圍(如:時間段,使用者 ID)
- 資料熱度(如:使用者活躍度,商品熱度)
- 按讀寫分(如:商品描述,商品庫存)
2. 前後端分離原則
何為前後端分離?前後端本來不就分離麼?這要從尷尬的 jsp 講起。分工精細化從來都 是蛋糕做大的原則,多個領域工程師最好在不需要接觸其他領域知識的情況下合作,才可能 使效率越來越高,維護也會變得簡單。jsp 的模板技術融合了 html 和 java 程式碼,使得傳統 MVC 開發中的前後端在這裡如膠似漆,前端做好頁面,後端轉成模板,發現問題再找前端, 前端又看不懂 java 程式碼......前後端分離的目的就是將這尷尬局面打破。 前後端分離原則,簡單來講就是前端和後端的程式碼分離,我們推薦的模式是最好採用物 理分離的方式部署,進一步促使更徹底的分離。如果繼續直接使用服務端模板技術,如:jsp, 把 java、js、html、css 都堆到一個頁面裡,稍微複雜一點的頁面就無法維護了。
這種分離方式有幾個好處:
- 前後端技術分離,可以由各自的專家來對各自的領域進行優化,這樣前段的使用者體 驗優化效果更好。
- 分離模式下,前後端互動介面更清晰,就剩下了介面模型,後端的介面簡潔明瞭, 更容易維護。
- 前端多渠道整合場景更容易實現,後端服務無需變更,採用統一的資料和模型,可 以支援多個前端:例如:微信 h5 前端、PC 前端、安卓前端、IOS 前端。
3. 無狀態服務
對於無狀態服務,首先說一下什麼是狀態:如果一個資料需要被多個服務共 享,才能完成一筆交易,那麼這個資料被稱為狀態。進而依賴這個“狀態”資料的 服務被稱為有狀態服務,反之稱為無狀態服務。
那麼這個無狀態服務原則並不是說在微服務架構裡就不允許存在狀態,表達 的真實意思是要把有狀態的業務服務改變為無狀態的計算類服務,那麼狀態資料 也就相應的遷移到對應的“有狀態資料服務”中。
場景說明:例如我們以前在本地記憶體中建立的資料快取、Session 快取,到 現在的微服務架構中就應該把這些資料遷移到分散式快取中儲存,讓業務服務變 成一個無狀態的計算節點。遷移後,就可以做到按需動態伸縮,微服務應用在運 行時動態增刪節點,就不再需要考慮快取資料如何同步的問題。
4. RestFul的通訊風格
作為一個原則來講本來應該是個“無狀態通訊原則”,在這裡我們直接推薦一 個實踐優選的 Restful 通訊風格 ,因為他有很多好處:
- 無狀態協議 HTTP,具備先天優勢,擴充套件能力很強。例如需要安全加密,有 現成的成熟方案 HTTPS 即可。
- JSON 報文序列化,輕量簡單,人與機器均可讀,學習成本低,搜尋引擎友好。
- 語言無關,各大熱門語言都提供成熟的 Restful API 框架,相對其他的一些 RPC 框架生態更完善。
四、SpringCloud簡介
1.什麼是SpringCloud
SpringCloud是一個服務治理平臺,提供了一些服務框架。包含了:服務註冊 與發現、配置中心、訊息中心 、負載均衡、資料監控等等。
Spring Cloud 是一個微服務框架,**相比 Dubbo 等 RPC 框架,Spring Cloud 提 供的全套的分散式系統解決方案。 **
Spring Cloud 對微服務基礎框架 Netflix 的多個開源元件進行了封裝,同時又實現 了和雲端平臺以及和 Spring Boot 開發框架的整合。
Spring Cloud 為微服務架構開發涉及的配置管理,服務治理,熔斷機制,智慧路由, 微代理,控制匯流排,一次性 token,全域性一致性鎖,leader 選舉,分散式 session,集 群狀態管理等操作提供了一種簡單的開發方式。
Spring Cloud 為開發者提供了快速構建分散式系統的工具,開發者可以快速的啟動 服務或構建應用、同時能夠快速和雲平臺資源進行對接。
2. SpringCloud的專案的位置
SpingCloud是Spring 的一個頂級專案與 SpringBoot、SpringData 位於同一位置。
3.SpringCloud的子專案
3.1 SpringCloud Config
配置管理工具,支援使用 Git 儲存配置內容,支援應 用配置的外部化儲存,支援客戶端配置資訊重新整理、加解密配置內容等
3.2 SpringCloud Bus
事件、訊息匯流排,用於在叢集(例如,配置變化事件)中 傳播狀態變化,可與 Spring Cloud Config 聯合實現熱部署。
3.3 Spring Cloud Netflix>
**針對多種 Netflix 元件提供的開發工具包,其中包括 Eureka、Hystrix、Zuul、Archaius 等。 **
-
Netflix Eureka
一個基於 rest 服務的服務治理元件,包括服務註冊中心、服務註冊與服務發現機制的實現,實現了雲端負載均衡和中間層伺服器的故障轉移。
-
Netflix Hystrix
容錯管理工具,實現斷路器模式,通過控制服務的節點,從而對延遲和故障提供更強大的容錯能力。
-
Netflix Ribbon
客戶端負載均衡的服務呼叫元件。
-
Netflix Feign
基於Ribbon和Hystrix的宣告式服務呼叫元件。
-
Netflix Zuul
微服務閘道器,提供動態路由,訪問過濾等服務。
-
Netflix Archaius:
配置管理 API,包含一系列配置管理 API,提供動 態型別化屬性、執行緒安全配置操作、輪詢框架、回撥機制等功能。
3.4 Spring Cloud for Cloud Foundry
通過 Oauth2 協議繫結服務到 CloudFoundry,CloudFoundry 是 VMware 推出的開源 PaaS 雲平臺。
3.5 Spring Cloud Sleuth
日誌收集工具包,封裝了 Dapper,Zipkin 和 HTrace 操作。
3.6 Spring Cloud Data Flow
大資料操作工具,通過命令列方式操作資料流。
3.7 Spring Cloud Security
安全工具包,為你的應用程式新增安全控制,主要是指OAuth2。
3.8 Spring Cloud Consul
封裝了Consul操作,consul是一個服務發現與配置工具,與Docker容器可以無縫整合。
3.9 Spring Cloud Zookeeper
操作Zookeeper的工具包 , 用使用zookeeper方式的服務註冊和發現。
3.10 Spring Cloud Stream:
資料流操作開發包,封裝了Redis、Rabbit、Kafka 等傳送接收訊息。
3.11 Spring Cloud CLI
基於 Spring Boot CLI,可以讓你以命令列方式快速建立雲元件。
五、SpringCloud與Dubbo的區別
六、 Spring Cloud版本說明
1.常見版本號說明
軟體版本號:2.0.2.RELEASE
-
2:主版本號。當功能模組有較大更新或者整體架構發生變化時,主版本號會更新
-
0:次版本號。次版本表示只是區域性的一些變動。
-
2:修改版本號。一般是 bug 的修復或者是小的變動
-
RELEASE:希臘字母版本號。次版本號使用者標註當前版本的軟體處於哪個開發階段
1.1希臘字母版本號
- Base:設計階段。只有相應的設計沒有具體的功能實現
- Alpha:軟體的初級版本。存在較多的 bug
- Bate:表示相對 alpha 有了很大的進步,消除了嚴重的 bug,還存在一些潛在的 bug。
- Release:該版本表示最終版。
2. Spring Cloud 版本號說明
2.1為什麼 Spring Cloud 版本用的是單詞而不是數字?
設計的目的是為了更好的管理每個 Spring Cloud 的子專案的清單。避免子的版本號與子 專案的版本號混淆。
2.2 版本號單詞的定義規則
採用倫敦的地鐵站名稱來作為版本號的命名,根據首字母排序,字母順序靠後的版本號越大。
2.3 版本釋出計劃說明
七、RabbitMQ
1.什麼是RabbitMQ
MQ全稱Message Queue(訊息佇列),是一種應用程式與應用程式之間通訊的一種方式,應用程式可以通過向訊息佇列中讀寫訊息進行資訊的互動,而不是以應用相互呼叫的方式進行通訊。
2.安裝RabbitMQ
2.1 安裝rabbitmq所需要的依賴包
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
複製程式碼
2.2 下載安裝包(cd /usr/local/software)
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
複製程式碼
2.3 安裝服務命令
#第一步:安裝erlang語言環境
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
#第二步:安裝socat加解密軟體
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
#第三步:最後安裝rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
複製程式碼
2.4 修改叢集使用者與連線心跳檢測
**注意修改vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app檔案 **
修改:loopback_users 中的 <<"guest">>,只保留guest(不修改只能通過localhost訪問)
2.5 修改本機系統檔案
#修改
vim /etc/rabbitmq/rabbitmq-env.conf
複製程式碼
新增:NODENAME=rabbit
#修改
vim /etc/hostname
複製程式碼
#修改本地檔案
vim /etc/hosts
複製程式碼
#驗證伺服器是可用的
rabbitmq-server start &
複製程式碼
**執行管控臺外掛 **
rabbitmq-plugins enable rabbitmq_management
複製程式碼
#檢查埠
lsof -i:5672
複製程式碼
#通過
ps -ef|grep rabbitmq
複製程式碼
訪問地址:http://192.168.159.8:15672
#下載延時外掛:
wget https://dl.bintray.com/rabbitmq/communityplugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215- 3.6.x.zip
複製程式碼
#解壓延時外掛
unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
複製程式碼
#把延時外掛拷貝到指定目錄下
cp rabbitmq_delayed_message_exchange-20171215-3.6.x.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.5/plugins
複製程式碼
#啟動延時外掛
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
複製程式碼
3.命令列和管控臺
**開啟管控臺外掛 rabbitmq-plugus rabbitmq_management 來開啟管控臺 **
測試連線: http://ip:15672(來訪問) 使用者名稱密碼 guest/guest
3.1 管理控制檯命令
a.起停服務命令
#啟動服務
rabbitmqctl start_app
複製程式碼
啟動rabbitmq節點保證需要erlang虛擬機器器節點起來才能執行)
#停止服務
rabbitmqctl stop_app
複製程式碼
停止rabbtimq節點,但是不會停止erlang節點rabbitmqctl stop都會停止
#檢視服務狀態
rabbtimqctl status
複製程式碼
b. 使用者操作命令
#檢視所有使用者列表
rabbitmqctl list_users
複製程式碼
#新增使用者
rabbitmqctl add_user luyi luyi
複製程式碼
#設定rabbitmq使用者的角色
rabbitmqctl set_user_tags luyi administrator
複製程式碼
#為使用者設定許可權
rabbitmqctl set_permissions -p / luyi ".*" ".*" ".*"
rabbitmqctl set_permissions -p <虛擬機器器> <使用者名稱> ".*" ".*" ".*"
複製程式碼
#列出使用者許可權
rabbitmqctl list_user_permissions luyi
複製程式碼
#清除使用者許可權
rabbitmqctl clear_permissions -p <虛擬機器器> <使用者名稱>
rabbitmqctl clear_permissions -p / root
複製程式碼
#刪除使用者
rabbitmqctl delete_user root #root是使用者名稱
複製程式碼
#修改密碼
rabbitmqctl change_password 使用者名稱 新密碼
複製程式碼
c.虛擬主機操作
rabbitmqctl add_vhost /cloudmall 增加一個虛擬主機
rabbitmqctl list_vhosts; 檢視所有的虛擬主機
rabbitmqctl list_permissions -p /cloudmall 檢視虛擬主機的許可權
rabbitmqctl delete_vhost /cloudmall 刪除虛擬主機
d. 操作佇列命令
rabbitmqctl list_queues 查詢所有佇列
rabbitmqctl -p vhostpath purge_queue blue 清除佇列訊息
e. 高階命令
rabbitmqctl reset 移除所有資料 該命令需要在 rabbitmqctl stop_app命令之後才執行(也就是說 在服 務停止後) r
abbitmqctl join_cluster [--ram] 組成叢集命令
rabbitmqctl cluster_status 檢視叢集狀態
rabbitmqctl change_cluster_node_type dist|ram 修改叢集節點儲存資料模式
rabbitmqctl forget_cluster_node [--offline]忘記節點 (摘除節點)
rabbitmqctc rename_cluster_node oldnode1 newnode1 oldnode2 newnode2 修改節點名稱
4.為什麼使用RabbitMQ
4.1 實現非同步
4.2 解耦
4.3 流量削峰
5.訊息佇列基礎知識
5.1 Provider
訊息生產者,也就是傳送訊息的程式
5.2 Consumer
訊息消費者,也就是接收訊息的程式
5.3 沒有使用訊息佇列的通訊方式
5.4 使用訊息佇列後的通訊方式
5.5 什麼是佇列
佇列就像生活中的商店,商品製造商是訊息生產者,顧客是訊息消費者,他們之間通過商店實現互動
5.6 佇列和應用程式的關係
多個訊息生產者可以將訊息傳送到同一個訊息佇列中,多個訊息消費者可以從同一個訊息佇列中取出訊息。
6.RabbitMQ入門
6.1 建立專案
6.2 新增依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
複製程式碼
6.3 配置全域性配置檔案
#設定應用的名稱
spring.application.name=springcloud01-mq
#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
複製程式碼
6.4 建立配置類
/**
* Author: LuYi
* Date: 2019/11/3 14:03
* Description: 佇列配置類
*/
@Configuration
public class QueueConfig {
/**
* 建立佇列
* @return
*/
@Bean
public Queue createQueue(){
return new Queue("hello-queue");
}
}
複製程式碼
6.5 編寫訊息傳送者
/**
* Author: LuYi
* Date: 2019/11/3 14:06
* Description: 訊息傳送者
*/
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 傳送訊息的方法
*/
public void send(String msg){
//向訊息佇列傳送訊息
//引數一: 佇列名稱
//引數二: 訊息
rabbitTemplate.convertAndSend("hello-queue",msg);
}
}
複製程式碼
6.6 編寫訊息接收者
/**
* Author: LuYi
* Date: 2019/11/3 14:12
* Description: 訊息接收者
*/
@Component
public class Receiver {
/**
* 接收訊息的方法,採用訊息佇列監聽機制
* @param msg
*/
@RabbitListener(queues = "hello-queue")
public void process(String msg){
System.out.println("Receiver : " + msg);
}
}
複製程式碼
6.7 測試
/**
* 訊息佇列測試類
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Springcloud01MqApplication.class)
class Springcloud01MqApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
}
/**
* 測試訊息佇列
*/
@Test
public void test1(){
sender.send("Hello RabbitMQ");
}
}
複製程式碼
7.RabbitMQ原理圖
1.Message(訊息):訊息是不具名的,它是由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列可選屬性組成,這些屬性包括:routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出訊息可能永續性儲存)等。
2.Publisher(生產者):向交換機傳送訊息的客戶端程式
3.Consumer(消費者):從訊息佇列中獲取訊息的客戶端程式
4.Exchange(交換機):用來接收客戶端發過來的訊息,並將這些訊息路由給伺服器中的訊息佇列
三種常用的交換機:
a.direct(釋出與訂閱,完全匹配)
b.fanout(廣播)
c.topic(主題,規則匹配)
5.Binding(繫結):用於交換機與訊息佇列之間的關聯,一個繫結就是根據路由鍵將交換機與訊息佇列連線起來的規則。
6.Queue(訊息佇列):用於儲存訊息,直到將訊息傳送給消費者
7.Routing-key(路由鍵):RabbitMQ決定將訊息投遞到那個訊息佇列的規則。佇列通過路由鍵繫結交換機,訊息傳送到MQ伺服器時,訊息將擁有一個路由鍵,即便是空的,RabbitMQ也會將其和繫結使用的路由鍵進行匹配。
如果匹配,訊息會被髮送到訊息佇列,如果不匹配,訊息將進入黑洞。
8.Connection(連結):Rabbit伺服器和服務之間建立的TCP連線
9.Channel(通道):通道是TCP中的虛擬連結,比如電纜就是TCP,通道則是一個獨立光纖束。一條TCP連線建立多條通道是沒有問題的。
TCP一旦開啟,就會建立AMQP通道。無論是釋出訊息、接收訊息、訂閱佇列,這些動作都是通過通道完成的。
10.Virtual Host(虛擬主機):表示一臺交換機和訊息佇列。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域, 每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。 vhost 是 AMQP 概念的基礎,必須在連結時指定, RabbitMQ 預設的 vhost 是/
11.Borker:表示訊息佇列的伺服器實體
交換機和路由器的關係是什麼?
交換機需要通過路由鍵和訊息佇列進行繫結,如果路由鍵相同,那麼訊息會被路由到對應的訊息佇列中,可以理解為路由鍵是決定訊息傳送到那個訊息佇列的規則
八、RabbitMQ交換器
1.Direct(釋出與訂閱 完全匹配)
1.1 需求
1.2 建立專案
rabbitmq-direct-consumer
rabbitmq-direct-provider
1.3 修改全域性配置檔案
consumer的配置檔案
#設定應用的名稱
spring.application.name=springcloud01-mq
#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
#設定交換器的名稱(處理日誌的交換器)
mq.config.exchange=log.direct
#info級別的佇列名稱
mq.config.queue.info=log.info
#info的路由鍵
mq.config.queue.info.routing.key=log.info.routing.key
#error級別的佇列名稱
mq.config.queue.error=log.error
#error的路由鍵
mq.config.queue.error.routing.key=log.error.routing.key
複製程式碼
provider的配置檔案
#設定應用的名稱
spring.application.name=springcloud01-mq
#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
#設定交換器的名稱(處理日誌的交換器)
mq.config.exchange=log.direct
#info的路由鍵
mq.config.queue.info.routing.key=log.info.routing.key
#error的路由鍵
mq.config.queue.error.routing.key=log.error.routing.key
複製程式碼
1.4 編寫consumer
InfoReceiver
/**
* Author: LuYi
* Date: 2019/11/3 14:12
* Description: 訊息接收者
*
* @RabbitListener bindings:繫結佇列
*
* @QueueBinding value:繫結佇列名稱
* exchange:配置交換器
*
* @Queue value:配置佇列名稱
* autoDelete:是否為可刪除的臨時佇列
*
* @Exchange value:為交換器起名
* type:指定當前交換器的型別
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"),exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),key = "${mq.config.queue.info.routing.key}"
)
)
public class InfoReceiver {
/**
* 接收訊息的方法,採用訊息佇列監聽機制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("InfoReceiver : " + msg);
}
}
複製程式碼
ErrorReceiver
/**
* Author: LuYi
* Date: 2019/11/3 14:12
* Description: 訊息接收者
*
* @RabbitListener bindings:繫結佇列
*
* @QueueBinding value:繫結佇列名稱
* exchange:配置交換器
*
* @Queue value:配置佇列名稱
* autoDelete:是否為可刪除的臨時佇列
*
* @Exchange value:為交換器起名
* type:指定當前交換器的型別
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",key = "${mq.config.queue.error.routing.key}"
)
)
public class ErrorReceiver {
/**
* 接收訊息的方法,採用訊息佇列監聽機制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("ErrorReceiver : " + msg);
}
}
複製程式碼
1.5 編寫provider
/**
* Author: LuYi
* Date: 2019/11/3 14:06
* Description: 訊息傳送者
*/
@Component
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
//交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
//路由鍵名稱
@Value("${mq.config.queue.error.routing.key}")
private String routingKey;
/**
* 傳送訊息的方法
*/
public void send(String msg){
//向訊息佇列傳送訊息
//引數一: 交換器名稱
//引數二: 路由鍵
//引數三: 訊息
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}
}
複製程式碼
1.6 測試
/**
* 訊息佇列測試類
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Springcloud01MqApplication.class)
class Springcloud01MqApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
}
@Test
public void test1() throws InterruptedException {
while (true){
Thread.sleep(1000);
sender.send("Hello RabbitMQ");
}
}
}
複製程式碼
2.Topic交換器(主題,規則匹配)
2.1 需求
2.2 建立專案
rabbitmq-topic-consumer
rabbitmq-topic-provider
2.3 修改配置檔案
provider
#設定應用的名稱
spring.application.name=springcloud01-mq
#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
#設定交換器的名稱(處理日誌的交換器)
mq.config.exchange=log.topic
複製程式碼
consumer
#設定應用的名稱
spring.application.name=springcloud01-mq
#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
#設定交換器的名稱(處理日誌的交換器)
mq.config.exchange=log.topic
#info級別的佇列名稱
mq.config.queue.info=log.info
#error級別的佇列名稱
mq.config.queue.error=log.error
#log級別的佇列名稱
mq.config.queue.all=log.all
複製程式碼
2.4 編寫provider
UserSender
/**
* Author: LuYi
* Date: 2019/11/3 14:06
* Description: 訊息傳送者
*/
@Component
public class UserSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
/**
* 傳送訊息的方法
*/
public void send(String msg){
//向訊息佇列傳送訊息
//引數一: 交換器名稱
//引數二: 路由鍵
//引數三: 訊息
rabbitTemplate.convertAndSend(exchange,"user.log.debug","user.log.debug "+msg);
rabbitTemplate.convertAndSend(exchange,"user.log.info","user.log.info "+msg);
rabbitTemplate.convertAndSend(exchange,"user.log.warn","user.log.warn "+msg);
rabbitTemplate.convertAndSend(exchange,"user.log.error","user.log.error "+msg);
}
}
複製程式碼
ProductSender
/**
* Author: LuYi
* Date: 2019/11/3 14:06
* Description: 訊息傳送者
*/
@Component
public class ProductSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
/**
* 傳送訊息的方法
*/
public void send(String msg){
//向訊息佇列傳送訊息
//引數一: 交換器名稱
//引數二: 路由鍵
//引數三: 訊息
rabbitTemplate.convertAndSend(exchange,"product.log.debug","product.log.debug "+msg);
rabbitTemplate.convertAndSend(exchange,"product.log.info","product.log.info "+msg);
rabbitTemplate.convertAndSend(exchange,"product.log.warn","product.log.warn "+msg);
rabbitTemplate.convertAndSend(exchange,"product.log.error","product.log.error "+msg);
}
}
複製程式碼
OrderSender
/**
* Author: LuYi
* Date: 2019/11/3 14:06
* Description: 訊息傳送者
*/
@Component
public class OrderSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
/**
* 傳送訊息的方法
*/
public void send(String msg){
//向訊息佇列傳送訊息
//引數一: 交換器名稱
//引數二: 路由鍵
//引數三: 訊息
rabbitTemplate.convertAndSend(exchange,"order.log.debug","order.log.debug "+msg);
rabbitTemplate.convertAndSend(exchange,"order.log.info","order.log.info "+msg);
rabbitTemplate.convertAndSend(exchange,"order.log.warn","order.log.warn "+msg);
rabbitTemplate.convertAndSend(exchange,"order.log.error","order.log.error "+msg);
}
}
複製程式碼
2.5 編寫consumer
InfoReceiver
/**
* Author: LuYi
* Date: 2019/11/3 14:12
* Description: 訊息接收者
*
* @RabbitListener bindings:繫結佇列
*
* @QueueBinding value:繫結佇列名稱
* exchange:配置交換器
*
* @Queue value:配置佇列名稱
* autoDelete:是否為可刪除的臨時佇列
*
* @Exchange value:為交換器起名
* type:指定當前交換器的型別
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}",type = ExchangeTypes.TOPIC),key = "*.log.info"
)
)
public class InfoReceiver {
/**
* 接收訊息的方法,採用訊息佇列監聽機制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("InfoReceiver : " + msg);
}
}
複製程式碼
ErrorReceiver
/**
* Author: LuYi
* Date: 2019/11/3 14:12
* Description: 訊息接收者
*
* @RabbitListener bindings:繫結佇列
*
* @QueueBinding value:繫結佇列名稱
* exchange:配置交換器
*
* @Queue value:配置佇列名稱
* autoDelete:是否為可刪除的臨時佇列
*
* @Exchange value:為交換器起名
* type:指定當前交換器的型別
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",key = "*.log.error"
)
)
public class ErrorReceiver {
/**
* 接收訊息的方法,採用訊息佇列監聽機制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("ErrorReceiver : " + msg);
}
}
複製程式碼
AllReceiver
/**
* Author: LuYi
* Date: 2019/11/3 14:12
* Description: 訊息接收者
*
* @RabbitListener bindings:繫結佇列
*
* @QueueBinding value:繫結佇列名稱
* exchange:配置交換器
*
* @Queue value:配置佇列名稱
* autoDelete:是否為可刪除的臨時佇列
*
* @Exchange value:為交換器起名
* type:指定當前交換器的型別
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.all}",key = "*.log.*"
)
)
public class AllReceiver {
/**
* 接收訊息的方法,採用訊息佇列監聽機制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("AllReceiver : " + msg);
}
}
複製程式碼
2.6 測試
/**
* 訊息佇列測試類
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Springcloud01MqApplication.class)
class Springcloud01MqApplicationTests {
@Autowired
private UserSender userSender;
@Autowired
private ProductSender productSender;
@Autowired
private OrderSender orderSender;
@Test
void contextLoads() {
}
@Test
public void test1() {
userSender.send("UserSender...");
productSender.send("ProductSender...");
orderSender.send("OrderSender...");
}
}
複製程式碼
3. Fanout交換器(廣播)
3.1 需求
3.2 建立專案
rabbitmq-fanout-consumer
rabbitmq-fanout-provider
3.3 修改配置檔案
consumer
#設定應用的名稱
spring.application.name=springcloud01-mq
#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
#設定交換器的名稱
mq.config.exchange=order.fanout
#簡訊佇列名稱
mq.config.queue.sms=order.sms
#push佇列名稱
mq.config.queue.push=order.push
複製程式碼
provider
#設定應用的名稱
spring.application.name=springcloud01-mq
#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
#設定交換器的名稱(處理日誌的交換器)
mq.config.exchange=order.fanout
複製程式碼
3.4 編寫consumer
SmsReceiver
/**
* Author: LuYi
* Date: 2019/11/3 14:12
* Description: 訊息接收者
*
* @RabbitListener bindings:繫結佇列
*
* @QueueBinding value:繫結佇列名稱
* exchange:配置交換器
*
* @Queue value:配置佇列名稱
* autoDelete:是否為可刪除的臨時佇列
*
* @Exchange value:為交換器起名
* type:指定當前交換器的型別
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.sms}",type = ExchangeTypes.FANOUT)
)
)
public class SmsReceiver {
/**
* 接收訊息的方法,採用訊息佇列監聽機制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("SmsReceiver : " + msg);
}
}
複製程式碼
PushReceiver
/**
* Author: LuYi
* Date: 2019/11/3 14:12
* Description: 訊息接收者
*
* @RabbitListener bindings:繫結佇列
*
* @QueueBinding value:繫結佇列名稱
* exchange:配置交換器
*
* @Queue value:配置佇列名稱
* autoDelete:是否為可刪除的臨時佇列
*
* @Exchange value:為交換器起名
* type:指定當前交換器的型別
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.push}",type = ExchangeTypes.FANOUT)
)
)
public class PushReceiver {
/**
* 接收訊息的方法,採用訊息佇列監聽機制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("PushReceiver : " + msg);
}
}
複製程式碼
3.5 編寫provider
/**
* Author: LuYi
* Date: 2019/11/3 14:06
* Description: 訊息傳送者
*/
@Component
public class OrderSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
/**
* 傳送訊息的方法
*/
public void send(String msg){
//向訊息佇列傳送訊息
//引數一: 交換器名稱
//引數二: 路由鍵
//引數三: 訊息
rabbitTemplate.convertAndSend(exchange,"",msg);
}
}
複製程式碼
3.6 測試
/**
* 訊息佇列測試類
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Springcloud01MqApplication.class)
class Springcloud01MqApplicationTests {
@Autowired
private OrderSender orderSender;
@Test
void contextLoads() {
}
@Test
public void test1() throws InterruptedException {
while (true){
Thread.sleep(1000);
orderSender.send("Hello RabbitMQ");
}
}
}
複製程式碼
4.使用RabbbitMQ實現鬆耦合設計
4.1 修改配置檔案
#設定應用的名稱
spring.application.name=springcloud01-mq
#指定rabbitmq
spring.rabbitmq.host=192.168.234.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=luyi
spring.rabbitmq.password=luyi
#設定交換器的名稱
mq.config.exchange=order.fanout
#簡訊佇列名稱
mq.config.queue.sms=order.sms
#push佇列名稱
mq.config.queue.push=order.push
#紅包服務佇列名稱
mq.config.queue.red=order.red
複製程式碼
4.2 新增receiver
RedReceiver
/**
* Author: LuYi
* Date: 2019/11/4 16:20
* Description: 訊息接收者
*
* @RabbitListener bindings:繫結佇列
*
* @QueueBinding value:繫結佇列名稱
* exchange:配置交換器
*
* @Queue value:配置佇列名稱
* autoDelete:是否為可刪除的臨時佇列
*
* @Exchange value:為交換器起名
* type:指定當前交換器的型別
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.red}",type = ExchangeTypes.FANOUT)
)
)
public class RedReceiver {
@RabbitHandler
public void process(String msg){
System.out.println("給使用者傳送10元紅包 " + msg);
}
}
複製程式碼
九、RabbitMQ訊息處理
1. RabbitMQ的訊息持久化處理
1.1 建立專案
rabbitmq-direct-durable-provider
rabbitmq-direct-durable-consumer
1.2 autoDelete
@Queue:當所有消費客戶端連線斷開後,是否自動刪除佇列,如果設定成true表示刪除,false表示不刪除
@Exchange:當所有繫結佇列都不在使用時,是否自動刪除交換機,true:刪除,false:不刪除
2.RabbitMQ中的訊息確認ACK機制
-
什麼是訊息確認ACK
如果在消費者接收訊息是,伺服器出現了異常導致這條訊息無法被接收,這是RabbitMQ可以通過ACK機制保證訊息不丟失
-
ACK的訊息確認機制
ACK機制是消費者接收到訊息之後向RabbitMQ傳送的,RabbitMQ收到ACK後才會將該訊息從訊息佇列中刪除。
- 如果RabbitMQ沒有接收到消費者的ACK標識,那麼RabbitMQ會將這個訊息放入訊息佇列中再次傳送。
- 如果在叢集情況下,RabbitMQ會將這個訊息推送給線上的其他消費者。這種機制確保了消費者服務端在發生故障時不會丟失任何訊息。
- 訊息永遠不會從RabbitMQ中刪除,只有當消費者正確傳送了ACK並且RabbitMQ也做出了確認之後才會進行刪除。
- 訊息的ACK確認機制是預設開啟的。
-
ACK機制的開發注意事項
如果忘記了ACK,那麼當Consumer退出時,Message會一直重發,這樣就會佔用大量的RabbitMQ的記憶體,由於RabbitMQ會長時間執行,所以這個情況是致命的。