1. 程式人生 > >Kafka消費者生產者例項

Kafka消費者生產者例項

卡夫卡消費者生產者例項

2017年07月30日18:22:56  rhwayfunn  閱讀數:13818標籤:  kafka  更多

個人分類:  分散式系統

版權宣告:本文為博主原創文章,轉載請註明出處.https://blog.csdn.net/u011116672/article/details/76400861

為了更為直觀展示卡夫卡的訊息生產消費的過程,我會從基於控制檯和基於應用兩個方面介紹使用例項.Kafka是一個分散式流處理平臺,具體來說有三層含義:

  1. 它允許釋出和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。
  2. 它可以容錯的方式儲存記錄流。
  3. 它可以處理記錄發生時的流。

由於主要介紹如何使用卡夫卡快速構建生產者消費者例項,所以不會涉及卡夫卡內部的原理一個基於卡夫卡的生產者消費者過程通常是這樣的(來自官網):

卡夫卡生產者消費者

安裝卡夫卡

官網下載kafka_2.11-0.11.0.0.tgz,解壓後安裝到指定目錄:

cd kafka_2.11-0.11.0.0
tar -zxvf kafka_2.11-0.11.0.0.tgz -C pathToInstall

啟動卡夫卡:

bin/kafka-server-start.sh config/server.properties
  • 基於控制檯

建立主題

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  •  

生產者傳送訊息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  •  

在控制檯輸入要傳送的訊息:

This is a message
This is another message

消費者接收訊息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

輸入命令後可以看到控制檯輸出了剛才的訊息:

This is a message
This is another message

基於應用

單個消費者

生產者:

public class SimpleKafkaProducer {

    public static void main(String[] args) {

        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "localhost:9092");

        //請求時候需要驗證
        props.put("acks", "all");

        //請求失敗時候需要重試
        props.put("retries", 0);

        //記憶體快取區大小
        props.put("buffer.memory", 33554432);

        //指定訊息key序列化方式
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        //指定訊息本身的序列化方式
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
        System.out.println("Message sent successfully");
        producer.close();
    }
}

消費者:


public class SimpleKafkaConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        //每個消費者分配獨立的組號
        props.put("group.id", "test");

        //如果value合法,則自動提交偏移量
        props.put("enable.auto.commit", "true");

        //設定多久一次更新被消費訊息的偏移量
        props.put("auto.commit.interval.ms", "1000");

        //設定會話響應的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條訊息
        props.put("session.timeout.ms", "30000");

        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("test"));

        System.out.println("Subscribed to topic " + "test");
        int i = 0;

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)

                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
        }
    }
}
  • 先啟動生產者,傳送訊息到經紀人,這裡簡單傳送了10條從0-9的訊息,再啟動消費者,控制檯輸出如下:

消費結果

叢集消費

以上的程式只是單生產者單消費者的場景,所謂叢集消費就是同一個主題的消費可能有多個消費者消費,也稱廣播消費。叢集消費只一種多執行緒或者多機器的消費方式。

要實現叢集消費只需要為每個消費者指定不同的group.id就可以。由於程式碼比較簡單就不貼了。

測試發現,當為了兩個消費者(這裡是兩個程序)不同指定的group.id後,生產者傳送的訊息兩個消費者都能接受到,這很顯然,叢集消費嘛。為設定兩個消費者的group.id為同一個的時候,只有一個消費者能消費者到。也就是說,卡夫卡的訊息只能由組中的單個使用者讀取