1. 程式人生 > >Kafka 0.9 新消費者API

Kafka 0.9 新消費者API

kafka誕生之初,它自帶一個基於scala的生產者和消費者客戶端。但是慢慢的我們認識到這些API有很多限制。比如,消費者有一個“高階”API支援分組和異常控制,但是不支援很多更復雜的應用場景;它也有一個“低階”API,支援對細節的完全控制,但是要求碼農自己控制失敗和異常。所以重新設計了它們。

這個過程的第一階段就是在0.8.1版本的時候重寫了生產者API。在最近的0.9版本中完成了第二階段,提供了消費者的新API。建立在新的分組協議只是,新的消費者帶來以下好處:

  • API更加簡潔:新的消費者API綜合了老版本的“高階”和“低階”API的功能,同時提供了分組機制和lower level access來實現自己的消費策略;
  • 減少了依賴:新的消費者API是用純java寫的。沒有了scala和zk的依賴,讓程式碼工程更輕量級;
  • 更安全:新的消費者API支援kafka0.9版本的安全機制;
  • 新的消費者也增加了一系列的機制來控制組消費時的容錯。老的API使用大量的java程式碼實現的(與ZK互動過多),複雜的邏輯很難讓其他語言的消費者實現。新的API使這變得更簡單。現在已經有C版本的客戶端了。

雖然新的消費者是被重新設計過的和新的互動機制,但很多感念沒有本質區別,所以熟悉老API的碼農也不會覺得新API生硬。但是,也有一些特別細微的細節相對於組管理和執行緒模型需要在碼程式碼的時候注意。

還有一個注意點:新的消費者API還是測試版本。(不穩定哦,隨時會有BUG冒出來,偉大的踩坑者)

Getting Started

略過舊API中的分組消費介紹。。。

New_Consumer_figure_1

舊的API強依賴ZK做分組管理,新的API使用kafka自己的分組協調機制。針對每個消費組,會從所有的broker中挑選出一個出來充當這個組的“協調員”。協調員負責管理該組的狀態。它的主要任務是,當新的組成員進入、老的組成員離開和元資料改變時進行分割槽的協調分配。這種重新分配分割槽的行為稱之為“重新平衡組”。

當一個組首次被初始化,每個分割槽的消費者一般會從最早或最近的資料開始讀。然後在每個分割槽的訊息被依次讀出。在消費過程中,消費者會提交已經成功處理了的訊息的偏移量。例如,在下圖中,消費者正在讀的訊息的偏移量是6,而它最近一次提交的偏移量是1:

New_Consumer_Figure_2

當一個分割槽被重新分配給組中的另一個消費者時,這個消費者會從上一個消費者最後一次提交的偏移量處開始讀。如果上面例子中的消費者突然崩潰了,其他組成員讀的時候會從1開始讀。這種情況下,它會從1到6重新消費一遍。

上圖中還標註了其他兩個位置。Log End Offset標記了最後一條訊息寫入後的偏移量。High Watermark標記了最後被其他replicas同步成功了的偏移量。對於消費者來說,只能讀到High Watermark處,這樣為了防止未同步的訊息被讀了以後丟失掉。

配置和初始化

在開始使用新的消費者API之前,先把 kafka-clients 這個依賴加到工程中。

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.9.0.1</version>
</dependency>

消費者通過Properties檔案來配置消費屬性,下面是一個最小配置:

複製程式碼

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

複製程式碼

與舊的消費者和生產者一樣,我們需要配置broker連線引數。我們不需要提供叢集中所有伺服器的連線引數,客戶端會根據給定的連線引數集合得到所有的存活broker。客戶端還需要配置key和value的初始化類。最後配置group.id。

訂閱TOPIC

在開始消費之前,必須先訂閱一些需要讀取訊息的topic。下面的例子中,同時訂閱了foo和bar兩個topic:

consumer.subscribe(Arrays.asList("foo", "bar"));

訂閱後,消費者會與組內其他消費者協調分割槽的分配。在開始消費訊息的時候這些事自動完成的。稍後會展示如何使用分配API手動指定分割槽 。但是不能手動和自動一起用。

訂閱topic的方法不能增量訂閱:每次訂閱必須包含要訂閱的所有topic。可以隨時改變訂閱,新的訂閱會替換舊的訂閱。

基本的POLL迴圈

消費者需要並行化地讀取資料,可能從分佈在不同broker的不同topic的不同分割槽。為了做到這一點,新的API用了近似unix得pool或者select呼叫:一旦訂閱了一些topic,所有未來的協調、重新平衡和資料獲取都被一個呼叫事件所驅動。這需要單個執行緒掌控所有IO的一個簡單而有效的實現。

訂閱一個主題後,需要一個事件迴圈來接受分割槽的分配和資料的獲取。聽起來複雜,其實只需要在迴圈呼叫poll方法,然後消費者客戶端就會處理剩下的事情。每次呼叫poll方法,都會收到(可能為空)被分配的分割槽裡面的一系列資料。下面是基本例子:

複製程式碼

try {
  while (running) {
   ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());
  }
} finally {
  consumer.close();
}

複製程式碼

傳進poll方法裡面的引數是一個Long型別的,表示等待訊息的時間:如果佇列裡面有訊息,會立馬返回,如果沒有,會等待指定的時間然後返回。

消費者被設計成在自己的執行緒裡面執行。沒有外部同步的多執行緒是不安全的,也是不建議這樣做的。

當消費完成後一定記得關閉它,這樣會保證組內協調分配分割槽不會混亂(因為一個分割槽只能被組內的一個消費者消費)。

