Kafka體系架構、命令、Go案例
阿新 • • 發佈:2020-12-31
原文地址:https://github.com/WilburXu/blog/blob/master/kafka/Kafka基本架構和命令.md
## Kafka體系架構
![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232502355-1349949813.png)
### Broker服務代理節點
服務代理節點。對於Kafka而言,Broker可以簡單地看作一個獨立的Kafka服務節點或Kafka服務例項。大多數情況下也可以將Broker看作一臺Kafka伺服器,前提是這臺伺服器上只部署了一個Kafka例項,一個或多個Broker組成了一個Kafka叢集。
### Producer和Consumer
![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232515385-1099033831.png)
#### Producer生產者
生產者,也就是傳送訊息的一方。生產者負責建立訊息,然後將其投遞到Kafka中。
一個正常的生產邏輯需要具備以下幾個步驟:
1. 建立生產者例項
2. 構建待發送的訊息
3. 傳送訊息到指定的`Topic`、`Partition`、`Key`
4. 關閉生產者例項
#### Consumer消費者
消費者,也就是接收訊息的一方。消費者連線到Kafka上並接收訊息,從而進行相應的業務邏輯處理。
消費一般有三種消費模式:
##### 單執行緒模式
![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232526167-1110298177.png)
單個執行緒消費多個`Partition`
問題:
- 效率低,併發上不去
- 可用性差,單個執行緒掛了,將無法消費
#### 多執行緒模式
##### 獨立消費者模式
![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232535260-8525770.png)
和單執行緒模式類似,區別就是為每一個`Partition`單獨起一個執行緒進行消費。
問題:
- 執行緒和併發增加了,但是單執行緒掛了,該執行緒的分割槽還是無法消費。
##### 消費組模式
![](https://img2020.cnblogs.com/blog/1062001/202012/1062001-20201230232544259-2054852396.png)
也是目前最常用的消費模式,我們可以建立多個消費例項並設定同一個`group-id`來區分消費組,同一個消費組可以指定一個或多個`Topic`進行消費:
- 消費組自平衡(Rebalance),kafka會根據消費組例項數量和分割槽數量自平衡分配
- 不會重複消費,同個組內kafka確保一個分割槽只會發往一個消費例項,避免重複消費
- 高可用,當一個消費例項掛了,kafka會自動調整消費例項和分割槽的關係
### Topic主題
Kafka中的訊息以主題為單位進行歸類(邏輯概念,生產者負責將訊息傳送到特定的主題(傳送到Kafka叢集中的每一條訊息都要指定一個主題),而消費者負責訂閱主題並進行消費。
### Partition分割槽
物理分割槽,主題細分為了1或多個分割槽,一個分割槽只能屬於單個主題,一般也會把分割槽稱為主題分割槽(Topic-Partition)。
### Segment
實際儲存資料的地方,`Segment`包含一個數據檔案和一個索引檔案。一個`Partition`有多個大小相同的`Segment`,可以理解為`Partition`是在`Segment`之上進行的邏輯抽象。
## Kafka基本命令
### zookeeper
broker節點儲存在zookeeper,所有需要:
1. 進入zookeeper,然後 `./bin/zkCli.sh`
2. 執行`ls /brokers/ids`
#### 檢視broker詳情
`kafka-log-dirs.sh --describe --bootstrap-server kafka:9092 --broker-list 1`
### topic
#### 檢視列表
`kafka-topics.sh --list --zookeeper zookeeper:2181`
#### 建立
` kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic [topic_name]`
#### 檢視詳情
` kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic [topic_name]`
#### 刪除
`kafka-topics.sh --zookeeper zookeeper:2181 --delete --topic [topic_name]`
#### topic消費情況
##### topic offset 最小
`kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic [topic_name] --time -2`
##### topic offset最大
`kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic [topic_name] --time -1`
### 生產
##### 新增資料
`kafka-console-producer.sh --broker-list localhost:9092 --topic [topic_name]`
### 消費
##### 從頭部開始消費
`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --from-beginning`
##### 從尾部開始消費,必需要指定分割槽
`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset latest --partition 0`
##### 從某個位置開始消費(--offset [n])
`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset 100 --partition 0`
##### 消費指定個數(--max-messages [n])
`kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [topic_name] --offset latest --partition 0 --max-messages 2`
### 消費組
##### 檢視消費組列表
`kafka-consumer-groups.sh --list --bootstrap-server localhost:9092`
##### 檢視消費組情況
`kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group [group_id]`
##### offset 偏移設定為最早
`kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-earliest --all-topics --execute`
##### offset 偏移設定為新
`kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-latest --all-topics --execute`
##### offset 偏移設定為指定位置
`kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-offset 2000 --all-topics --execute`
##### offset 偏移設定某個時間之後最早位移
`kafka-consumer-groups.bat --bootstrap-server kafka:9092 --group kafka_consumer_session --reset-offsets --to-datetime 2020-12-28T00:00:00.000 --all-topics --execute`
## Go案例
基於`https://github.com/Shopify/sarama`的生產和消費案例
###生產者
`InitKafka.go`
```go
package kafka
var (
kafkaClient *Client
)
func InitKafka() {
var err error
var config = Config{
Host: []string{"kafka:9092"},
}
kafkaClient, err = NewClient(config)
if err != nil {
panic(err)
}
}
func GetClient() *Client {
return kafkaClient
}
```
`Producer.go`
```go
package kafka
import (
"errors"
"github.com/Shopify/sarama"
)
type Client struct {
sarama.AsyncProducer
msgPool chan *sarama.ProducerMessage
}
type Config struct {
Host []string `json:"host"`
ReturnSuccess bool `json:"return_success"`
ReturnErrors bool `json:"return_errors"`
}
func NewClient(cfg Config) (*Client, error) {
// create client
var err error
c := &Client{
msgPool: make(chan *sarama.ProducerMessage, 2000),
}
config := sarama.NewConfig()
config.Producer.Return.Errors = cfg.ReturnErrors
config.Producer.Return.Successes = cfg.ReturnSuccess
config.Version = sarama.V2_0_0_0
c.AsyncProducer, err = sarama.NewAsyncProducer(cfg.Host, config)
if err != nil {
return nil, err
}
return c, nil
}
// run
func (c *Client) Run() {
for {
select {
case msg := <-c.msgPool:
c.Input() <- msg
logger.Info("%+v", msg)
}
}
}
// send msg
func (c *Client) Send(topic string, msg []byte) error {
if topic == "" {
return errors.New("kafka producer send msg topic empty")
}
kafkaMsg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(msg),
}
c.msgPool <- kafkaMsg
return nil
}
```
**生產者初始化**:
```go
// kafka init
kafka.InitKafka()
go kafka.GetClient().Run()
```
### 消費者
consumer.go
```go
package kafka_consumer
import (
"context"
"github.com/Shopify/sarama"
"os"
"os/signal"
"sync"
"syscall"
)
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
}
func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error {
//panic("implement me")
return nil
}
func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
//panic("implement me")
return nil
}
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
logger.Info("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
c.Handler(message.Topic, message.Value)
}
return nil
}
func (c *Consumer) Handler(topic string, msg []byte) {
switch topic {
case conscom.KafkaTopicGiftOrder:
GiftOrder(topic, msg)
case conscom.KafkaTopicFollow:
UserFollow(topic, msg)
}
}
func ConsumeInit(topics []string, groupID string) {
consumer := Consumer{
ready: make(chan bool),
}
brokerList := []string{"kafka:9092"}
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(brokerList, groupID, config)
if err != nil {
log.Printf("kafka consumer err %v", err)
return
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// server-side rebalance happens, the consumer session will need to be
if err := client.Consume(ctx, topics, &consumer); err != nil {
log.Printf("kafka consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Printf("kafka consume gift terminating: context cancelled")
case <-sigterm:
log.Printf("kafka consume gift terminating: via signal")
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Printf("kafka consume gift Error closing client: %v", err)
}
}
```
**消費者初始化**:
```go
// kafka consumer
go kafka_consumer.ConsumeInit([]string{"topicA", "topicB", "group-name")
```
## 參考
《深入理解Kafka:核心設計與實踐原理》作者:朱忠華
https://github.com/Shopify/sarama
http://kafka.apache.org/documentation/
https://crossoverjie.top/2018/11/20/kafka/kafka-c