1. 程式人生 > 實用技巧 >SpringCloud學習筆記【十三】Spring Cloud Bus訊息匯流排

SpringCloud學習筆記【十三】Spring Cloud Bus訊息匯流排

目錄

本片要點

  • 簡要介紹匯流排的概念,以及分散式系統解決的問題。
  • 介紹Docker安裝RabbitMQ的基本命令。
  • SpringCloud Bus+ RabbitMQ實現全域性動態重新整理。

Spring Cloud Bus簡介

https://spring.io/projects/spring-cloud-bus

概述

SpringCloud Bus是將分散式系統的節點與輕量級訊息系統連結起來的框架,它整合了Java的事件處理機制和訊息中介軟體的功能。目前支援RabbitMQ和Kafka。【本篇文章使用RabbitMQ作為訊息代理】

管理和傳播分散式系統間的訊息,像一個分散式執行器,用於廣播狀態更改,事件推送等,也可以作為微服務間的通訊通道。

SpringCloud Bus可以配合SpringCloud Config實現配置的動態重新整理。

什麼是匯流排

在微服務架構的系統中,通常會使用輕量級的訊息代理來構建一個共用的訊息主題

,並讓系統中所有微服務例項都連線上來。由於該主題中產生的訊息會被所有例項監聽和消費,所以稱之為訊息匯流排。總線上的各個例項,都可以方便地廣播一些需要讓其他連線在該主題上的例項都知道的訊息。

基本原理

ConfigClient 例項都監聽MQ中同一個topic【預設是SpringCloudBus】。當一個服務重新整理資料的時候,它會把這個訊息放入Topic中,這樣其他監聽同一Topic的服務就能夠得到通知,然後去更新自身的配置。

Docker安裝RabbitMQ

可以參考:SpringBoot整合RabbitMQ以及Rabbit佇列學習

$ docker pull rabbitmq:3-management # management帶web介面管理
$ docker images  # 檢視image ID

$ docker run -d --name myrabbit -p 5672:5672 -p 15672:15672 cc86ffa2f398 #啟動  最後跟著 image ID

$ systemctl status firewalld #檢視防火牆的狀態【(running)意思是開啟,我們需要設定開放的埠】
$ firewall-cmd --list-ports #檢視防火牆開放的埠
$ firewall-cmd --zone=public --add-port=15672/tcp --permanent # 開放15672,15672是Web管理介面的埠
$ firewall-cmd --zone=public --add-port=5672/tcp --permanent # 開放5672,5672是MQ訪問的埠
$ firewall-cmd --reload # 使修改生效

訪問15672埠即可進入RabbitMQ的web管理介面,賬號密碼預設都是guest。

演示動態重新整理全域性廣播前置準備

前提條件:RabbitMQ環境已成功安裝。

接下來,仿照3355結構,再新建一個3366。

新建模組,引入依賴

        <!--新增訊息匯流排RabbitMQ支援-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>	
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

配置bootstrap.yml

server:
  port: 3366

spring:
  application:
    name: config-client
  cloud:
    #Config客戶端配置
    config:
      label: master #分支名稱
      name: config #配置檔名稱
      profile: dev #讀取字尾名稱   
      uri: http://localhost:3344 #配置中心地址

  #rabbitmq相關配置 15672是Web管理介面的埠;5672是MQ訪問的埠
  rabbitmq:
    host: [your hostname]
    port: 5672
    username: guest
    password: guest

#服務註冊到eureka地址
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka

# 暴露監控端點
management:
  endpoints:
    web:
      exposure:
        include: "*"

編寫主啟動類

@EnableEurekaClient
@SpringBootApplication
public class ConfigClientMain3366 {
    public static void main(String[] args) {
        SpringApplication.run(ConfigClientMain3366.class, args);
    }
}

編寫介面

@RestController
@RefreshScope
public class ConfigClientController {
    @Value("${server.port}")
    private String serverPort;

    @Value("${config.info}")
    private String configInfo;

    @GetMapping("/configInfo")
    public String configInfo() {
        return "serverPort: " + serverPort + "\t\n\n configInfo: " + configInfo;
    }
}

設計思想

設計思想主要是以下兩種:

一、利用訊息匯流排觸發一個客戶端/bus/refresh,而重新整理所有客戶端的配置。

二、利用訊息匯流排觸發一個服務端ConfigServer的/bus/refresh,而重新整理所有客戶端的配置。

相比之下,圖二的架構【通知服務端ConfigServer】更加合理,圖一不合理的原因如下:

  1. 打破了微服務的職責單一性,微服務本身為業務模組,不應該擔任重新整理配置的職責。
  2. 破壞了微服務各節點的對等性。
  3. 存在侷限,如微服務遷移時,網路地址時常發生變化,這時如果希望自動重新整理,會作更多的修改。

開始演示動態重新整理全域性廣播

前提:前置準備已經完成,此時擁有3344ConfigServer,3355和3366Client,和Eureka配置中心7001。

為三個模組都新增訊息匯流排支援

        <!--新增訊息匯流排RabbitMQ支援-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

為三個模組配置yml

#rabbitmq相關配置
  rabbitmq:
    host: [your hostname]
    port: 5672
    username: guest
    password: guest

為ConfigServer配置yml

#rabbitmq相關配置,暴露bus重新整理配置的端點
management:
  endpoints: #暴露bus重新整理配置的端點
    web:
      exposure:
        include: 'bus-refresh'

為ConfigClient配置yml

# 暴露監控端點
management:
  endpoints:
    web:
      exposure:
        include: "*"

測試

依次啟動7001,3344,3355,3366模組,進行測試,依次訪問:

暫時是沒有任何問題的,配置資訊能夠成功從Gitee上獲取得到。

此時改變配置資訊的版本號,並向ConfigServer傳送一次POST請求:

$ curl -X POST "http://localhost:3344/actuator/bus-refresh"

再次測試3355和3366config client,已經成功實現一次通知,處處更新。

原理回顧

ConfigClient 例項都監聽RabbitMQ中同一個topic【預設是SpringCloudBus】。當一個服務重新整理資料的時候,它會把這個訊息放入Topic中,這樣其他監聽同一Topic的服務就能夠得到通知,然後去更新自身的配置。

動態重新整理定點通知

我們已經通過Spring Cloud Bus + RabbitMQ實現了一處通知,全域性廣播,處處更新。那,如果我只想通知其中某個client更新呢?如何定點通知只指定某個例項生效呢?

我們可以通過通知的url指定例項的destination:http://localhost:3344/actuator/bus-refresh/{destination},此時bus/refresh通知會通過destination引數類指定需要更新配置的服務或例項。

比如,只通知3355可以傳送下面這個請求:

$ curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
  • config-client為spring.application.name。
  • 3355為對應的埠號。

原始碼下載

本系列文章為《尚矽谷SpringCloud教程》的學習筆記【版本稍微有些不同,後續遇到bug再做相關說明】,主要做一個長期的記錄,為以後學習的同學提供示例,程式碼同步更新到Gitee:https://gitee.com/tqbx/spring-cloud-learning,並且以標籤的形式詳細區分每個步驟,這個系列文章也會同步更新。