1. 程式人生 > >Kafka叢集生產/消費的負載均衡(Rebalance)測試

Kafka叢集生產/消費的負載均衡(Rebalance)測試

基於Kafka客戶端的高階API,配合zookeeper的使用,可以有效的實現Kafka叢集的Rebalance,提高生產環境下的健壯性。本文使用librdkafka(https://github.com/edenhill/librdkafka) 提供的高階API來實現生產/消費,作為測試的基礎。

生產者的負載均衡
對於同一個Topic的不同Partition,Kafka會盡力將這些Partition分佈到不同的Broker伺服器上,這種均衡策略實際上是基於Zookeeper實現的。在一個Broker啟動時,會首先完成Broker的註冊過程,並註冊一些諸如“有哪些可訂閱的Topic”之類的元資料資訊。生產者啟動後也要到zookeeper下注冊,建立一個臨時節點來監聽Broker伺服器列表的變化。由於在Zookeeper下Broker建立的也是臨時節點,當Brokers發生變化時,生成者可以得到相關的通知,從改變自己的Broker list。其他的諸如Topic的變化以及Broker和Topic的關係變化,也是通過Zookeeper的這種Watcher監聽實現的。
在生產中,必須指定topic;但是對於partition,有兩種指定方式:

  1. 明確指定partition(0-N),則資料被髮送到指定partition
  2. 設定為RD_KAFKA_PARTITION_UA,則kafka會回撥partitioner進行均衡選取,partitioner方法需要自己實現。可以輪詢或者傳入key進行hash。未實現則採用預設的隨機方法rd_kafka_msg_partitioner_random隨機選擇。

消費者的負載均衡
Kafka具有消費分組的概念,某個Topic的某個partition只能由一個Consumer group中的一個Consmer消費。但如果兩個Consmer不在同一個Consumer group,那麼他們是可以同時消費某Topic的同一個partition的。
對於某些低級別的API,Consumer消費時必須制定topic和partition,這顯然不是一種很好的均衡策略。基於高級別的API,Consumer消費時只需制定topic,藉助zookeeper可以根據partition的數量和consumer的數量做到均衡的動態配置。
消費者在啟動時會到zookeeper下以自己的conusmer-id建立臨時節點/consumer/[group-id]/ids/[conusmer-id],並對/consumer/[group-id]/ids註冊監聽事件,當消費者發生變化時,同一group的其餘消費者會得到通知。當然,消費者還要監聽broker列表的變化。librdkafka通常會將partition進行排序後,根據消費者列表,進行輪流的分配。消費者負載均衡中涉及的更重要的一點還有如何動態的維護partition的offset,因為消費者可能是變化的,而offset需要由comsumer來維護,librdkafka是藉助zookeeper和消費者佇列實現的,具體可以檢視librdkafka的實現原始碼。(ps:

Kafka已推薦將consumer的位移資訊儲存在Kafka內部的topic中,即__consumer_offsets(/brokers/topics/__consumer_offsets),並且預設提供了kafka_consumer_groups.sh指令碼供使用者檢視consumer資訊(sh kafka-consumer-groups.sh –bootstrap-server * –describe –group *)。在當前版本中,offset儲存方式要麼儲存在本地檔案中,要麼儲存在broker端, 具體的儲存方式取決”offset.store.method”的配置,預設是儲存在broker端)

Rebalance測試
下面基於librdKafka實現的C/C++客戶端(0.9版本及以上),
測試Kafka叢集的負載均衡。
測試環境及方法:

  • 三個broker通過zookeeper組成的叢集;
  • 每個Topic預設三個分割槽,每個分割槽預設只有一個副本;
  • 通過增減生產者、消費者、broker來觀察生產/消費的變化

1.配置kafka叢集
由於條件有限,在同一個機器上啟動三個broker來模擬kafka叢集,三個broker使用另外安裝的同一個zookeeper服務(實際叢集中,每個broker通常在不同的機器上,也會使用不同host的zookeeper)
zookeeper的安裝和啟動並不複雜,在此略過。

cd /home/kafka/config
//準備三份用於啟動kafka服務的配置
cp server.properties server-0.properties
cp server.properties server-1.properties
cp server.properties server-2.properties
三份配置中都要修改以下
broker.id=0(三個配置中分別修改為0,1,2)
port=9092(三個配置中分別修改為9092,9093,9094)
log.dirs=/tmp/kafka-logs-0(三個配置中分別修改為/tmp/kafka-logs-0,/tmp/kafka-logs-1,/tmp/kafka-logs-2)
num.partitions=3 (都設定為3,即每個topic預設三個partition)

2.測試
生產消費的負載均衡
啟動三個broker:修改完配置後,開啟三個shell視窗,分別啟動三個broker
kafka/bin/kafka-server-start.sh server-X.properties
啟動日誌

...
[2017-02-19 20:38:39,766] INFO Kafka version : 0.10.1.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-02-19 20:38:39,766] INFO Kafka commitId : f10ef2720b03b247 (org.apache.kafka.common.utils.AppInfoParser)
[2017-02-19 20:38:39,775] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