上例中使用了一個較小的超時時間為了保證不會有太多延時去關閉消費者。下面這個例子中使用了很長的超時時間和用wakeup API來跳出迴圈:

複製程式碼

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + “: ” + record.value());
  }
} catch (WakeupException e) {
  // ignore for shutdown
} finally {
  consumer.close();
}

複製程式碼

wakeup操作是執行緒安全的:

複製程式碼

/**
 * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
 * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
 */
@Override
public void wakeup() {
    this.client.wakeup();
}

複製程式碼

整合到一起:

複製程式碼

public class ConsumerLoop implements Runnable {
  private final KafkaConsumer<String, String> consumer;
  private final List<String> topics;
  private final int id;

  public ConsumerLoop(int id,
                      String groupId, 
                      List<String> topics) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put(“group.id”, groupId);
    props.put(“key.deserializer”, StringDeserializer.class.getName());
    props.put(“value.deserializer”, StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
  }
 
  @Override
  public void run() {
    try {
      consumer.subscribe(topics);

      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records) {
          Map<String, Object> data = new HashMap<>();
          data.put("partition", record.partition());
          data.put("offset", record.offset());
          data.put("value", record.value());
          System.out.println(this.id + ": " + data);
        }
      }
    } catch (WakeupException e) {
      // ignore for shutdown 
    } finally {
      consumer.close();
    }
  }

  public void shutdown() {
    consumer.wakeup();
  }
}

複製程式碼

測試這裡例子的話需要造一些資料。最簡單的方式是使用kafka-verifiable-producer.sh這個指令碼。

# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181

# bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092

然後是驅動類:

複製程式碼

public static void main(String[] args) { 
  int numConsumers = 3;
  String groupId = "consumer-tutorial-group"
  List<String> topics = Arrays.asList("consumer-tutorial");
  ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

  final List<ConsumerLoop> consumers = new ArrayList<>();
  for (int i = 0; i < numConsumers; i++) {
    ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
    consumers.add(consumer);
    executor.submit(consumer);
  }

  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      for (ConsumerLoop consumer : consumers) {
        consumer.shutdown();
      } 
      executor.shutdown();
      try {
        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        e.printStackTrace;
      }
    }
  });
}

複製程式碼

例子中啟動了三個執行緒來消費訊息,每個執行緒給一個單獨的ID,這樣就能清楚的看到哪個執行緒消費到了哪些資訊。shutdown hook會呼叫執行緒的wakeup方法來結束消費。在IDE裡面可以點選關閉或者在命令列裡面使用Ctrl-C。輸出結果:

複製程式碼

2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}

複製程式碼

Consumer Liveness

當組內的一個消費者消費某個分割槽的時候,這些分割槽上會有一個基於組的鎖,即一個組裡面一個消費者正在消費某個分割槽,組內的其他消費者就不能消費這個分割槽,如果這個消費者一直健康的運行當然最好,如果因為某些原因死掉,你需要把這個鎖解掉,然後把分割槽分給其他消費者。

kafka的組協調機制使用了心跳機制來解決這個問題。每次重新平衡分割槽分配後,組內消費者開始向組協調員(某個broker)傳送心跳。組協調員持續收到某個消費者的心跳,它就認為這個消費者是健康的。協調員每次收到心跳,都會啟動一個計時器。當計時器到時間後還沒有收到後面的心跳,就認為這個消費者已經掛掉了,就會把這個分割槽分配給其他合適的消費者。計時器的持續時間是被稱為會話超時,由客戶端的session.timeout.ms配置。

props.put("session.timeout.ms", "60000");

會話超時機制能保證當消費者掛掉或者網路故障的時候,分割槽的鎖會被釋放,並分配給其他消費。老的消費者再發送心跳也不認為它是健康的。

心跳傳送執行緒和poll執行緒是一起的,正常poll資料的時候才會傳送心跳,否則不會發。

會話超時時間預設是30秒,在網路延時大的叢集中可以適當調大這個引數,避免非異常情況下的重新分配分割槽。

Delivery Semantics

當一個組剛建立時,它的初始化offset是根據 auto.offset.reset 這個配置屬性來獲取的(在0.8中就加入了這個配置項)。一旦消費者開始消費,它根據應用的需求來提交offset。每次組內重新平衡partition以後,讀offset的位置就是上一次最後提交的offset。如果一個應用成功處理了某條訊息,但是在成功提交offset之前就崩潰掉了,那麼下一個消費者將重新讀這條訊息,造成重複讀。當然,offset的提交頻率越快,這種損失就越小。

當我們將 enable.auto.commit 屬性設定為true時(預設為true),消費者會在配置屬性 auto.commit.interval.ms 的時間間隔後自動提交offset。時間間隔越小,崩潰造成的損失越小,隨之影響效能。

如果要自己手動控制offset的提交,則必須將 auto.offset.reset 設定為false。

手動提交offset的API現在還是測試版,但是重要的是如果將它整合到poll迴圈中。下面程式碼是一個例子:

複製程式碼

try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
      System.out.println(record.offset() + ": " + record.value());

    try {
      consumer.commitSync();
    } catch (CommitFailedException e) {
      // application specific failure handling
    }
  }
} finally {
  consumer.close();
}

複製程式碼

上例中使用了commitSync API來提交,它會在成功返回或者遇到錯誤之前阻塞。你需要關心的主要錯誤就是訊息處理的時間超過session的時間造成超時。當這種事情真正發生的時候,這個消費者會被踢出去,然後造成CommitFailedException異常。應用應該處理這種異常,在上次成功提交offset之後和失敗提交offset之後的訊息造成的改變進行回滾。

另外你應該保證必須在訊息成功處理後再提交offset。