Kafka消費者生產者實例
阿新 • • 發佈:2018-03-24
rap subscribe 只有一個 HA .sh 生產者 安裝 group blog
版權聲明:本文為博主原創文章,轉載請註明出處。 https://blog.csdn.net/u011116672/article/details/76400861
目錄(?)[-]
- 安裝Kafka
- 基於Console
- 基於Application
- 單個consumer
- 集群消費
為了更為直觀展示Kafka的消息生產消費的過程,我會從基於Console和基於Application兩個方面介紹使用實例。Kafka是一個分布式流處理平臺,具體來說有三層含義:
- 它允許發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
- 它可以容錯的方式存儲記錄流。
- 它可以處理記錄發生時的流。
由於主要介紹如何使用Kafka快速構建生產者消費者實例,所以不會涉及Kafka內部的原理。一個基於Kafka的生產者消費者過程通常是這樣的(來自官網):
安裝Kafka
從官網下載kafka_2.11-0.11.0.0.tgz
,解壓後安裝到指定目錄:
cd kafka_2.11-0.11.0.0
tar -zxvf kafka_2.11-0.11.0.0.tgz -C pathToInstall
- 1
- 2
啟動Kafka:
bin/kafka-server-start.sh config/server.properties
- 1
基於Console
創建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- 1
Producer發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- 1
在控制臺輸入要發送的消息:
This is a message
This is another message
- 1
- 2
Consumer接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
- 1
輸入命令後可以看到控制臺輸出了剛才的消息:
This is a message
This is another message
- 1
- 2
基於Application
單個consumer
生產者:
public class SimpleKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
//broker地址
props.put("bootstrap.servers", "localhost:9092");
//請求時候需要驗證
props.put("acks", "all");
//請求失敗時候需要重試
props.put("retries", 0);
//內存緩存區大小
props.put("buffer.memory", 33554432);
//指定消息key序列化方式
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//指定消息本身的序列化方式
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
System.out.println("Message sent successfully");
producer.close();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
消費者:
public class SimpleKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//每個消費者分配獨立的組號
props.put("group.id", "test");
//如果value合法,則自動提交偏移量
props.put("enable.auto.commit", "true");
//設置多久一次更新被消費消息的偏移量
props.put("auto.commit.interval.ms", "1000");
//設置會話響應的時間,超過這個時間kafka可以選擇放棄消費或者消費下一條消息
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
System.out.println("Subscribed to topic " + "test");
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
先啟動生產者,發送消息到broker,這裏簡單發送了10條從0-9的消息,再啟動消費者,控制臺輸出如下:
集群消費
以上的程序只是單生產者單消費者的場景,所謂集群消費就是同一個topic的消費可能有多個消費者消費,也稱廣播消費。集群消費只一種多線程或者多機器的消費方式。
要實現集群消費只需要為每個消費者指定不同的group.id
就可以。由於代碼比較簡單就不貼了。
測試發現,當為了兩個consumer(這裏是兩個進程)指定不同的group.id
後,producer發送的消息兩個consumer都能接受到,這很顯然,集群消費嘛。為設置兩個consumer的group.id
為同一個的時候,只有一個消費者能消費者到。也就是說,kafka的消息只能由組中的單個用戶讀取。
Kafka消費者生產者實例