...
[2017-02-19 20:39:12,628] INFO Kafka version : 0.10.1.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-02-19 20:39:12,629] INFO Kafka commitId : f10ef2720b03b247 (org.apache.kafka.common.utils.AppInfoParser)
[2017-02-19 20:39:12,632] INFO [Kafka Server 1], started (kafka.server.KafkaServer)

...
[2017-02-19 20:42:18,809] INFO Kafka version : 0.10.1.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-02-19 20:42:18,809] INFO Kafka commitId : f10ef2720b03b247 (org.apache.kafka.common.utils.AppInfoParser)
[2017-02-19 20:42:18,812] INFO [Kafka Server 2], started (kafka.server.KafkaServer)

啟動兩個producer,建立名為xyz的topic,根據配置,該topic預設有3個partition:

[root@localhost producer]# ./producer 
Current RdKafka-ver:0.9.4-pre1
% Created producer rdkafka#producer-1
kafka的broker的日誌變化分別是
//broker-0,名為xyz的topic的partition-2被分別到broker-0
[2017-02-19 20:45:17,268] INFO Partition [xyz,2] on broker 0: No checkpointed highwatermark is found for partition [xyz,2] (kafka.cluster.Partition)
//broker-1,名為xyz的topic的partition-0被分別到broker-1
[2017-02-19 20:45:17,325] INFO Partition [xyz,0] on broker 1: No checkpointed highwatermark is found for partition [xyz,0] (kafka.cluster.Partition)
broker-2,名為xyz的topic的partition-1被分別到broker-2
[2017-02-19 20:45:17,001] INFO Partition [xyz,1] on broker 2: No checkpointed highwatermark is found for partition [xyz,1] (kafka.cluster.Partition)
在/tmp/kafka-logs-[*]的變化也可以印證這一點

依次啟動三個consumer:

//啟動1臺時,該消費者負責消費所有的3個partition
[[email protected] consumer]# ./consumer 
Current RdKafka-ver:0.9.4-pre1
RebalanceCb: Local: Assign partitions: xyz[0], xyz[1], xyz[2],

//啟動2臺時,會觸發消費者的負載均衡
//consumer-1
[[email protected] consumer]# ./consumer 
Current RdKafka-ver:0.9.4-pre1
RebalanceCb: Local: Assign partitions: xyz[0], xyz[1], xyz[2], 
RebalanceCb: Local: Revoke partitions: xyz[0], xyz[1], xyz[2], 
RebalanceCb: Local: Assign partitions: xyz[0], xyz[1],

//consumer-2
[[email protected] consumer]# ./consumer 
Current RdKafka-ver:0.9.4-pre1
RebalanceCb: Local: Assign partitions: xyz[2],

//啟動3臺時,會再次觸發消費者的負載均衡
//consumer-1
[[email protected] consumer]# ./consumer 
Current RdKafka-ver:0.9.4-pre1
RebalanceCb: Local: Assign partitions: xyz[0], xyz[1], xyz[2], 
RebalanceCb: Local: Revoke partitions: xyz[0], xyz[1], xyz[2], 
RebalanceCb: Local: Assign partitions: xyz[0], xyz[1], 
RebalanceCb: Local: Revoke partitions: xyz[0], xyz[1], 
RebalanceCb: Local: Assign partitions: xyz[0],

//consumer-2
[[email protected] consumer]# ./consumer 
Current RdKafka-ver:0.9.4-pre1
RebalanceCb: Local: Assign partitions: xyz[2], 
RebalanceCb: Local: Revoke partitions: xyz[2], 
RebalanceCb: Local: Assign partitions: xyz[1],

