Go Kafka客戶端簡單示例
阿新 • • 發佈:2018-12-31
一. 準備
- 安裝依賴庫sarama
go get github.com/Shopify/sarama
該庫要求kafka版本在0.8及以上,支援kafka定義的high-level API和low-level API,但不支援常用的consumer自動rebalance和offset追蹤,所以一般得結合cluster版本使用。
- 安裝依賴庫sarama-cluster
go get github.com/bsm/sarama-cluster
需要kafka 0.9及以上版本。
- 程式碼示例來自官網(本地已測試),可到官網檢視更多資訊。
二. 生產者
1. 同步訊息模式
import (
"github.com/Shopify/sarama"
"time"
"log"
"fmt"
"os"
"os/signal"
"sync"
)
// Address lists the Kafka broker endpoints used by every example below.
var Address = []string{"10.130.138.164:9092", "10.130.138.164:9093", "10.130.138.164:9094"}

// main runs the synchronous-producer demo; swap the comment to try the async one.
func main() {
	syncProducer(Address)
	//asyncProducer1(Address)
}
// syncProducer demonstrates the blocking SyncProducer: each SendMessage call
// waits for the broker acknowledgement and reports the assigned partition
// and offset. Sends ten messages to topic "test", two seconds apart.
func syncProducer(address []string) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // SyncProducer requires this to be true
	config.Producer.Timeout = 5 * time.Second

	producer, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
		return
	}
	defer producer.Close()

	const topic = "test"
	template := "sync: this is a message. index=%d"
	for i := 0; i < 10; i++ {
		value := fmt.Sprintf(template, i)
		partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(value),
		})
		if err != nil {
			log.Printf("send message(%s) err=%s \n", value, err)
		} else {
			fmt.Fprintf(os.Stdout, value+"傳送成功,partition=%d, offset=%d \n", partition, offset)
		}
		time.Sleep(2 * time.Second)
	}
}
2. 非同步訊息之Goroutines
// asyncProducer1 sends messages asynchronously, draining the Successes and
// Errors channels in two dedicated goroutines (the idiomatic pattern for
// sarama's AsyncProducer). The main loop enqueues a message every two seconds
// until SIGINT, then shuts down gracefully and reports the counters.
func asyncProducer1(address []string) {
	config := sarama.NewConfig()
	// Report successful deliveries on the Successes channel; without this
	// the success-counting goroutine would never receive anything.
	config.Producer.Return.Successes = true
	//config.Producer.Partitioner defaults to hashing the message key.
	p, err := sarama.NewAsyncProducer(address, config)
	if err != nil {
		// Fixed copy-paste: this constructor is NewAsyncProducer, not NewSyncProducer.
		log.Printf("sarama.NewAsyncProducer err, message=%s \n", err)
		return
	}

	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var wg sync.WaitGroup
	var enqueued, successes, errors int
	wg.Add(2) // one goroutine per result channel

	// Count successfully delivered messages.
	go func() {
		defer wg.Done()
		for range p.Successes() {
			successes++
		}
	}()

	// Log and count failed deliveries.
	go func() {
		defer wg.Done()
		for err := range p.Errors() {
			log.Printf("%s 傳送失敗,err:%s\n", err.Msg, err.Err)
			errors++
		}
	}()

	// Produce messages in a loop until interrupted.
	asrcValue := "async-goroutine: this is a message. index=%d"
	var i int
Loop:
	for {
		i++
		value := fmt.Sprintf(asrcValue, i)
		msg := &sarama.ProducerMessage{
			Topic: "test",
			Value: sarama.ByteEncoder(value),
		}
		select {
		case p.Input() <- msg: // enqueue the message
			enqueued++
			fmt.Fprintln(os.Stdout, value)
		case <-signals: // interrupt signal received
			// AsyncClose closes the Successes/Errors channels, letting both
			// counting goroutines finish so wg.Wait below can return.
			p.AsyncClose()
			break Loop
		}
		time.Sleep(2 * time.Second)
	}
	wg.Wait()
	fmt.Fprintf(os.Stdout, "傳送數=%d,傳送成功數=%d,傳送失敗數=%d \n", enqueued, successes, errors)
}
3. 非同步訊息之Select
// asyncProducer2 sends messages and handles delivery results in a single
// goroutine through one select. This is less efficient: under heavy send
// traffic the success/error cases can starve and block for a while. Setting
// config.Producer.Return.Successes=false and config.Producer.Return.Errors=false
// avoids that (at the cost of no delivery feedback).
func asyncProducer2(address []string) {
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	p, err := sarama.NewAsyncProducer(address, config)
	if err != nil {
		// Fixed copy-paste: this constructor is NewAsyncProducer, not NewSyncProducer.
		log.Printf("sarama.NewAsyncProducer err, message=%s \n", err)
		return
	}

	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var enqueued, successes, errors int
	asrcValue := "async-select: this is a message. index=%d"
	var i int
Loop:
	for {
		i++
		value := fmt.Sprintf(asrcValue, i)
		msg := &sarama.ProducerMessage{
			Topic: "test",
			Value: sarama.ByteEncoder(value),
		}
		select {
		case p.Input() <- msg: // enqueue the message
			fmt.Fprintln(os.Stdout, value)
			enqueued++
		case <-p.Successes():
			// NOTE: Return.Successes is left at its default (false), so this
			// case never fires; set it to true if success feedback is wanted.
			successes++
		case err := <-p.Errors():
			log.Printf("%s 傳送失敗,err:%s\n", err.Msg, err.Err)
			errors++
		case <-signals: // interrupt signal received
			p.AsyncClose()
			break Loop
		}
		time.Sleep(2 * time.Second)
	}
	// Fixed: the original summary tracked successes but never reported it;
	// now matches asyncProducer1's summary format.
	fmt.Fprintf(os.Stdout, "傳送數=%d,傳送成功數=%d,傳送失敗數=%d \n", enqueued, successes, errors)
}
三. 消費者
叢集模式實現
// main starts two consumer-group members on the same topic. Because each
// uses a distinct group id, every message is delivered to both consumers
// (broadcast-style consumption).
func main() {
	topics := []string{"test"}
	wg := &sync.WaitGroup{}
	wg.Add(2)
	go clusterConsumer(wg, Address, topics, "group-1") // broadcast consumer 1
	go clusterConsumer(wg, Address, topics, "group-2") // broadcast consumer 2
	wg.Wait()
}
// clusterConsumer consumes the given topics from a broker cluster as a member
// of consumer group groupId, using sarama-cluster (which adds the automatic
// rebalance and offset tracking that plain sarama lacks). It prints each
// message, marks its offset, and runs until SIGINT.
func clusterConsumer(wg *sync.WaitGroup, brokers, topics []string, groupId string) {
	defer wg.Done()
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	// init consumer
	consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
	if err != nil {
		// Fixed copy-paste: this call is cluster.NewConsumer, not sarama.NewSyncProducer.
		log.Printf("%s: cluster.NewConsumer err, message=%s \n", groupId, err)
		return
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// Drain consumer errors; consumer.Close closes this channel, which lets
	// the goroutine exit when the function returns.
	go func() {
		for err := range consumer.Errors() {
			log.Printf("%s:Error: %s\n", groupId, err.Error())
		}
	}()

	// Drain rebalance notifications (also terminated by consumer.Close).
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
		}
	}()

	// consume messages, watch signals
	var successes int
Loop:
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
				successes++
			}
		case <-signals:
			break Loop
		}
	}
	fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}