1. 程式人生 > 程式設計 >kafka 上手指南:叢集版

kafka 上手指南:叢集版

大家好,我叫謝偉,是一名程式設計師。

今天的主題:kafka 上手指南--叢集版

前提回顧:kafka 單節點

1. 基本概念

在訊息系統中,涉及的概念都比較類似,初學訊息系統,概念有時候理解不到位,需要讀者反覆的根據自己的學習進度回過頭把基本概念捋清楚。

下面採用問答式陳述基本概念:

  1. 什麼是 broker ?

簡單的說,一個 kafka server 就是一個 broker。

  1. 什麼是 生產者 producer ?

簡單的說,提供訊息的系統稱為生產者

  1. 什麼是 消費者 consumer ?

簡單的說,對訊息進行處理的系統稱為消費者

  1. 什麼是 topic ?

簡單的說,區分訊息的不同型別,人為的起個名字,所以 topic 是個邏輯概念。

  1. 什麼是分割槽 partition ?

簡單的說,是儲存訊息的實體,即將 topic 劃分為不同的分割槽。物理層面看就是以 topic-N 命名的資料夾,資料夾下儲存訊息日誌。當然分割槽可以在同一個 broker 上,也可以在不同 broker 上,如果你用上了叢集版的 kafka。

topic-go-0
topic-go-1
topic-go-2
...
複製程式碼
  1. 什麼是 offset ?

簡單的說,是一個表示位移的數字。用來給消費者做標記的。比如給你發了100 條訊息,我怎麼知道你消費到了第幾個呢,offset 就是用來標記的。

  1. 什麼是 消費者組 ?

簡單的說,是一組消費者共同消費一個或者多個topic,當然某個消費者消費的是一個或者多個分割槽內的訊息。為什麼有消費者,又要消費者組?消費者消費訊息,需要訂閱某個 topic,消費者組共同消費一個或者多個 topic,這樣可以的效果是:可拓展、容錯。可拓展指,新加入一個消費者,可以承擔部分任務,減輕其他消費者負擔;同理,減少一個消費者,再重新給消費者分配訊息。這種分配機制,在 kafka 系統中稱之為:Rebalance,動態的調整。

那麼什麼時候會 Rebalance ?

  • 消費者數目的變化
  • topic 的變化
  • 分割槽的變化

其中消費者數目的變化,是最常見的場景。Rebalance 有利有弊,利:可拓展,容錯;弊:Rebalance 比較耗效能,某一個時刻會停止消費訊息。

  1. 什麼是 kafka 叢集?

簡單的說,叢集是一群服務的集合,一個典型的特徵是:多機器,多服務。這種特徵能夠保障系統的高可用,高併發。系統內部之間可以通過 zookeeper 、 Metadata 等發現彼此;對外,就像使用單服務一樣。

  1. “能力”的大小怎麼控制 ?

配置檔案,比如我怎麼保障生產者準確的傳送訊息呢,比如多個分割槽,我按什麼分割槽策略呢,比如生產者的訊息要不要壓縮,採用什麼壓縮方式;比如消費者是從最新的消費,還是最老的訊息消費;比如消費者組的 Rebalance 策略是什麼?

這些特性,我把它稱之為能力的大小,這些能力的大小,需要使用者足夠熟悉才能發揮其能力,或者說能具體問題具體分析。

  • broker “能力”的配置
  • 生產者“能力”的配置
  • 消費者“能力”的配置
  • 消費者組“能力”的配置

2. 配置

啟動服務時的配置檔案,這也是絕大多少服務啟動的一般方式,比如 MySQL 資料庫服務,比如 Redis 服務等,都是啟動時進行配置檔案,賦予其能力。

broker

# 目錄
config/server.properties
複製程式碼
  • log.dirs 訊息儲存目錄,可以多個
log.dirs=/kafka/kafka-logs-kfk1
複製程式碼
  • zookeeper.connect ,可以多個,用於叢集方式
zookeeper.connect=zookeeper-1:2181
複製程式碼
  • advertised.listeners 對外地址
advertised.listeners=PLAINTEXT://kfk1:9092
複製程式碼
  • listener.security.protocol.map 安全協議