//consumer-3
[[email protected] consumer]# ./consumer 
Current RdKafka-ver:0.9.4-pre1
RebalanceCb: Local: Assign partitions: xyz[2],

當依次關閉consumer時,同樣會觸發類似的rebalance,再次不一一演示
使用兩個producer輪流向topic-xyz傳送訊息,三個消費者會大致消費相同的訊息數目

broker的均衡
如果此時關掉一個broker,那麼所以的消費者和生產者客戶端都會收到如下的提示

2017-02-19 21:06:02.206: LOG-3-FAIL: [thrd:localhost:9094/bootstrap]: localhost:9094/bootstrap: Connect to ipv4#127.0.0.1:9094 failed: Connection refused
2017-02-19 21:06:02.206: ERROR (Local: Broker transport failure): localhost:9094/bootstrap: Connect to ipv4#127.0.0.1:9094 failed: Connection refused
在此之後,producer傳送的訊息將不會被髮送host為localhost:9094的broker;如果該broker又恢復了,那麼producer和consumer都會在此連線上該服務,因為他們都聽過zookeeper監聽著broker的變化

PS:
a.在本文的情況下(一個topic只有三個partition),如果啟動4個consumer了?
由於只有3個partition,那麼最後會有一個consumer無法消費

b.Rebalance後的消費者的從哪兒開始消費了?
offset由客戶端和zookeeper通過consumer佇列維護全域性的變化

相關推薦

Kafka叢集生產/消費負載均衡(Rebalance)測試

