1. 程式人生 > >Kafka 生產者和消費者的筆記

Kafka 生產者和消費者的筆記

Maven依賴:

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.0</version>
		</dependency>


一、生產者

首先先看官方API示例:

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

設定生產者需要連線的kafka地址,回令型別,重試次數,批量提交大小,提交延遲等待時間(等待時間內可以追加提交),快取大小,序列化方法。

生產者目前沒有碰到太多問題,這也跟kafka一個生產,多個消費的機制有關。需要注意的有兩點:

1、acks回令。如果必須等待回令,那麼設定acks為all;否則,設定為-1;等待回令會有效能損耗。

2、生產者在傳送訊息的過程中,會自己預設批量提交。所以,如果單條指令的傳送請求,記得傳送完後flush才能生效。

二、消費者

依舊先看官方示例:

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

類似生產者的配置,依次設定kafka連線地址,消費者標識(區分消費者offset),是否自動提交offset,提交offset週期,心跳離線超時時間,序列化方法。

消費者問題說明:

1、配置bootstarp.servers後,如非特殊需求,不需要配置zookeeper.server。否則,容易造成zookeeper的資料不一致問題。

2、group.id是標識消費者的ID。每一個group.id消費後,kafka會記錄該id消費的offset到zookeeper。所以,此處需要注意,(1)如果多個地方都使用相同的groupid,可能造成個別消費者消費不到的情況(2)如果單個消費者消費能力不足的話,可以啟動多個相同groupid的consumer消費,處理相同的邏輯。但是,多執行緒的時候,需要增加每個groupid下的partition分割槽數量,便於每個執行緒穩定讀取固定的partition,提高消費能力。

3、如非特殊需求,自動offset提交便可以滿足;如果有訊息的重複消費需求,可以手動儲存每次的offset,自動提交。

4、自動提交週期。消費者每次從佇列里拉出來的數量是多條,如果消費能力差,處理速度慢,在提交週期內沒有完成處理,那麼會導致提交失敗,影響消費和系統穩定。所以可以考慮增加週期或者使用記憶體佇列二次儲存訊息,提升處理速度。另外,可以檢視IO是否使用批量提交的方式。如hbase,redis,mysql等。

5、消費者心跳超時時間。如果處理速度超出改時間,kafka認為消費者離線,提交也會失敗,也會導致消費問題。所以,需要儘量讓心跳超時大於offset自動提交週期。

6、consumer.subcribe(Arrays.asList("topic1","topics"))。消費者可以同時監聽多個topic(通道),監聽過程屬於執行緒切換。如topic1中拉取10個,再去topic2中拉取10個,一起返回。具體監聽topic的數量需要根據訊息佇列的設定和訊息量確定。如果訊息很少,一個執行緒完全可以勝任多個。接受完後增加處理邏輯就可以。如果訊息量大,就一個執行緒一個topic,或者多個執行緒一個topic。

7、因為kafka會記住消費者的訊息處理指標offset,所以,如果一旦消費者離線,那麼會造成訊息積壓,無法保持最新的資料消費。除非每次更換groupid獲取最新資料。但是這種更換的方式並不推薦,因為總會產生新的消費者,增加zookeeper的資料。所以,消費者儘量保持線上。如果不使用訊息,可以儲存、更新或丟棄。