listener.security.protocol.map=CONTROLLER:PLAINTEXT
複製程式碼

一般這些配置就可以,其他預設,其中 log.dirs,zookeeper.connect 最為重要

topic

  • auto.create.topics.enable 是否允許自動建立 topic
auto.create.topics.enable=false
複製程式碼

啟動服務之後,一般通過客戶端工具,編寫程式碼完成相應的設定。

就 go 中,kafka 客戶端使用:sarama

type config struct {
    Producer struct {
        ...
    }
    Consumer struct {
        ...
        Group struct {
        ...
        }
    }
}
複製程式碼
  • 配置針對消費者,配置config.Consumer
  • 配置針對生產者,配置config.Producer
  • 配置針對消費者組,配置config.Consumer.Group

消費者:

	c.Consumer.Fetch.Min = 1
	c.Consumer.Fetch.Default = 1024 * 1024
	c.Consumer.Retry.Backoff = 2 * time.Second
	c.Consumer.MaxWaitTime = 250 * time.Millisecond
	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
	c.Consumer.Return.Errors = false
	c.Consumer.Offsets.CommitInterval = 1 * time.Second
	c.Consumer.Offsets.Initial = OffsetNewest
	c.Consumer.Offsets.Retry.Max = 3
複製程式碼

其中,一般預設,否則配置:

  • 是否返回錯誤:c.Consumer.Return.Errors
  • 消費起始值:c.Consumer.Offsets.Initial
  • 重試機制:Retry

生產者:

    //  訊息的最大值大概 1MB
	c.Producer.MaxMessageBytes = 1000000
    // 訊息是否應答:0: 不應答,禁用;1: leader 收到即可 ; -1: 所有的副本都收到
	c.Producer.RequiredAcks = WaitForLocal
    
	c.Producer.Timeout = 10 * time.Second
    
    // 分割槽策略:隨機、輪詢、hash 等
	c.Producer.Partitioner = NewHashPartitioner
    // 重試機制
	c.Producer.Retry.Max = 3
	c.Producer.Retry.Backoff = 100 * time.Millisecond
	c.Producer.Return.Errors = true
    
    // 壓縮演演算法:gzip,zstd,lz4,snappy
	c.Producer.CompressionLevel = CompressionLevelDefault
複製程式碼

消費者組:

    // 間隔
	c.Consumer.Group.Session.Timeout = 10 * time.Second
    
    // 心跳
	c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
    
    // Rebalance 策略
	c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
	c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
	c.Consumer.Group.Rebalance.Retry.Max = 4
	c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
複製程式碼

3. 消費者組

普通的消費者,一般需要指定 topic,offset 指定消費:

比如:

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	brokers := []string{"127.0.0.1:9092"}
	master,err := sarama.NewConsumer(brokers,config)
    consumer,err := master.ConsumePartition("topic-python",0,sarama.OffsetNewest)

複製程式碼

其中:

ConsumePartition(topic string,partition int32,offset int64) (PartitionConsumer,error)
複製程式碼
  • topic
  • partition
  • offset

但一般這種形式,需要指定 offset 這種,不方便使用。所以一般使用消費者組的形式。

type KafkaConsumerGroupAction struct {
	group sarama.ConsumerGroup
}

func NewKafkaConsumerGroupAction(brokers []string,groupId string) *KafkaConsumerGroupAction {
	config := sarama.NewConfig()
	sarama.Logger = log.New(os.Stdout,"[consumer_group]",log.Lshortfile)
	// 重平衡策略
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	config.Consumer.Group.Session.Timeout = 20 * time.Second
	config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
	config.Consumer.IsolationLevel = sarama.ReadCommitted
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Version = sarama.V2_3_0_0
	consumerGroup,e := sarama.NewConsumerGroup(brokers,groupId,config)
	if e != nil {
		log.Println(e)
		return nil
	}
	return &KafkaConsumerGroupAction{group: consumerGroup}

}

