spring cloud 中訊息匯流排(bus)使用
訊息系統
說到訊息系統大家耳熟能詳的幾個一般來說都有各自適用的場景,我們這裡簡單說一下幾個常見的訊息系統。
ActiveMQ是比較老牌的訊息系統,當然了不一定是大家第一個熟知的訊息系統,因為現在電商、網際網路規模越來越大,不斷進入程式設計師眼簾的大多是Kafka和RocketMQ。ActiveMQ出現的要比他們早,而且涵蓋的功能也特別全,路由、備份、查詢、事務、叢集等等。他的美中不足是不能支撐超大規模、超高併發的網際網路應用,ActiveMQ的併發承受能力在百萬級別,大概500次/s的訊息頻率。
Kafka是新一代的訊息系統,相對於ActiveMQ來說增加了分片功能,類似於資料庫分庫分表,一臺Broker僅負責一部分資料收發,從而使得他的伸縮性特別好,通過增加Broker就可以不斷增加處理能力。一般來說,Kafka被用來處理日誌流,作為流計算的接入點。在電商的訂單、庫存等系統裡邊一般不用,主要顧慮的是Kafka非同步刷盤機制可能導致資料丟失。當然,對於資料丟失這一點不同的工程師也有不同的看法,認為Kafka的Master-Slave的多寫機制,完全能夠避免資料丟失。
RocketMQ是阿里開源的一款訊息系統,開發的初衷就是要支撐阿里龐大的電商系統。RocketMQ和Kafka有很多相似之處,由於RocketMQ開發中很大程度上參考了Kafka的實現。RocketMQ同樣提供了優秀的分片機制,RocketMQ的分片比Kafka的分片有所增強,區分了絕對有序和非絕對有序兩種選項。另外RocketMQ採用的是同步刷盤,一般認為不會造成資料丟失。
RabbitMQ類似於ActiveMQ也是一個相對小型的訊息系統,他的優勢在於靈活的路由機制,可以進行自由配置。
Redis的pub/sub功能,由於Redis是記憶體級的系統,所以速度和單機的併發能力是上述四個訊息系統不能比擬的,但是也是由於記憶體儲存的緣故,在訊息的保障上就更弱一些。據說新浪部落格系統選擇了Redis的pub/sub作為訊息系統,不能不說藝高人膽大。
什麼時候用cloud bus
spring cloud bus在整個後端服務中起到聯通的作用,聯通後端的多臺伺服器。我們為什麼需要他做聯通呢?
後端伺服器一般都做了叢集化,很多臺伺服器,而且在大促活動期經常發生服務的擴容、縮容、上線、下線。這樣,後端伺服器的數量、IP就會變來變去,如果我們想進行一些線上的管理和維護工作,就需要維護伺服器的IP。
比如我們需要更新配置、比如我們需要同時失效所有伺服器上的某個快取,都需要向所有的相關伺服器傳送命令,也就是呼叫一個介面。
你可能會說,我們一般會採用zookeeper的方式,統一儲存伺服器的ip地址,需要的時候,向對應伺服器傳送命令。這是一個方案,但是他的解耦性、靈活性、實時性相比訊息匯流排都差那麼一點。
總的來說,就是在我們需要把一個操作散發到所有後端相關伺服器的時候,就可以選擇使用cloud bus了。
使用cloud bus之後,我們的服務端架構會變成這樣:
cloud bus能做什麼
當前spring cloud bus提供了兩個可用的介面:1./bus/env用於設定某一個配置項2./bus/refresh用於重新整理所有繫結到重新整理點的配置項。
這兩個介面是使用spring boot actuator方式釋出出來的(可以參見:深入SpringBoot:自定義Endpoint一文),接收到訊息後會使用spring的stream框架(可以參考:張開濤的解Spring事件驅動模型一文)把訊息傳播到所有註冊的相關伺服器。
/bus/env的引數格式:
name=&value=&destination=
/bus/refresh的引數格式:
destination=
當然了,上述的destination引數都可以不提供。
spring cloud config 配合spring cloud bus實現配置資訊更新
spring cloud config 配置更新有兩種方式:1.配置git倉庫的web hook,當git倉庫有更新時自動呼叫bus提供的重新整理介面,重新整理快取;2.手工呼叫bus提供的重新整理介面。
不論一方案還是二方案區別僅在於是不同的人觸發了重新整理介面。實際上,線上伺服器一般很少採用自動重新整理的機制,都會在修改後,確認無誤後再執行重新整理。
關鍵的修改點是把所有的後端伺服器連線到同一個訊息系統上,然後監聽配置更新訊息。
安裝RabbitMQ
安裝方法很簡單,直接在官網下載對應的安裝檔案就可以了。
因為RabbitMQ是Erlang語言寫的,所以如果你的機器上沒有安裝Erlang,那麼需要先安裝Erlang。
增加bus包的引用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
增加RabbitMQ配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5671
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
增加@RefreshScope註解
@SpringBootApplication
@RestController
@RefreshScope
public class ConfigClientApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigClientApplication.class, args);
}
@Value("${app-name}")
private String app_name;
@RequestMapping("hi")
public String hi(){
return "hello "+ app_name;
}
}