Spring Cloud構建微服務架構: 訊息匯流排
Spring Cloud Bus除了支援RabbitMQ的自動化配置之外,還支援現在被廣泛應用的Kafka。在本文中,我們將搭建一個Kafka的本地環境,並通過它來嘗試使用Spring Cloud Bus對Kafka的支援,實現訊息匯流排的功能。由於本文會以之前Rabbit的實現作為基礎來修改,所以先閱讀《Spring Cloud構建微服務架構(七)訊息匯流排》有助於理解本文。
Kafka簡介
Kafka是一個由LinkedIn開發的分散式訊息系統,它於2011年初開源,現在由著名的Apache基金會維護與開發。Kafka使用Scala實現,被用作LinkedIn的活動流和運營資料處理的管道,現在也被諸多網際網路企業廣泛地用作為資料流管道和訊息系統。
Kafka是基於訊息釋出/訂閱模式實現的訊息系統,其主要設計目標如下:
- 訊息持久化:以時間複雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的訪問效能。
- 高吞吐:在廉價的商用機器上也能支援單機每秒100K條以上的吞吐量
- 分散式:支援訊息分割槽以及分散式消費,並保證分割槽內的訊息順序
- 跨平臺:支援不同技術平臺的客戶端(如:Java、PHP、Python等)
- 實時性:支援實時資料處理和離線資料處理
- 伸縮性:支援水平擴充套件
Kafka中涉及的一些基本概念:
- Broker:Kafka叢集包含一個或多個伺服器,這些伺服器被稱為Broker。
- Topic:邏輯上同Rabbit的Queue佇列相似,每條釋出到Kafka叢集的訊息都必須有一個Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個Broker上,但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)
- Partition:Partition是物理概念上的分割槽,為了提供系統吞吐率,在物理上每個Topic會分成一個或多個Partition,每個Partition對應一個資料夾(儲存對應分割槽的訊息內容和索引檔案)。
- Producer:訊息生產者,負責生產訊息併發送到Kafka Broker。
- Consumer:訊息消費者,向Kafka Broker讀取訊息並處理的客戶端。
- Consumer Group:每個Consumer屬於一個特定的組(可為每個Consumer指定屬於一個組,若不指定則屬於預設組),組可以用來實現一條訊息被組內多個成員消費等功能。
快速入門
在對Kafka有了一些基本瞭解之後,下面我們來嘗試構建一個Kafka服務端,並體驗一下基於Kafka的訊息生產與消費。
環境安裝
首先,我們需要從官網上下載安裝介質。下載地址為:http://kafka.apache.org/downloads.html
。本例中採用的版本為:Kafka-0.10.0.1
在解壓Kafka的安裝包之後,可以看到其目錄結構如下:
kafka
+-bin
+-windows
+-config
+-libs
+-logs
+-site-docs
由於Kafka的設計中依賴了ZooKeeper,所以我們可以在bin
和config
目錄中除了看到Kafka相關的內容之外,還有ZooKeeper相關的內容。其中bin
目錄存放了Kafka和ZooKeeper的命令列工具,bin
根目錄下是適用於Linux/Unix的shell,而bin/windows
下的則是適用於windows下的bat。我們可以根據實際的系統來設定環境變數,以方便後續的使用和操作。而在config
目錄中,則是用來存放了關於Kafka與ZooKeeper的配置資訊。
啟動測試
下面我們來嘗試啟動ZooKeeper和Kafka來進行訊息的生產和消費。示例中所有的命令均已配置了Kafka的環境變數為例。
- 啟動ZooKeeper,執行命令:
zookeeper-server-start config/zookeeper.properties
,該命令需要指定zookeeper的配置檔案位置才能正確啟動,kafka的壓縮包中包含了其預設配置,開發與測試環境不需要修改。
[2016-09-28 08:05:34,849] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-09-28 08:05:34,850] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-09-28 08:05:34,851] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-09-28 08:05:34,851] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-09-28 08:05:34,852] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2016-09-28 08:05:34,868] INFO Reading configuration from: config\zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-09-28 08:05:34,869] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
...
[2016-09-28 08:05:34,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
從控制檯資訊中,我們可以看到ZooKeeper從指定的config/zookeeper.properties
配置檔案中讀取資訊並繫結2181埠啟動服務。有時候啟動失敗,可檢視一下埠是否被佔用,可以殺掉佔用程序或通過修改config/zookeeper.properties
配置檔案中的clientPort
內容以繫結其他埠號來啟動ZooKeeper。
啟動Kafka,執行命令:
kafka-server-start config/server.properties
,該命令也需要指定Kafka配置檔案的正確位置,如上命令中指向瞭解壓目錄包含的預設配置。若在測試時,使用外部集中環境的ZooKeeper的話,我們可以在該配置檔案中通過zookeeper.connect
引數來設定ZooKeeper的地址和埠,它預設會連線本地2181埠的ZooKeeper;如果需要設定多個ZooKeeper節點,可以為這個引數配置多個ZooKeeper地址,並用逗號分割。比如:zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
。建立Topic,執行命令:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
,通過該命令,建立一個名為“test”的Topic,該Topic包含一個分割槽一個Replica。在建立完成後,可以使用kafka-topics --list --zookeeper localhost:2181
命令來檢視當前的Topic。
另外,如果我們不使用kafka-topics
命令來手工建立,直接進行下面的內容進行訊息建立時也會自動建立Topics來使用。
建立訊息生產者,執行命令:
kafka-console-producer --broker-list localhost:9092 --topic test
。kafka-console-producer
命令可以啟動Kafka基於命令列的訊息生產客戶端,啟動後可以直接在控制檯中輸入訊息來發送,控制檯中的每一行資料都會被視為一條訊息來發送。我們可以嘗試輸入幾行訊息,由於此時並沒有消費者,所以這些輸入的訊息都會被阻塞在名為test的Topics中,直到有消費者將其消費掉位置。建立訊息消費者,執行命令:
kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning
。kafka-console-consumer
命令啟動的是Kafka基於命令列的訊息消費客戶端,在啟動之後,我們馬上可以在控制檯中看到輸出了之前我們在訊息生產客戶端中傳送的訊息。我們可以再次開啟之前的訊息生產客戶端來發送訊息,並觀察消費者這邊對訊息的輸出來體驗Kafka對訊息的基礎處理。
整合Spring Cloud Bus
在上一篇使用Rabbit實現訊息匯流排的案例中,我們已經通過引入spring-cloud-starter-bus-amqp
模組,完成了使用RabbitMQ來實現的訊息匯流排。若我們要使用Kafka來實現訊息匯流排時,只需要把spring-cloud-starter-bus-amqp
替換成spring-cloud-starter-bus-kafka
模組,在pom.xml
的dependenies節點中進行修改,具體如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
如果我們在啟動Kafka時均採用了預設配置,那麼我們不需要再做任何其他配置就能在本地實現從RabbitMQ到Kafka的切換。我們可以嘗試把剛剛搭建的ZooKeeper、Kafka啟動起來,並將修改為spring-cloud-starter-bus-kafka
模組的config-server和config-client啟動起來。
在config-server啟動時,我們可以在控制檯中看到如下輸出:
2016-09-28 22:11:29.627 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: springCloudBus
2016-09-28 22:11:29.642 INFO 15144 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread : Starting ZkClient event thread.
...
016-09-28 22:11:30.290 INFO 15144 --- [ main] o.s.i.kafka.support.ProducerFactoryBean : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384}
2016-09-28 22:11:30.298 INFO 15144 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
...
2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel
2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.integration.channel.DirectChannel : Channel 'config-server:7001.springCloudBusOutput' has 1 subscriber(s).
2016-09-28 22:11:30.322 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus
...
2016-09-28 22:11:31.465 INFO 15144 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframe[email protected]4178cb34
2016-09-28 22:11:31.467 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : Adding {message-handler:inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b} as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:11:31.467 INFO 15144 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : started inbound.springCloudBus.anonymous.8b9e6c7b-6a50-48c5-b981-8282a0d5a30b
從控制檯的輸出內容,我們可以看到config-server連線到了Kafka中,並使用了名為springCloudBus
的Topic。
此時,我們可以使用kafka-topics --list --zookeeper localhost:2181
命令來檢視當前Kafka中的Topic,若已成功啟動了config-server並配置正確,我們就可以在Kafka中看到已經多了一個名為springCloudBus
的Topic。
我們再啟動配置了spring-cloud-starter-bus-kafka
模組的config-client,可以看到控制檯中輸出如下內容:
2016-09-28 22:43:55.067 INFO 6136 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder : Using kafka topic for outbound: springCloudBus
2016-09-28 22:43:55.078 INFO 6136 --- [-localhost:2181] org.I0Itec.zkclient.ZkEventThread : Starting ZkClient event thread.
...
2016-09-28 22:50:38.584 INFO 828 --- [ main] o.s.i.kafka.support.ProducerFactoryBean : Using producer properties => {bootstrap.servers=localhost:9092, linger.ms=0, acks=1, compression.type=none, batch.size=16384}
2016-09-28 22:50:38.592 INFO 828 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
...
2016-09-28 22:50:38.615 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : Adding {message-handler:outbound.springCloudBus} as a subscriber to the 'springCloudBusOutput' channel
2016-09-28 22:50:38.616 INFO 828 --- [ main] o.s.integration.channel.DirectChannel : Channel 'didispace:7002.springCloudBusOutput' has 1 subscriber(s).
2016-09-28 22:50:38.616 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : started outbound.springCloudBus
...
2016-09-28 22:50:39.162 INFO 828 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframe[email protected]60cf855e
2016-09-28 22:50:39.162 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : Adding {message-handler:inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216} as a subscriber to the 'bridge.springCloudBus' channel
2016-09-28 22:50:39.163 INFO 828 --- [ main] o.s.c.s.b.k.KafkaMessageChannelBinder$7 : started inbound.springCloudBus.anonymous.f8fc9c0c-ccd3-46dd-9537-07198f4ee216
可以看到,config-client啟動時輸出了類似的內容,他們都訂閱了名為springCloudBus
的Topic。
在啟動了config-server和config-client之後,為了更明顯地觀察訊息匯流排重新整理配置的效果,我們可以在本地啟動多個不同埠的config-client。此時,我們的config-server以及多個config-client都已經連線到了由Kafka實現的訊息總線上。我們可以先訪問各個config-client上的/from
請求,檢視他獲取到的配置內容。然後,修改Git中對應的引數內容,再訪問各個config-client上的/from
請求,可以看到配置內容並沒有改變。最後,我們向config-server傳送POST請求:/bus/refresh
,此時我們再去訪問各個config-client上的/from
請求,就能獲得到最新的配置資訊,各客戶端上的配置都已經載入為最新的Git配置內容。
從config-client的控制檯中,我們可以看到如下內容:
2016-09-29 08:20:34.361 INFO 21256 --- [ kafka-binder-1] o.s.cloud.bus.event.RefreshListener : Received remote refresh request. Keys refreshed [from]
RefreshListener
監聽類記錄了收到遠端重新整理請求,並重新整理了from
屬性的日誌。
Kafka配置
在上面的例子中,由於Kafka、ZooKeeper均運行於本地,所以我們沒有在測試程式中通過配置資訊來指定Kafka和ZooKeeper的配置資訊,就完成了本地訊息匯流排的試驗。但是我們實際應用中,Kafka和ZooKeeper一般都會獨立部署,所以在應用中都需要來為Kafka和ZooKeeper配置一些連線資訊等。Kafka的整合與RabbitMQ不同,在Spring Boot 1.3.7中並沒有直接提供的Starter模組,而是採用了Spring Cloud Stream的Kafka模組,所以對於Kafka的配置均採用了spring.cloud.stream.kafka
的字首,比如:
屬性名 | 說明 | 預設值 |
---|---|---|
spring.cloud.stream.kafka.binder.brokers | Kafka的服務端列表 | localhost |
spring.cloud.stream.kafka.binder.defaultBrokerPort | Kafka服務端的預設埠,當brokers 屬性中沒有配置埠資訊時,就會使用這個預設埠 | 9092 |
spring.cloud.stream.kafka.binder.zkNodes | Kafka服務端連線的ZooKeeper節點列表 | localhost |
spring.cloud.stream.kafka.binder.defaultZkPort | ZooKeeper節點的預設埠,當zkNodes 屬性中沒有配置埠資訊時,就會使用這個預設埠 | 2181 |
更多配置引數請參考官方文件
本文完整示例:
相關推薦
Spring Cloud構建微服務架構: 訊息匯流排
Spring Cloud Bus除了支援RabbitMQ的自動化配置之外,還支援現在被廣泛應用的Kafka。在本文中,我們將搭建一個Kafka的本地環境,並通過它來嘗試使用Spring Cloud Bus對Kafka的支援,實現訊息匯流排的功能。由於本文會以之前Rabbit的實現作為基礎來修改,所以先閱讀《S
Spring Cloud構建微服務架構(七)訊息匯流排
先回顧一下,在之前的Spring Cloud Config的介紹中,我們還留了一個懸念:如何實現對配置資訊的實時更新。雖然,我們已經能夠通過/refresh介面和Git倉庫的Web Hook來實現Git倉庫中的內容修改觸發應用程式的屬性更新。但是,若所有觸發操作均需要我
Spring Cloud構建微服務架構(三)訊息匯流排
注:此文不適合0基礎學習者直接閱讀,請先完整的將作者關於微服務的博文全部閱讀一遍,如果還有疑問,可以再來閱讀此文,地址:http://blog.csdn.net/sosfnima/article/details/53178157,推薦讀者去找作者的書籍《Spring C
Spring-Boot:Spring Cloud構建微服務架構
xmlns art 超時 客戶 微服務架構 cover lns created 搭建 概述: 從上一篇博客《Spring-boot:5分鐘整合Dubbo構建分布式服務》 過度到Spring Cloud,我們將開始學習如何使用Spring Cloud 來搭建微服務。繼續采
Spring Cloud構建微服務架構分布式配置中心
post ast github 構造 clas mas files cli .class 在本文中,我們將學習如何構建一個基於Git存儲的分布式配置中心,並對客戶端進行改造,並讓其能夠從配置中心獲取配置信息並綁定到代碼中的整個過程。 準備配置倉庫 準備一個git倉庫,可
Spring Cloud構建微服務架構—創建“服務註冊中心”
springboot springcloud mybatis eureka config 創建一個基礎的Spring Boot工程,命名為eureka-server,並在pom.xml中引入需要的依賴內容: <parent> <groupId>org.springf
Spring Cloud構建微服務架構服務註冊與發現
springboot springcloud mybatis eureka config Spring Cloud簡介Spring Cloud是一個基於Spring Boot實現的雲應用開發工具,它為基於JVM的雲應用開發中涉及的配置管理、服務發現、斷路器、智能路由、微代理、控制總線、全局
Spring Cloud構建微服務架構-創建“服務提供方”
spring Spring Cloud Spring Boot config 下面我們創建提供服務的客戶端,並向服務註冊中心註冊自己。本文我們主要介紹服務的註冊與發現,所以我們不妨在服務提供方中嘗試著提供一個接口來獲取當前所有的服務信息。 首先,創建一個基本的Spring Boot應用。命名為
Spring Cloud構建微服務架構—服務網關過濾器
Spring Cloud Spring Boot mybatis 過濾器作用 我們的微服務應用提供的接口就可以通過統一的API網關入口被客戶端訪問到了。但是,每個客戶端用戶請求微服務應用提供的接口時,它們的訪問權限往往都需要有一定的限制,系統並不會將所有的微服務接口都對它們開放。然而,目前的服務路
Spring Cloud構建微服務架構Hystrix監控面板
Spring Cloud Spring Boot mybatis 在Spring Cloud中構建一個Hystrix Dashboard非常簡單,只需要下面四步: 創建一個標準的Spring Boot工程,命名為:hystrix-dashboard。編輯pom.xml,具體依賴內容如下: <
Spring Cloud構建微服務架構—服務消費(Ribbon)
ble DG 沒有 客戶 BE pla cati str 主類 Spring Cloud RibbonSpring Cloud Ribbon是基於Netflix Ribbon實現的一套客戶端負載均衡的工具。它是一個基於HTTP和TCP的客戶端負載均衡器。它可以通過在客戶端中
Spring Cloud構建微服務架構:服務消費(基礎)
消費 ring str frame emp default class a template pom.xml 使用LoadBalancerClient在Spring Cloud Commons中提供了大量的與服務治理相關的抽象接口,包括DiscoveryClient、這裏我
Spring Cloud構建微服務架構—Hystrix斷路器
能夠 電路 處理 觸發 就會 熔斷器 邏輯 響應 保險絲 斷路器模式源於Martin Fowler的Circuit Breaker一文。“斷路器”本身是一種開關裝置,用於在電路上保護線路過載,當線路中有電器發生短路時,“斷路器”能夠及時的切斷故障電路,防止發生過載、發熱、甚
Spring Cloud構建微服務架構-Hystrix服務降級
static 原因 架構 一個個 policy 消費者 兩個 comm 以及 在微服務架構中,我們將系統拆分成了一個個的服務單元,各單元應用間通過服務註冊與訂閱的方式互相依賴。由於每個單元都在不同的進程中運行,依賴通過遠程調用的方式執行,這樣就有可能因為網絡原因或是依賴服務
Spring Cloud構建微服務架構:服務註冊與發現 Eureka
Spring Cloud構建微服務架構:服務註冊與發現Eureka 【Dalston版】 原創 2018-04-10 宗野 Spring Cloud 已經
Spring Cloud構建微服務架構:服務容錯保護(Hystrix服務降級)
tro sco load 服務架構 延遲 正常 map ati href 動手試一試 在開始使用Spring Cloud Hystrix實現斷路器之前,我們先拿之前實現的一些內容作為基礎,其中包括: eureka-server工程:服務註冊中心,端口:1001 eurek
Spring Cloud構建微服務架構:服務消費(Ribbon)
Spring Cloud Ribbon Spring Cloud Ribbon是基於Netflix Ribbon實現的一套客戶端負載均衡的工具。它是一個基於HTTP和TCP的客戶端負載均衡器。它可以通過在客戶端中配置ribbonServerList來設定服務端列表去輪詢訪問以達到均衡負載的作用。 當Rib
Spring Cloud構建微服務架構:服務消費(Feign)
Spring Cloud Feign Spring Cloud Feign是一套基於Netflix Feign實現的宣告式服務呼叫客戶端。它使得編寫Web服務客戶端變得更加簡單。我們只需要通過建立介面並用註解來配置它既可完成對Web服務介面的繫結。它具備可插拔的註解支援,包括Feign註解、JAX-RS註解
Spring Cloud構建微服務架構:服務註冊與發現(Eureka、Consul)
Spring Cloud簡介 Spring Cloud是一個基於Spring Boot實現的雲應用開發工具,它為基於JVM的雲應用開發中涉及的配置管理、服務發現、斷路器、智慧路由、微代理、控制匯流排、全域性鎖、決策競選、分散式會話和叢集狀態管理等操作提供了一種簡單的開發方式。 Spring Cloud包含
Spring Cloud構建微服務架構—服務閘道器過濾器
過濾器作用 我們的微服務應用提供的介面就可以通過統一的API閘道器入口被客戶端訪問到了。但是,每個客戶端使用者請求微服務應用提供的介面時,它們的訪問許可權往往都需要有一定的限制,系統並不會將所有的微服務介面都對它們開放。然而,目前的服務路由並沒有限制許可權這樣的功能,所有請