func (K *KafkaConsumerGroupAction) Consume(topics []string,wg sync.WaitGroup,ctx context.Context) {
	var consumer = KafkaConsumerGroupHandler{ready: make(chan bool)}
	go func() {
		defer wg.Done()
		for {
			if err := K.group.Consume(ctx,topics,&consumer); err != nil {
				log.Panicf("Error from consumer: %v",err)
			}
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()
	<-consumer.ready
	log.Println("Sarama consumer up and running!...")
	sigterm := make(chan os.Signal,1)
	signal.Notify(sigterm,syscall.SIGINT,syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	wg.Wait()
	if err := K.group.Close(); err != nil {
		log.Panicf("Error closing client: %v",err)
	}
}

type KafkaConsumerGroupHandler struct {
	ready chan bool
}

func (K *KafkaConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (K *KafkaConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
func (K *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s,timestamp = %v,topic = %s,partions = %d,offset = %d",string(message.Value),message.Timestamp,message.Topic,message.Partition,message.Offset)
		lag := claim.HighWaterMarkOffset() - message.Offset
		fmt.Println(lag)
		session.MarkMessage(message,"")
	}

	return nil
}
複製程式碼

消費者組:

type ConsumerGroup interface {
	Consume(ctx context.Context,topics []string,handler ConsumerGroupHandler) error
	Errors() <-chan error

	Close() error
}
複製程式碼

其中:

type ConsumerGroupHandler interface {

	Setup(ConsumerGroupSession) error

	Cleanup(ConsumerGroupSession) error

	ConsumeClaim(ConsumerGroupSession,ConsumerGroupClaim) error
}
複製程式碼

真實的訊息處理,需要實現 ConsumerGroupHandler 介面。

4. 生產者的一般處理流程

如果這些概念你都清楚,那麼整體來說,使用 kafka 的難點在哪呢?

  • 如何確保訊息準確無誤地傳送
  • 如何確保不重複消費訊息
  • 如何確保訊息不滯後,最好是生產者發往訊息系統,消費者立馬消費掉,沒有延長
  • 如何確保系統高可用
  1. 生產者配置
  2. 例項化生產者
  3. 構建訊息
  4. 傳送訊息
  5. 關閉生產者例項
func NewAsyncProducer(addrs []string,conf *Config) (AsyncProducer,error) {
	client,err := NewClient(addrs,conf)
	if err != nil {
		return nil,err
	}
	return newAsyncProducer(client)
}
複製程式碼
//非同步生產者
type AsyncProducer interface {

	AsyncClose()
	Close() error
	Input() chan<- *ProducerMessage // 傳送訊息
	Successes() <-chan *ProducerMessage
	Errors() <-chan *ProducerError
}
複製程式碼

5. 消費者的一般處理流程

消費者的一般處理流程:

  1. 消費者配置
  2. 例項化消費者
  3. 訂閱主題
  4. 提交位移
  5. 關閉消費者
func NewConsumer(addrs []string,config *Config) (Consumer,config)
	if err != nil {
		return nil,err
	}
	return newConsumer(client)
}
複製程式碼
type Consumer interface {

	Topics() ([]string,error) // 訊息
	Partitions(topic string) ([]int32,error) // 分割槽
	ConsumePartition(topic string,partition int32,offset int64) (PartitionConsumer,error) // 消費訊息
	HighWaterMarks() map[string]map[int32]int64 // 高水位

	Close() error
}
複製程式碼

6. 消費者組的一般處理流程

普通的消費者,需要指定分割槽和位移,進行消費,不常用。一般選擇消費者組。

那麼消費者組一般的處理流程是?

  1. 配置消費者組
  2. 例項話消費者組,指定 topic,指定消費者組 GroupID
  3. 消費訊息
  4. 關閉消費者組
type ConsumerGroup interface {
	Consume(ctx context.Context,handler ConsumerGroupHandler) error
	Errors() <-chan error

	Close() error
}
複製程式碼

消費者組處理器:

type ConsumerGroupHandler interface {

	Setup(ConsumerGroupSession) error
	Cleanup(ConsumerGroupSession) error
	ConsumeClaim(ConsumerGroupSession,ConsumerGroupClaim) error
}
複製程式碼

7. 叢集

上文說到,叢集一個特徵是:多機器,多服務。

真實的線上環境,zookeeper 部署在不同機器,kafka server 部署在不同機器,組成的系統,共同服務於線上系統。

個人學習,為了達到叢集的效果,即:使用不同的埠區分即可。

當然你可以本地配置 zookeeper, kafka。但我一般喜歡用容器的方式,部署起來方便。

  • 多節點 zookeeper
  zookeeper-1:
    image: zookeeper
    restart: always
    hostname: zookeeper-1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
    volumes:
      - /local/volumn/zookeeper1/data:/data
      - /local/volumn/zookeeper1/datalog:/datalog
  zookeeper-2:
    image: zookeeper
    restart: always
    hostname: zookeeper-2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
    volumes:
      - /local/volumn/zookeeper2/data:/data
      - /local/volumn/zookeeper2/datalog:/datalog
  zookeeper-3:
    image: zookeeper
    restart: always
    hostname: zookeeper-3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
    volumes:
      - /local/volumn/zookeeper3/data:/data
      - /local/volumn/zookeeper3/datalog:/datalog
複製程式碼

其中最重要的是環境變數:

ZOO_MY_ID 一般用一個數字表示 myid
複製程式碼
ZOO_SERVERS
複製程式碼

抽象出一個公式:server.A=B:C:D

  • A 表示 myid,表示伺服器的編號
  • B 表示代表伺服器的 ip 地址
  • C 表示伺服器與叢集中的 leader 伺服器交換資訊的埠
  • D 表示選舉時伺服器相互通訊的埠

有人會說,我不知道這些環境變數怎麼辦,我也不知道具體的環境變數名呢?

看 Docker hub 上的具體檔案啊:

zookeeper docker hub 檔案:hub.docker.com/_/zookeeper

  kfk1:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk1
    hostname: kfk1
    restart: always
    ports:
      - 9092:9092
      - 19999:9999
    expose:
      - 19092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk1:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk1:/kafka/kafka-logs-kfk1
  kfk2:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk2
    hostname: kfk2
    restart: always
    ports:
      - 29092:29092
      - 29999:9999
    expose:
      - 29092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk2:29092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk2:/kafka/kafka-logs-kfk2
  kfk3:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk3
    hostname: kfk3
    restart: always
    ports:
      - 39092:39092
      - 39999:9999
    expose:
      - 39092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk3:39092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk3:/kafka/kafka-logs-kfk3
複製程式碼

其中最重要的是以下幾個環境變數:

KAFKA_BROKER_ID  broker.id 單節點時,預設值為-1
複製程式碼
KAFKA_ZOOKEEPER_CONNECT kafka zookeeper 連線地址,對應上文 zookeeper 對外地址
複製程式碼
KAFKA_ADVERTISED_LISTENERS 該節點對外公佈的訪問地址和埠
複製程式碼
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092 該節點對外訪問地址和埠
複製程式碼
  ui:
    image: index.docker.io/sheepkiller/kafka-manager:latest
    restart: always
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kfk1
      - kfk2
      - kfk3
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper-1:2181,zookeeper-3:2181
      KAFKA_BROKERS: kfk1:19092,kfk2:29092,kfk3:39092
複製程式碼

其中環境變數:

ZK_HOSTS zookeeper 節點地址
複製程式碼
KAFKA_BROKERS kafa 節點地址
複製程式碼

啟動:

docker-compose -f docker-compose.yml up -d
複製程式碼

叢集版本的kafka 服務,基本上和單節點的 kafka 服務使用方式一致,叢集版本的系統更穩健,高可用,比如冗餘備份,一旦一個節點失效了並不影響服務,除非全部節點失效。

  • 備份:

建立 topic,備份的數目小於等於 kafka 節點數目。比如三個節點,備份2份,可能在 三個節點上任意兩個。

  • 分割槽

單節點,topic 的分割槽,都在同一個資料夾下;叢集版本,分割槽的大致可以均勻的分佈在叢集節點上

對外服務,和單節點完全一致。

topic-go.png

topic-go 10個分割槽,備份 2 份:三個節點分別儲存:6,7,7 個分割槽

broker.png

叢集版本可能會出現的問題?

  • 設定過 不自動建立 topic,記得先手動建立 topic

  • 叢集訪問地址不通。1 設定 /etc/hosts ;2 開放埠,特別是雲伺服器,記得開放埠

  • 消費滯後 Lag, 怎麼辦?增加消費者例項

參考:

程式碼地址: