Apache Kafka 0.9 KafkaConsumer
阿新 • • 發佈:2019-01-10
Kafka開始被設計時,帶有一個Scala的Producer和Consumer。而這些API有很多的缺陷,例如,支援Consumer Group且容錯的“high-level” Consumer API,不能支援太多更復雜的情況。另一種“simple” Consumer Client提供了所有的操作,但是它要求使用者自己管理故障和錯誤。所以,重新設計了可以應對大量舊Clients很難甚至不可能處理的使用者場景。
重寫Producer API的第一階段是在0.8.1。最新的0.9完成了第二階段,並且有新的Consumer API。基於一組全新的Kafka自己的協調協議,新Consumer有以下優點:
- 統一的API:新Consumer結合了舊的“simple”和“high-level” Consumer Clients的功能,提供了用組織協調和低水平訪問建立你自己的消費策略。
- 減少了依賴:新Consumer用純Java重寫。它不在依賴與Scala執行期和ZooKeeper,這使得你的工程包含更輕量級的庫。
- 更安全:在Kafka 0.9實現了安全擴充套件(security extensions),且僅被新Consumer支援。
- 這種新Consumer還新增了一套管理容錯的Consumer處理組。以前這個功能是用一個笨重的Java Client實現(與ZooKeeper互動嚴重)。這種邏輯的複雜性使得難以用其他語言構建一個具有全部特性的Consumer。隨著這種新協議的引入,其變得很容易實現。
介紹
在開始編碼前,我們先熟悉下基本概念。在Kafka中,每個topic被劃分到一個叫做Partition的日誌集合中。Producers以自己的速度寫這些日誌,Consumers以自己的速度讀這些日誌。Kafka通過在一個共享公共的group identifier的Consumer Group中分配Partitions來拓展topic的消費。下圖是一個有三個Partition的topic和一個有兩個成員的Consumer Group。該topic中的每個Partition只被精確地分配給一個Consumer。雖然舊的Consumer依賴於ZooKeeper管理Group,但是新的Consumer使用一組內建在Kafka的協調協議。對於每個Group,Brokers中的某一個被選為Group Coordinator。該Coordinator負責管理對應Group的狀態。它的主要任務是當新的Consumer成員加入或老的Consumer成員消失,和更改topic的metadata時,協調Partition的分配。重新分配Partition的過程稱作對該Group的負載均衡。 當一個Group第一次被初始化時,在每個Partition中,Consumers要麼從最早的offset讀資料,要麼從最晚的offset讀資料。每個Partition中的訊息被順序的讀取。當Consumer開始處理,它就會提交已經成功處理過的訊息的offset。例如,在下圖中,Consumer的位置在offset 6,它上次提交的offset是offset 1。
當一個Partition重新分配到Group中的另一個Consumer時,初始Partition被設定成最近一次提交的offset。如果上例中的Consumer突然宕了,接管該Partition的Group中Consumer將會從offset 1開始消費。那樣一來,它不得不重新處理已宕的Consumer的offset 6。 該圖也給出了log中另外兩個重要的Partitions。Log End Offset是最後一條message寫入log的offset。High Watermark是最後一條message被成功複製到所有的log副本中的offset。從Consumer的角度看,最主要的是知道你只能讀到High Watermark。這將防止Consumer讀沒有副本可能稍後會丟失的資料。
配置和初始化
為了開始學習Consumer,你需要在你的project中新增kafka-clients依賴。Maven片段如下:<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
Consumer用Properties檔案構造,就像其他的Kafka Clients一樣。在下面的例子中,我提供了一個使用Consumer Groups最簡單的配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
就像在舊Consumer和Producer中一樣,我們需要為Consumers配置一個Brokers的初始化列表,以便能訪問叢集中的其他node。但是,不需要提供叢集中所有的server——Client從列表中這些Brokers中確定全部存活的Broker。這裡我們已經假定該Broker是在localhost上執行。同時,Consumer也需要知道怎麼反序列化message的Key和Value。最後,為了新增到一個Consumer Group中,我們需要配置一個Group Id。我們繼續閱讀本文,將會介紹更多的配置。
Topic訂閱
為了可以開始消費,你必須先訂閱(subscribe)一個你的程式需要讀取的Topic。在下例中,我們subscribe了foo和bar兩個Topic:consumer.subscribe(Arrays.asList("foo", "bar"));
當你subscribe之後,該Consumer將會和這個Group中的其他Consumer協調配合來分配Partition。當你開始消費資料時,這個過程自動完成。後面我們也會介紹怎麼使用assign API去人工手動分配Partition,但是請切記不能同時混合使用自動和手動兩種分配方式。 subscribe方法是不能增加的,你必須在列表裡包含所有你想要消費的Topic。你可以在任何時候改變你已經subscribe的Topic集合——當你呼叫subscribe方法時,任何之前訂閱的Topic都將被新的Topic列表所替代。
基本的Poll迴圈
Consumer需要並行地從許多Partition獲取資料,因為許多Topic可能分佈在許多不同的Broker中。為了實現這一點,它使用的API方式和Unix中的poll或者select:一旦Topic被註冊,未來所有的協調、負載均衡、和資料獲取都通過一個專門在事件迴圈中被呼叫的單獨的poll方法來驅動。這就需要一個簡單的、高效的可以從單個執行緒中操作所有IO的實現。 在subscribe Topic之後,你需要開啟事件迴圈來得到Partition的分配,並且開始獲取資料。這聽起來很複雜,但是你需要做的所有事情只是在迴圈中呼叫poll方法,剩下的其他事情就由Consumer來處理。每次呼叫poll方法,將返回一個來自已分配的Partition的message集合(可能是空的)。下面的例子展示了一個基本的poll迴圈,它將列印獲取到的資料的offset和記錄的value:try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
consumer.close();
}
poll API返回從當前Position獲取的記錄。當這個Group是第一次建立,Position將根據重置策略(reset policy,要麼為每個Partition設定為最早的offset,要麼為每個Partition設定為最晚的offset)設定。一旦Consumer開始提交offsets,隨後每次負載均衡將重置Position為最後一次提交的offset。poll()方法的引數控制當Consumer在當前Position等待記錄時,它將阻塞的最大時長。當有記錄到來時,Consumer將會立即返回。但是,在返回前如果沒有任何記錄到來,Consumer將等待直到超出指定的等待時長。 Consumer被設計成執行在自己所屬的執行緒中。在沒有外部同步的情況下,使用多執行緒是不安全的。同時,它很可能不是一個好的主意去嘗試。在這個例子中,我們使用一個flag,用於當程式shutdown時,跳出poll迴圈。當其他執行緒設定flag為false時(如,shutdown該程序),poll()方法一返回就跳出迴圈。同時,程式完成處理任何已被返回的記錄。 當你使用完Consumer時,你應該總是關閉該Consumer。這樣做不僅為了清理使用的Sockets,也確保Consumer告訴Coordinator要退出該Group。 這個例子使用了一個相對短的超時時長,以確保當關閉Consumer時,不會有太大的延遲。另外,你可以設定一個長的超時,同時用wakeup API跳出迴圈。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + “: ” + record.value());
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
我們改變超時為Long.MAX_VALUE,這基本上意味著Consumer將無期限地阻塞,直到下一條記錄達到。替代上一個例子中的flag,執行緒呼叫consumer.wakeup()中斷活躍的poll()來觸發shutdown,造成它的原因是丟擲WakeupException。這個API是可以在另一個執行緒安全的使用。注意如果在處理過程中沒有活躍的poll(),這個異常將被在下一次呼叫時丟擲。在這個例子中,我們捕獲這個異常,阻止它傳播。
整合
在接下來的例子中,我們將這些都放在一起去構建一個簡單的Runnable任務,它包含初始化Consumer,subscribe一個Topic列表,無限迴圈執行poll()直到外力地關閉。public class ConsumerLoop implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ConsumerLoop(int id,
String groupId,
List<String> topics) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put(“group.id”, groupId);
props.put(“key.deserializer”, StringDeserializer.class.getName());
props.put(“value.deserializer”, StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
為了測試這個例子,你需要一個執行的0.9.0.0的Kafka Broker,和一個有string型別資料可以消費的Topic。寫大量string型別資料到Topic的最簡單的方法是使用kafka-verifiable-producer.sh指令碼。為了更有意思,我們需要使這個Topic有多個Partition,這樣就不會有一個Consumer去做所有事情。例如,一個Kafka Broker和ZooKeeper都執行在localhost,你可以在Kafka的根目錄像下面一樣操作:
# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181
# bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092
然後,我們建立一個簡單的驅動程式來啟動有3個Consumer的Consumer Group,所有的Consumer都subscribe我們已經建立的相同的Topic。
public static void main(String[] args) {
int numConsumers = 3;
String groupId = "consumer-tutorial-group"
List<String> topics = Arrays.asList("consumer-tutorial");
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
final List<ConsumerLoop> consumers = new ArrayList<>();
for (int i = 0; i < numConsumers; i++) {
ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
consumers.add(consumer);
executor.submit(consumer);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (ConsumerLoop consumer : consumers) {
consumer.shutdown();
}
executor.shutdown();
try {
executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace;
}
}
});
}
這個例子向一個Executor提交了3個Runnable Consumer。每一個Thread都給一個不同的Id,以便你可以發現是哪個Thread正在接收資料。當你停止處理時,shutdown hook就會被呼叫,這樣就可以使用wakeup()停止3個Thread,並且等待它們shutdown。如果你執行它,你將會看來來自這些Thread的很多資料。
2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}
以上輸出表明消費發生在三個Partition。每個Partition被分配給3個Thread中的一個。在每個Partition中,你可以看到offset如你所預料的那樣增加。你可以用Ctrl+C或者IDE關閉程式。
Consumer Liveness
作為Consumer Group的一部分,每個Consumer被分配它已subscribe的Topic的Partitions的一個Partition子集。在這些Partition上有一個Group鎖。如果這個鎖被獲得,該Group中的其他Consumer將不能再從鎖住的Partition讀資料。當你的Consumer是健康的時,這也正是你想要的。它是唯一的方法讓你避免重複消費。但是,如果Consumer由於機器或者程式的失敗而宕掉,你需要把鎖釋放掉,以便Partition可以分配給其他健康的Consumer。 Kafka的Group Coordination Protocol使用心跳機制(HeartBeat Mechanism)定位這個問題。在每次Rebalance之後,所有當前的Consumer都開始傳送週期性心態給Group Coordinator。如果這個Coordinator一直接收心跳,它就假定這些Consumer是健康的。在每一個接收到的心跳中,該Coordinator都會開始(或重置)一個計時器。如果這個計時器過期,沒有心跳接收,那麼該Coordinator就會標記這個Consumer,給Group中的其他Consumer發訊號,它們應該重新加入這個Group以便Partition重新分配。計時器的持續時間被稱為Session Timeout,它在Client用session.timeout.ms配置。props.put("session.timeout.ms", "60000");
如果機器或者程式宕掉或者網路分割槽隔離了來自Coordinator的Consumer,Session Timeout確保鎖將會被釋放。但是,一般程式失敗是一個小問題。因為Consumer一直向Coordinator傳送心跳,這不代表程式是健康的。 Consumer的poll迴圈被設計成可以處理這個問題。當你呼叫poll()或者其他的阻塞API,所有的網路IO將在前臺完成。Consumer不使用任何後臺執行緒。這意味著當你呼叫poll()時,心跳只被傳送給Coordinator。如果你的程式停止poll()(如,因為處理程式丟擲異常,或者下游系統癱瘓),接著就沒有心跳傳送,Session Timeout過期,並且Group會被Rebalance。 唯一的問題是偽Rebalance可能因為Consumer處理message的時長大於Session Timeout被觸發。因為,你需要設定Session Timeout足夠長來處理message。它的預設值是30秒,但是設定幾分鐘也不是合理的。較大的Session Timeout的缺點是Coordinator檢測真正的Consumer宕掉需要花費更長的時間。
Delivery Semantics
當Consumer第一次被建立時,根據通過配置auto.offset.reset的策略來初始化offset。一旦Consumer開始消費資料,它根據程式的需要週期性的提交offset。在以後每次Rebalance之後,對於Group中的Partition,其Position被設為最後一次提交的offset。如果在為已成功處理的message提交offset之前,Consumer宕掉,其他的Consumer將停止重複消費。你提交offset越頻繁,在宕機後你看到的副本越少。 到目前為止,在例子中,我們都使用了自動提交的策略。當我們看到enable.auto.commit被設定成true(預設值是true),Consumer根據配置的auto.commit.interval.ms的Interval自動觸發offset的提交。通過降低提交的Interval,你可以限制在宕機情況下必須做的重新處理的Consumer的數量。 為了使用自定義的提交API,你應該首先在Consumer的配置中通過設定enable.auto.commit為false來禁用自動提交。props.put("enable.auto.commit", "false");
提交的API是很容易被使用,但是最重要的一點是怎麼整合在poll迴圈中。因此,下面的例子中包含擁有提交的完整的poll迴圈。操作手動提交的最簡單的方法是使用同步方式提交APi。
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
例子使用沒有引數的commitSync API提交最後一次呼叫poll()返回的offset。這個無限地阻塞呼叫直到要麼提價成功,要麼有一個不可恢復的失敗提交。你需要關心的最主要的錯誤是當message的處理超過的Session Timeout的時長。當這種情況發生時,Coordinator將會把這個Consumer提出該Group,同時導致CommitFailException異常丟擲。你的程式需要處理這個錯誤,通過回滾從最後一次成功提交的offset開始被消費過的message造成的所有改變。 你需要確定offset被提交只會發生在message被成功處理之後。如果在提交發送之前,Consumer就當掉了,這個message將不得不重新處理。如果提交策略保證了最後一次提交的offset不會在Current Position之前,那麼你就是一個“at least once”的傳送方式。
通過改變提交策略保證Current Position不會超過最後一次提交的offset(Last Committed Offset)。如上圖所示,你就是“at most once”的傳送方式。如果Consumer在它的Position趕上最後一次提交的offset之前就宕掉,那麼所有message將會在這一階段丟失,但是你可以確保沒有message會被再次處理。為了實現這一策略,我們只需要改變提交和message處理的順序。
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
try {
consumer.commitSync();
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
注意使用自動提交,系統給你的是“at least once”策略,因為Consumer保證只為已經返回到你的應用程式的message提交offset。在最糟糕的情況下,你可能重新處理的message的數量上限,是你的程式在Commit Interval(通過auto.commit.interval.ms配置)能夠處理的message的數量。 然而,通過使用Commit API,你會有很多靈活的操作可以控制你將接受多少重複處理。最極端的情況是,你在每個message被處理之後提交offset,如下:
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
try {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
consumer.commitSync(Collections.singletonMap(record.partition(), new OffsetAndMetadata(record.offset() + 1)));
}
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
在這個例子中,我們在呼叫commitSync()方法中傳入我們明確指定的想要提交的offset。這個提交的offset總是你的程式讀到的下一條message的offset。當commitSync()方法沒有引數時,Consumer提交被返回到你的程式的最後一次的offset(加上1)。但是,我們不能在這裡使用,因為它需要Commit Position在我們實際處理的offset之前。 很明顯,在每個message之後提交很可能對於很多場景不是一個好的想法,因為處理執行緒不得不等待每個提交請求從伺服器返回而阻塞。這將降低消費能力。更合理的方法是在N個message之後提交,N個message可以協調很好效能。 這個例子中commitSync()的引數是一個出自Topic Partition的OffsetAndMetadata的例項的Map。Commit APi允許你在每次提交中新增一些額外的metadata。這樣可以記錄提交的時間,哪臺主機發送的,或一切你的程式需要的資訊。在這個例子中,我們沒新增什麼。 代替當message接收時提交,這裡有個有一個更合理的策略:當你處理完來自每個Partition的message時,提交offset。ConsumerRecord集合提供了訪問它包含的Partition集合,和每個Partition的message。下面的例子展示這種策略:
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords)
System.out.println(record.offset() + ": " + record.value());
long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastoffset + 1)));
}
}
} finally {
consumer.close();
}
到目前為止,這些例子都聚焦於同步方式提交的API,但是Consumer也可以利用非同步方式的API——commitAsync。一般使用非同步方式提交可以提高消費能力,因為你的程式可以在提交返回之前,就開始處理下一批message。需要權衡的是,你可能只能更晚地發現該提交失敗。下面的例子給出基本的使用。
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception != null) {
// application specific failure handling
}
}
});
}
} finally {
consumer.close();
}
注意我們為commitAsync提供了回撥,它當提交完成(無論成功或者失敗)時被Consumer呼叫。如果你不需要這些,你也可以呼叫無參的commitAsync。
Consumer Group檢查
當一個Consumer Group是活躍的,你可以在命令列使用 consumer-groups.sh 指令碼檢查Partition的分配和消費的進度。它在Kafka根目錄的bin資料夾中。# bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial-group --bootstrap-server localhost:9092
其輸出結果:
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumer-tutorial-group, consumer-tutorial, 0, 6667, 6667, 0, consumer-1_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 1, 6667, 6667, 0, consumer-2_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 2, 6666, 6666, 0, consumer-3_/127.0.0.1
它顯示了該Consumer Group中的所有Partition分配,哪個Consumer擁有它,和最後一次提交的offset(即這裡的“CURRENT OFFSET”)。一個Partition的滯後是The Log End Offset和The Last Committed Offset之差。管理員可以監視這個,以確保Consumer Group的消費跟得上Producer。
使用手動分配
正如本文開始提到的那樣,新的Consumer為不需要Consumer Group的使用者場景實現了“lower level”訪問。這樣的方便性是採用這種API最重要的原因之一。舊的“simple”Consumer也提供了這個,但是它要求你自己做很多錯誤處理。在新的Consumer中,你僅僅需要分配你想要讀資料的Partition,並且開始poll。 下面的例子展示了使用partitionFor API分配一個Topic中的所有Partition。List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo partition : consumer.partitionsFor(topic))
partitions.add(new TopicPartition(topic, partition.partition()));
consumer.assign(partitions);
類似於subscribe,呼叫assign()方法時必須傳入你想要讀資料的所有Partition的一個列表。一旦Partition被分配,poll迴圈就會向之前一樣工作。 切記所有的offset提交都通過Group Coordinator,無論它是Simple Consumer還是Consumer Group。因此,如果你需要提交offset,你必須為group.id設定一個合理的值,以免它和其他Consumer衝突。如果Simple Consumer用匹配活躍Consumer Group的一個Group Id提交offset,這個Coordinator將會拒絕提交(它將會導致CommitFailException)。如果另一個Simple Consumer例項共享同一個Group Id,將不會有任何錯誤發生。