基於Kafka客戶端的高階API,配合zookeeper的使用,可以有效的實現Kafka叢集的Rebalance,提高生產環境下的健壯性。本文使用librdkafka(https://github.com/edenhill/librdkafka) 提供的高階AP

lvs+keepalived+nginx負載均衡搭建測試

lvs keepalived nginx centos7 ipvsadm 1. 簡介1.1 LVS簡介 LVS(Linux Virtual Server),也就是Linux虛擬服務器, 是一個由章文嵩博士發起的自由軟件項目。使用LVS技術要達到的目標是:通過LVS提供的負載均衡技

zuul叢集及頂層負載均衡

上一篇博文我們引入了zuul路由閘道器實現了對不同生產者服務叢集的路由轉發。不過真正系統釋出時有以下問題: 1.zuul裡的配置檔案會暴露我們所有介面的資訊,放在頂層不合適 2.畢竟落實了路由分發和過濾器功能,當介面很多的時候也是可能宕機的 其他考慮應該還有很多,我只是簡單理解了一下。下

springCloud(F版)(4)——zuul叢集及頂層負載均衡

上一篇博文我們引入了zuul路由閘道器實現了對不同生產者服務叢集的路由轉發。不過真正系統釋出時有以下問題: 1.zuul裡的配置檔案會暴露我們所有介面的資訊,放在頂層不合適 2.畢竟落實了路由分發和過濾器功能,當介面很多的時候也是可能宕機的 其他考慮應該還有很多,我只是簡單理解了一下

nginx實現負載均衡簡單測試

環境 測試域名  :a.com A伺服器IP :192.168.0.130 (主) B伺服器IP :192.168.0.131 C伺服器IP :192.168.0.131 部署思路 A伺服器做為主伺服器,域名直接解析到A伺服器(192.168.0.130)上,由A

RabbitMQ 單機部署 && 叢集部署 && HAProxy 負載均衡搭建

準備工作 搭建 RabbitMQ Server 單機版 RabbitMQ Server 高可用叢集相關概念 搭建 RabbitMQ Server 高可用叢集 搭建 HAProxy 負載均衡 一、準備工作(centos7.2 1511) 1.node1:172.16.25

SpringCloud入門(eureka叢集和feign負載均衡

什麼是springcloud? 這是spring官網對springcloud的介紹,大致意思就是:讓分散式系統簡單化。springcloud是建立在springboot之上的,也就是說他是需要依賴springboot的,因此學習springcloud首先就要了解spri

Dubbo之旅--叢集容錯和負載均衡

當我們的系統中用到Dubbo的叢集環境,因為各種原因在叢集呼叫失敗時,Dubbo提供了多種容錯方案,預設為failover重試。        Dubbo的叢集容錯在這裡想說說他是因為我們實際的專案中出現了此類的問題,因為依賴的第三方專案出現異常,導致dubbo呼叫超時,此時使用的是預設的叢集容錯方式

編寫Java程式向Kafka叢集生產並消費資料

一.Kafka生產資料 1.預備知識: 1.程式設計環境如下: 01.使用windows的intellij編寫java程式,連線到本地虛擬機器上的kafka叢集,生產和消費資料。 02.一定要注意配置等問題,否則會導致無法連線到zookeeper和

springcloud-eureka叢集-自定義負載均衡規則

1、首先在服務呼叫者專案中實現IRule介面,用隨機數控制呼叫服務的埠 importjava.util.List; import java.util.Random; import com.netflix.loadbalancer.ILoadBalancer; import

DUBBO叢集容錯與負載均衡

dubbo有良好的叢集方案以及負載策略。仔細想了一下,叢集容錯與負載均衡還是要總結在一起,畢竟負載均衡是基於叢集容錯的。 dubbo叢集容錯 在叢集呼叫失敗時,dubbo提供了多種容錯方案,預設方案為failover。dubbo自帶的叢集方案有六種,基本

虛擬伺服器叢集的ip負載均衡技術理解

ip負載均衡技術是在負載排程器的實現技術中時最高效的,在已有的ip負載均衡技術中主要有通過網路地址轉換,將一組伺服器構建成一個高效能的,高可用的虛擬伺服器我們稱之為VS/NAT技術。 在分析VS/NAT的缺點和網路服務的非對稱性的基礎上,我們提出了VS/TUN

Docker swarm搭建叢集以及實現負載均衡

Docker swarm Swarm 是 Docker 公司在 2014 年 12 月初發布的一套較為簡單的工具,用來管理 Docker 叢集,它將一群 Docker 宿主機變成一個單一的,虛擬的主機。Swarm 使用標準的 Docker API介面作為

nginx實現叢集伺服器的負載均衡

nginx 是一個很強大的高效能Web和反向代理伺服器。這裡主要使用的就是nginx的反向代理功能。 反向代理(Reverse Proxy)方式是指以代理伺服器來接受internet上的連線請求,然後將請求轉發給內部網路上的伺服器,並將從伺服器上得到

Postgres-xl叢集部署 + Haproxy 負載均衡

Postgres-xl 簡介 Postgres的-XL是一個基於PostgreSQL資料庫的橫向擴充套件開源SQL資料庫叢集,具有足夠的靈活性來處理不同的資料庫工作負載: 完全ACID,保持事務一致性 OLTP 寫頻繁的業務 需要MPP並行性商業智慧/大資

dubbo工作原理,叢集容錯,負載均衡

dubbo主要核心部件Remoting:網路通訊框架,實現了sync-over-async和request-response訊息機制。RPC:一個遠端過程呼叫的抽象,支援負載均衡、容災和叢集功能。Registry:服務目錄框架用於服務的註冊和服務事件釋出和訂閱。(類似第一篇文

nginx負載均衡簡單測試配置

負載均衡 先來簡單瞭解一下什麼是負載均衡,單從字面上的意思來理解就可以解釋N臺伺服器平均分擔負載,不會因為某臺伺服器負載高宕機而某臺伺服器閒置的情況。那麼負載均衡的前提就是要有多臺伺服器才能實現,也就是兩臺以上即可。 測試環境 由於沒有伺服器,所以本次測試直接host

Keepalived + Nginx 實現高可用(雙機熱備) Web 負載均衡 安裝測試筆記

system ati 根據 properly 節點配置 sys .gz ive error: keepalived是什麽 keepalived是集群管理中保證集群高可用的一個服務軟件,其功能類似於heartbeat,用來防止單點故障。 keepalived工作原理 k

kafka測試同一個消費組的多個消費者負載均衡例項(整合spring)

這裡使用的是zookeeper和kafka3臺機器的叢集,這樣能保證如過有一臺機器炸了還能執行,在叢集環境中,要在kafka的 server.properties中配置zookeeper叢集地址等資訊,最重要的是num.partitions=3.這樣一個分割槽就是一個機器,所以當kaf

Kafka的單執行緒生產消費測試

程式碼: package com.weichai.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import