用go實現的kafka客戶端,基於sarama和sarama-cluster
工作中需要將原先的訊息佇列替換成kafka,於是接觸了基於go實現的sarama,又因為sarama不支援consumer group,於是又使用了sarama cluster,同時又希望儘量保證消費一次的語義,說到這個exactly once,sarama從去年就立了issue要支援exactly once,結果到現在還沒支援(https://github.com/Shopify/sarama/issues/901)。
於是就自己造了個簡單的輪子,把sarama和sarama cluster封裝到一起,同時實現了保證消費一次的語義,我給它起名為kago。
先附上kago的依賴,需要先進行安裝:
go get github.com/Shopify/sarama
go get github.com/bsm/sarama-cluster
然後便可以安裝kago:
go get go get github.com/JeffreyDing11223/kago
先看一下kago的程式碼結構:
asyncProducer.go
負責初始化非同步producer單個例項或例項group,以及傳送訊息,接受錯誤資訊等。
syncProducer.go
負責初始化同步producer單個例項或例項group,同步傳送訊息等。
consumer.go
負責初始化consumer group單個成員或多個成員,以及初始化partition consumer,還有標記offset,提交offset,獲取所有topics,以及獲取某個topic下所有分割槽等等。
message.go
offsetFile.go
初始化,修改,儲存offset檔案的相關操作。 offsetManager.go
offsetManager的初始化,標記offset等,這個主要結合partition consumer來使用。 config.go
kafka 生產者和消費者以及其他的各項配置。 util.go
各類功能函式。
這裡有小夥伴一定有疑問了,既然已經有標記offset和提交offset了,為什麼還要offsetFile.go
去操作檔案來儲存offset呢,這就是我上面說的儘可能保證消費一次的語義,試想一下,現在我拿到一條訊息,各種加工處理,消費完了,當我要提交offset給kafka的時候,我的客戶端出現網路問題了或者kafka server出了問題,導致offset提交失敗。也就是說,下次繼續消費的時候,就會繼續從這條訊息開始消費,那就相當於是重複消費了這條訊息。於是我在處理訊息和提交offset的中間,加了一步,就是檔案儲存offset,並且供使用者自己選擇,繼續消費是按照kafka server儲存的offset來,還是按照本地檔案來,或者取兩者最大的,這些選項可以在config.go
offsetFile.go
的實現也很簡單,就是封裝一把鎖到os.file
中,並結合sync.map
來支援併發讀寫,檔案內部統一使用json格式,以topic為單位來分類檔案。具體可以看原始碼。
還有具體關於exactly once語義的內容,可以參考我之前發表在部落格中的文章 “kafka訊息交付語義的分析https://blog.csdn.net/jeffrey11223/article/details/80775080“ 。
附上使用例子:
//ayncProducer
import (
...
"github.com/JeffreyDing11223/kago"
...
)
config:=kago.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
produ,_:=kago.InitManualRetryAsyncProducer([]string{"127.0.0.1:9092"}, config)
defer produ.Close()
go func(p *kago.AsyncProducer) {
for{
select {
case suc:=<-p.Successes():
bytes,_:=suc.Value.Encode()
value:=string(bytes)
fmt.Println("offsetCfg:", suc.Offset, " partitions:", suc.Partition," metadata:",suc.Metadata," value:",value)
case fail := <-p.Errors():
fmt.Println("err: ", fail.Err)
}
}
}(produ)
var value string
for i:=0;;i++ {
time11:=time.Now()
value = "this is a message 0805 "+time11.Format("15:04:05")
//傳送的訊息,主題,key
msg := &kago.ProducerMessage{
Topic: "0805_test",
}
//將字串轉化為位元組陣列
msg.Value = sarama.ByteEncoder(value)
//使用通道傳送
produ.Send() <- msg
time.Sleep(500*time.Millisecond)
}
//consumerGroup
config:=kago.NewConfig()
config.Consumer.Return.Errors=true
config.Group.Return.Notifications =true
config.Consumer.Offsets.CommitInterval=1*time.Second
consumer,err:=kago.InitOneConsumerOfGroup([]string{"127.0.0.1:9092"}, "0805_test","cg1",config)
if err!=nil{
log.Println(err)
return
}
defer consumer.Close()
kago.InitOffsetFile() //初始化offset檔案,全域性執行一次即可
go func() {
for err := range consumer.Errors() {
log.Printf("Error: %s\n", err.Error())
}
}()
go func() {
for ntf := range consumer.Notifications() {
log.Printf("Rebalanced: %+v\n", ntf)
}
}()
for{
select {
case msg, ok := <-consumer.Recv():
if ok {
fmt.Fprintf(os.Stdout, ": %s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg.Topic,msg.Partition,msg.Offset,"cg1",true) // 提交offset,最後一個引數為true時,會將offset儲存到檔案中
}
}
}
//partition Consumer
config:=kago.NewConfig()
config.Consumer.Return.Errors=true
config.Group.Return.Notifications =true
config.Consumer.Offsets.CommitInterval=1*time.Second
config.OffsetLocalOrServer=0 //選項配置為優先讀offset檔案
kago.InitOffsetFile() //初始化offset檔案,全域性執行一次即可
pconsumer,err:=kago.InitPartitionConsumer([]string{"127.0.0.1:9092"}, "0805_test",0,"cg1",config) //會根據config.OffsetLocalOrServe來讓pconsumer從指定的offset開始消費
if err!=nil{
log.Println(err)
return
}
defer pconsumer.Close()
pOffsetManager,err2:=kago.InitPartitionOffsetManager([]string{"127.0.0.1:9092"}, "0805_test","cg1",0,config)
if err2 !=nil{
fmt.Println(err2)
return
}
defer pOffsetManager.Close()
go func() {
for err := range pconsumer.Errors() {
fmt.Printf("Error: %s\n", err.Error())
}
}()
for{
msg := <-pconsumer.Recv()
fmt.Printf("Consumed message offsetCfg %d\n message:%s", msg.Offset,string(msg.Value))
pOffsetManager.MarkOffset("0805_test",0,msg.Offset,"cg1",true)
}