1. 程式人生 > >Go Kafka客戶端簡單示例

Go Kafka客戶端簡單示例

一. 準備

  1. 安裝依賴庫sarama
    go get github.com/Shopify/sarama
    該庫要求kafka版本在0.8及以上,支援kafka定義的high-level API和low-level API,但不支援常用的consumer自動rebalance和offset追蹤,所以一般得結合cluster版本使用。
  2. sarama-cluster依賴庫
    go get github.com/bsm/sarama-cluster
    需要kafka 0.9及以上版本
  3. 程式碼示例來自官網(本地已測試),可到官網檢視更多資訊。

二. 生產者

1. 同步訊息模式

import (
    "github.com/Shopify/sarama"
"time" "log" "fmt" "os" "os/signal" "sync" ) var Address = []string{"10.130.138.164:9092","10.130.138.164:9093","10.130.138.164:9094"} func main() { syncProducer(Address) //asyncProducer1(Address) } //同步訊息模式 func syncProducer(address []string) { config := sarama.NewConfig() config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second p, err := sarama.NewSyncProducer(address, config) if err != nil { log.Printf("sarama.NewSyncProducer err, message=%s \n", err) return } defer p.Close() topic := "test" srcValue := "sync: this is a message. index=%d"
for i:=0; i<10; i++ { value := fmt.Sprintf(srcValue, i) msg := &sarama.ProducerMessage{ Topic:topic, Value:sarama.ByteEncoder(value), } part, offset, err := p.SendMessage(msg) if err != nil { log.Printf("send message(%s) err=%s \n", value, err) }else { fmt.Fprintf(os.Stdout, value + "傳送成功,partition=%d, offset=%d \n", part, offset) } time.Sleep(2*time.Second) } }

2. 非同步訊息之Goroutines

// asyncProducer1 demonstrates the asynchronous producer with dedicated
// goroutines: one goroutine reads the Successes channel and another reads the
// Errors channel while the calling goroutine enqueues messages.
//
// It enqueues one message to the "test" topic, then sleeps two seconds, and
// repeats until SIGINT is received. It then closes the producer, waits for
// both reader goroutines to finish, and prints the enqueued / succeeded /
// failed counts.
//
// address is the list of broker endpoints to connect to.
func asyncProducer1(address []string) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    // config.Producer.Partitioner is left at its default (hash of the message).
    p, err := sarama.NewAsyncProducer(address, config)
    if err != nil {
        log.Printf("sarama.NewAsyncProducer err, message=%s \n", err)
        return
    }

    // Trap SIGINT to trigger a graceful shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    defer signal.Stop(signals)

    var wg sync.WaitGroup
    var enqueued, successes, failures int
    wg.Add(2) // one per reader goroutine below

    // Count successfully delivered messages. The range ends when the
    // producer closes the channel during shutdown.
    go func() {
        defer wg.Done()
        for range p.Successes() {
            successes++
        }
    }()

    // Log and count failed messages. The range ends when the producer
    // closes the channel during shutdown.
    go func() {
        defer wg.Done()
        for err := range p.Errors() {
            log.Printf("%s 傳送失敗,err:%s\n", err.Msg, err.Err)
            failures++
        }
    }()

    // Enqueue messages in a loop until interrupted.
    asrcValue := "async-goroutine: this is a message. index=%d"
    var i int
    Loop:
    for {
        i++
        value := fmt.Sprintf(asrcValue, i)
        msg := &sarama.ProducerMessage{
            Topic: "test",
            Value: sarama.ByteEncoder(value),
        }
        select {
        case p.Input() <- msg: // message handed to the producer
            enqueued++
            fmt.Fprintln(os.Stdout, value)
        case <-signals: // interrupt: start shutdown and stop producing
            p.AsyncClose()
            break Loop
        }
        time.Sleep(2 * time.Second)
    }
    // The counters are only read after both reader goroutines have exited.
    wg.Wait()

    fmt.Fprintf(os.Stdout, "傳送數=%d,傳送成功數=%d,傳送失敗數=%d \n", enqueued, successes, failures)

}

3. 非同步訊息之Select

// asyncProducer2 demonstrates the asynchronous producer driven by a single
// select loop: the same goroutine enqueues messages and handles the result
// channels.
//
// Author's note: this approach is less efficient. With a large volume of
// messages the success/error cases may not get a chance to run, which can
// block for some time. That can be avoided by setting
// config.Producer.Return.Successes=false and
// config.Producer.Return.Errors=false.
//
// The loop runs until SIGINT is received. It then closes the producer, drains
// the remaining results, and prints the enqueued / failed counts.
//
// address is the list of broker endpoints to connect to.
func asyncProducer2(address []string) {
    config := sarama.NewConfig()
    config.Producer.Return.Errors = true
    p, err := sarama.NewAsyncProducer(address, config)
    if err != nil {
        log.Printf("sarama.NewAsyncProducer err, message=%s \n", err)
        return
    }

    // Trap SIGINT to trigger a graceful shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    defer signal.Stop(signals)

    var enqueued, successes, failures int
    asrcValue := "async-select: this is a message. index=%d"
    var i int
    Loop:
    for {
        i++
        value := fmt.Sprintf(asrcValue, i)
        msg := &sarama.ProducerMessage{
            Topic: "test",
            Value: sarama.ByteEncoder(value),
        }
        select {
        case p.Input() <- msg:
            fmt.Fprintln(os.Stdout, value)
            enqueued++
        case <-p.Successes():
            // Only delivers values when Producer.Return.Successes is
            // enabled; this config does not enable it (sarama leaves it off
            // by default), so this case normally stays idle.
            successes++
        case err := <-p.Errors():
            log.Printf("%s 傳送失敗,err:%s\n", err.Msg, err.Err)
            failures++
        case <-signals:
            p.AsyncClose()
            break Loop
        }
        time.Sleep(2 * time.Second)
    }

    // After AsyncClose the result channels must still be read until the
    // producer closes them; otherwise results of in-flight messages are lost
    // and the shutdown cannot complete. A nil channel never becomes ready,
    // so each channel is set to nil once it has been closed.
    successCh, errCh := p.Successes(), p.Errors()
    for successCh != nil || errCh != nil {
        select {
        case _, ok := <-successCh:
            if !ok {
                successCh = nil
                continue
            }
            successes++
        case perr, ok := <-errCh:
            if !ok {
                errCh = nil
                continue
            }
            log.Printf("%s 傳送失敗,err:%s\n", perr.Msg, perr.Err)
            failures++
        }
    }

    fmt.Fprintf(os.Stdout, "傳送數=%d,傳送失敗數=%d \n", enqueued, failures)
}

三. 消費者

    叢集模式實現

// main starts one cluster consumer per consumer group and blocks until all of
// them have returned. Each consumer uses its own group ID, so every group
// receives the messages of the "test" topic (broadcast-style consumption).
func main() {
    topics := []string{"test"}
    groups := []string{"group-1", "group-2"}

    wg := new(sync.WaitGroup)
    wg.Add(len(groups))
    for _, group := range groups {
        // Arguments of a go statement are evaluated here, so each goroutine
        // receives its own group name.
        go clusterConsumer(wg, Address, topics, group)
    }

    wg.Wait()
}

// clusterConsumer runs a consumer-group member against a broker cluster.
//
// It joins the group groupId on the given brokers, subscribes to topics, and
// prints every received message to stdout, marking each one as processed.
// Consumer errors and rebalance notifications are logged by two helper
// goroutines. The function returns when SIGINT is received or when the
// message channel is closed, and then prints how many messages it consumed.
//
// wg.Done is called on return so the caller can wait for the consumer to
// finish.
func clusterConsumer(wg *sync.WaitGroup, brokers, topics []string, groupId string) {
    defer wg.Done()
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    config.Consumer.Offsets.Initial = sarama.OffsetNewest

    // Create the consumer.
    consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
    if err != nil {
        log.Printf("%s: cluster.NewConsumer err, message=%s \n", groupId, err)
        return
    }
    defer consumer.Close()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    defer signal.Stop(signals)

    // Log consumer errors until the Errors channel is closed.
    go func() {
        for err := range consumer.Errors() {
            log.Printf("%s:Error: %s\n", groupId, err.Error())
        }
    }()

    // Log rebalance notifications until the Notifications channel is closed.
    go func() {
        for ntf := range consumer.Notifications() {
            log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
        }
    }()

    // Consume messages while watching for the interrupt signal.
    var successes int
    Loop:
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if !ok {
                // A closed channel is always ready to receive; leave the
                // loop instead of spinning on it forever.
                break Loop
            }
            fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
            consumer.MarkOffset(msg, "") // mark message as processed
            successes++
        case <-signals:
            break Loop
        }
    }
    fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}

四. 參考資料