1. 程式人生 > >Kafka基礎-消費者讀取訊息

Kafka基礎-消費者讀取訊息

下文介紹如何使用Java從Kafka訂閱和讀取訊息,它和從其它訊息系統讀取訊息有點不同,涉及到一些獨特的概念。所以我們要先了解這些概念:

1. Kafka消費者概念

1.1 消費者和消費者組

當你只有一個消費者而且生產者傳送訊息的速率比消費者讀取訊息的速率要快的時候,處理新訊息就會造成延時,顯然需要配置多個消費者去讀取訊息。Kafka的消費者是消費者組的一部分,當多個消費者訂閱一個topic並屬於同一個消費者組時,該組中的每個消費者都將會接收這個topic不同分割槽的訊息。

下圖Topic T1有4個分割槽,消費者組G1只有1個消費者C1,那麼G1訂閱T1的時候,C1會從所有分割槽讀取訊息。

如果新增一個消費者C2到G1,每個消費者只會從其中2個分割槽讀取訊息。例如C1讀取分割槽0和2,C2讀取分割槽1和3,如下圖所示:

如果G1有4個消費者,那麼每個消費者只會從其中1個分割槽讀取訊息。如下圖所示:

如果消費者的數量比分割槽的數量要多,多出的消費者會處於空閒狀態而不會從任何分割槽讀取訊息。如下圖所示:

Kafka的消費者通常會執行一些高延時的操作,例如是寫資料到資料庫、對資料進行耗時的計算。在這種情況下,單個消費者不可能及時讀取新的訊息,因此新增消費者是提高讀取訊息效能的主要方法。注意,正如上述所說的,消費者的數量不能超過分割槽的數量,否則會造成資源浪費,因為多出的消費者會處於空閒狀態。另外,多個應用從同一個topic讀取訊息的情況也是非常普遍的。事實上,這也是Kafka的主要設計目標之一。與許多傳統的訊息系統不同,Kafka在不降低效能的情況下仍然能夠支援大量的消費者和消費者組。

例如下圖所示,新增一個有2個消費者的消費者組G2,那麼G2也會和G1一樣讀取T1的所有訊息。

1.2 消費者組和分割槽再均衡

當分割槽的所有權從一個消費者變為另外一個消費者稱為分割槽再均衡。當向消費者組新增一個新的消費者時,它將從之前由另外一個消費者消費的分割槽中讀取訊息。當一個消費者被停止或者發生故障時,它會被該消費者組移除,原來由它讀取訊息的分割槽會被剩餘的其中一個消費者消費。當消費者組消費的topic被修改時(例如,管理員新增新的分割槽),分割槽再均衡也會被觸發。

分割槽再均衡是非常重要的,因為它為消費者組提供了高可用性和可擴充套件性(允許我們容易地和安全地新增和移除消費者),但通常它是不希望發生的。在分割槽再均衡期間,所有消費者會暫停讀取訊息,因此分割槽再均衡基本上會造成整個消費者組的短暫停止。另外,當分割槽被重新分配給另外一個消費者時,該消費者會丟失其當前的狀態;如果它正在快取任何資料,它將需要重新整理它的快取,這會減緩訊息的讀取效能直到該消費者重新設定它的狀態。

消費者維持分配給它們的分割槽的所有權是通過向作為該組協調器的Kafka broker傳送心跳(不同消費者組的協調器可以是不同的broker)。只要消費者定期傳送心跳,它就會被認為是正常的。如果一個消費者停止傳送心跳超過一定的時間,其session會超時,該組的協調器會認為它故障並觸發分割槽再均衡。當正常停止一個消費者時,該消費者將通知組協調器它正在離開,組協調器將立即觸發分割槽再均衡。

2. 建立消費者

從Kafka讀取訊息的第一步是建立一個消費者,類似於建立生產者,也必須指定三個屬性:bootstrap.servers、key.deserializer和value.deserializer,第一個之前介紹生產者的時候有詳細說明,這裡不再重複,簡單來說就是用於與Kafka叢集建立初始連線的主機和埠的列表。第二個和第三個對應生產者的key.serializer和value.serializer,指定用於把byte陣列反序列化為Java物件的類名。

另外還有一個屬性group.id,但不是嚴格強制的,它用於指定該消費者所屬的消費者組。

下面是建立消費者的程式碼示例:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

3. 訂閱訊息

Kafka允許消費者訂閱一個或多個topics的訊息,只需要呼叫subscribe()方法,該方法接收一個需要訂閱的topic列表:

import java.util.Collections;

consumer.subscribe(Collections.singletonList("customerCountries"));

也可以使用正則表示式匹配多個topic的名字,下面是匹配字首為test.的topics:

import java.util.regex.Pattern;

consumer.subscribe(Pattern.compile("test.*"));

注意,在建立一個新的topic時,如果消費者訂閱的topic正則表示式匹配新的topic,那麼分割槽再均衡會被立即觸發,該消費者會開始從新的topic讀取訊息。

4. Poll迴圈

消費者API的核心部分是用一個簡單的迴圈不斷地輪詢伺服器讀取新的訊息。一旦消費者向topics訂閱訊息,這個poll迴圈將會處理協調器的所有操作,例如,分割槽再均衡、讀取訊息等。從而為開發人員提供了一個簡單地從分配的分割槽讀取資料的API。以下是其主要的實現程式碼:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
            // Handle new records
        }
    }
} finally {
    consumer.close();
}

poll(long timeout)方法返回一個ConsumerRecords<K, V>,每一個數據都包含topic、分割槽資訊、訊息偏移量offset、key和訊息值value。新的消費者第一次呼叫poll()方法時,負責查詢組協調者GroupCoordinator、加入到消費者組並與分配的分割槽連線。如果資料再均衡被觸發,它也會同時做相應的處理。注意:消費者是非執行緒安全的,如果要使用多個消費者,最好使用ExecutorService去啟動多執行緒,而且每個執行緒使用不同的消費者例項。

5. 消費者配置屬性

消費者也有很多配置屬性,在0.9.0.0及之後版本,Kafka使用了新的Java消費者,替換了原來基於Scala版本的。除了上述必須的三個之外,下面是一些比較重要的屬性:

5.1 fetch.min.bytes

該配置設定broker返回資料的最小位元組數,預設是1位元組。如果一個broker接收到來自消費者的讀取訊息請求,但新訊息的大小小於該配置的值,那麼這個broker會等待更多的訊息直到接收的新訊息大小等於該配置的值。這樣的機制會減少消費者和broker的負載,因為可以減少處理來回訊息的頻率。所以,在broker接收到的訊息不多時,可以把該配置的值設大一點,這樣可以減少消費者使用的CPU資源,或者在配置了大量消費者時減少broker的負載。此配置和下面的fetch.max.wait.ms一起使用。

5.2 fetch.max.wait.ms

該配置設定broker最長的等待時間,預設是500ms。如果一個broker接收到的新訊息大小小於fetch.min.bytes的值,那麼這個broker會一直等待直到時間超過配置的500ms。如果想減少延時,可以把該配置的值設小一點。此配置和上面的fetch.min.bytes是一起使用的,例如fetch.max.wait.ms=100,fetch.min.bytes=1048576(1MB),當broker資料達到1MB或者等待了100ms時,broker都會把資料傳送給消費者。

5.3 max.partition.fetch.bytes

該配置設定每個分割槽返回資料的最大位元組數,預設是1048576位元組=1MB。如果一個topic有20個分割槽,配置了5個消費者,那麼每個消費者則需要4MB記憶體,用於存放KafkaConsumer.poll()返回的ConsumerRecord物件。實際上,如果消費者組內有消費者發生故障,由於分割槽再均衡,每個消費者則需要處理更多的分割槽訊息,那麼建議分配更多的記憶體。該配置值必須大於broker能夠接收訊息的最大大小message.max.bytes=1000012(0.96MB),否則消費者將會hang住。

設定該配置的另一個重要考慮因素是消費者處理資料所花費的時間。消費者必須頻繁地呼叫poll()方法以避免session超時導致分割槽再均衡,如果單次poll()返回的資料非常大,那麼消費者需要更長的時間去處理,這意味著它不能及時呼叫下一次的poll()方法,從而導致session的超時。這個時候可以減小該配置的值或者增加session的超時時間。

5.4 session.timeout.ms

該配置設定session超時時間,預設是10000=10秒。消費者會週期性地傳送心跳給broker,間隔為heartbeat.interval.ms=3000=3秒,如果超過session.timeout.ms配置的時間broker都沒有收到心跳,那麼對應的消費者就會被移除,然後觸發分割槽再均衡。

5.5 auto.offset.reset

該配置設定消費者在開始讀取一個沒有提交偏移量或該偏移量為非法的分割槽時如何重置該偏移量(通常是因為消費者下線時間太長,以至於那個偏移量對應的訊息已經過時,或者訊息已經被刪除)。可用的配置為:

  • earliest: 自動重置為最舊的offset,也就是消費者會從最開始讀取分割槽的所有訊息
  • latest(預設): 自動重置為最新的offset,也就是消費者會從最新的訊息開始讀取
  • none: 向消費者丟擲異常
  • 其餘值: 向消費者丟擲異常

5.6 enable.auto.commit

該配置設定是否自動週期性提交offset,預設值為true,提交間隔配置為auto.commit.interval.ms,預設為5000=5秒。

5.7 partition.assignment.strategy

該配置設定消費者例項之間分配分割槽所有權策略的類名,現有三個可用的類:

org.apache.kafka.clients.consumer.RangeAssignor(預設):對每個topic,按數字順序排列可用分割槽,按字典順序排列消費者。然後將分割槽數量除以總消費者數量,以確定分配給每個消費者的分割槽數量。分割槽是按數字順序連續分配給消費者,如果分割槽沒有被均勻分配,那麼靠前的消費者會多分配一個分割槽。例如有2個消費者C0和C1,2個topics t0和t1,每個topic都有3個分割槽,分別為t0p0, t0p1, t0p2, t1p0, t1p1和t1p2。那麼分配給C0,C1的分割槽分別是C0:[t0p0, t0p1, t1p0, t1p1],C1:[t0p2, t1p2]。

org.apache.kafka.clients.consumer.RoundRobinAssignor:列出所有可用分割槽和所有可用消費者,然後把分割槽迴圈分配給消費者。如果所有消費者訂閱的topic是一樣的,那麼分割槽會被均勻分配。例如有和上述RangeAssignor一樣的topics,分割槽和消費者,那麼分配給C0,C1的分割槽分別是C0:[t0p0, t0p2, t1p1],C1:[t0p1, t1p0, t1p2]。當所有消費者訂閱的topic不一樣時,分配的時候仍然會按照迴圈的方式,但會跳過那些沒有訂閱同一topic的消費者,這可能會導致分割槽分配不平衡。例如有3個消費者C0,C1和C2,3個topics t0,t1和t2,分別有1,2和3個分割槽。C0訂閱t0;C1訂閱t0和t1;C2訂閱t0,t1和t2。那麼分配給C0,C1和C2的分割槽分別是C0:[t0p0],C1:[t1p0]和C2:[t1p1, t2p0, t2p1, t2p2]。

org.apache.kafka.clients.consumer.StickyAssignor:這個類的實現有2個目標:

首先,保證分割槽分配儘可能均勻,意味著:

  • 分配給消費者的分割槽數量最多相差一個,或者
  • 比其它消費者少至少2個分割槽的消費者無法接收訂閱分割槽的訊息

其次,當分割槽再均衡時,保留儘可能多的現有分配。這會有助於節省當分割槽從一個消費者重新分配給另一個消費者時的資源消耗。一開始,分割槽會被儘可能均勻地分配給消費者。雖然這可能聽起來類似於RoundRobinAssignor,但下面的第二個例子表明不是。在分割槽再均衡時,它將按照以下方式執行:

  • 分配給消費者的分割槽仍然儘可能均勻地分配,並且
  • 分割槽儘可能與之前分配的消費者保持一致

當然,上面的第一個目標優先於第二個。

例1,假定有3個消費者C0,C1和C2,4個topics t0,t1,t2和t3,每個topic有2個分割槽,那麼分配給C0,C1和C2的分割槽分別是C0:[t0p0, t1p1, t3p0],C1:[t0p1, t2p0, t3p1]和C2:[t1p0, t2p1]。如果C1被移除,觸發分割槽再均衡,RoundRobinAssignor分配的策略是C0:[t0p0, t1p0, t2p0, t3p0],C2:[t0p1, t1p1, t2p1, t3p1]。而StickyAssignor分配的策略是C0:[t0p0, t1p1, t3p0, t2p0],C2:[t1p0, t2p1, t0p1, t3p1]。

例2,假定有3個消費者C0,C1和C2,3個topics t0,t1和t2,分別有1,2和3個分割槽。C0訂閱t0;C1訂閱t0和t1;C2訂閱t0,t1和t2。RoundRobinAssignor分配的策略是C0:[t0p0],C1:[t1p0]和C2:[t1p1, t2p0, t2p1, t2p2]。而StickyAssignor分配的策略是C0:[t0p0],C1:[t1p0,t1p1]和C2:[t2p0,t2p1,t2p2]。如果C0被移除,觸發分割槽再均衡,RoundRobinAssignor分配的策略是C1:[t0p0, t1p1],C2:[t1p0,t2p0,t2p1,t2p2]。而StickyAssignor分配的策略是保留5個分割槽的分配,C1:[t1p0,t1p1,t0p0],C2:[t2p0,t2p1,t2p2]。

5.8 client.id

用於標識讀取訊息的客戶端,通常用於日誌和效能指標以及配額。

5.9 max.poll.records

該配置設定單次呼叫poll()方法返回訊息的最大數量,預設是500。

5.10 receive.buffer.bytes and send.buffer.bytes

  • receive.buffer.bytes:讀取資料時使用的TCP接收緩衝區(SO_RCVBUF)的大小,預設值為65536位元組=64KB。如果設定為-1,則將使用作業系統的預設值。
  • send.buffer.bytes:傳送資料時使用的TCP傳送緩衝區(SO_SNDBUF)的大小,預設值為131072位元組=128KB。如果設定為-1,則將使用作業系統的預設值。

6. 提交Commits和Offsets偏移量

每當呼叫poll()方法時,它都會返回已經寫入Kafka但還沒有被當前消費者組讀取的訊息。Kafka的其中一個獨特的特性是它不像其它JMS佇列那樣跟蹤消費者的ACK,而是它允許消費者使用Kafka跟蹤每個分割槽的消費偏移量offset。

消費者向Kafka傳送一條包含分割槽偏移量的訊息到一個特別的topic:_consumer_offsets用於更新分割槽偏移量,這樣的操作稱為提交commit。在發生分割槽再均衡時,每個消費者有可能會被分配一些新的分割槽,這些消費者會從每個分割槽讀取到最新提交的offset開始繼續讀取訊息。如果提交的offset小於之前消費者處理的最後一條訊息的offset,那麼最後處理的offset與提交的offset之間的訊息將會被處理兩次,如下圖所示:

如果提交的offset大於之前消費者處理的最後一條訊息的offset,那麼最後處理的offset與提交的offset之間的訊息將會丟失,如下圖所示:

由此可見,管理offset對客戶端應用程式有很大影響。KafkaConsumer API提供了多種提交offset的方法:

6.1 自動提交

提交offset最容易的方法是讓消費者自動提交。如果使用了預設的enable.auto.commit=true,那麼預設每隔5秒鐘(auto.commit.interval.ms),消費者會自動提交客戶端呼叫poll()方法返回的最大offset。

6.2 同步提交

如果設定了auto.commit.offset=false,那麼只有在應用程式呼叫提交方法時才會提交offset。最簡單和最可靠的API是commitSync(),這個API會提交poll()方法返回的最新offset,並在提交成功後才返回,如果由於某種原因提交失敗則丟擲異常。注意確保在處理完當前的所有訊息後才呼叫commitSync(),否則訊息有可能會丟失。以下是示例的程式碼:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}

只要沒有不能恢復的異常,commitSync()方法會重試提交。

6.3 非同步提交

同步提交的一個缺點是應用程式會被一直阻塞直到有返回,這將會降低應用程式的吞吐量。可以通過減少提交的頻率來提高吞吐量,但相應會增加由於分割槽再均衡而導致的訊息被重複處理的數量。這個時候可以選用非同步提交,呼叫提交方法後不需要等待broker的返回。

consumer.commitAsync();

該方法的缺點是不會重試提交,原因是當commitAsync()方法從伺服器接收返回時,可能在之後已經有一個成功的提交。例如,當提交offset 2000的時候,消費者和broker出現臨時的通訊故障,因此broker暫時接收不了請求。同時,提交另外一個offset 3000,這個時候通訊恢復正常,如果offset 3000先被處理成功,offset 2000重試提交的話,會導致重複處理資料。

另外還有一種帶callback引數的commitAsync(OffsetCommitCallback callback),當接收到broker的返回時會被呼叫,一般用來記錄提交的異常資訊或者用於計量效能。以下是示例的程式碼:

consumer.commitAsync(new OffsetCommitCallback() {
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
        if (e != null)
            log.error("Commit failed for offsets {}", offsets, e);
    }
});

6.4 結合同步和非同步提交

通常情況下,偶爾的提交失敗不是一個大的問題,因為如果問題是臨時的,那麼下一次的提交是會成功的。但如果知道是在關閉消費者之前,或在分割槽再均衡發生之前的最後一次提交,則需要確保提交成功。因此,常見的做法是結合commitAsync()和commitSync()方法,以下是示例的程式碼(發生分割槽再均衡的例子會在後面介紹):

try {
    consumer.commitAsync();
} catch (CommitFailedException e) {
    log.error("commit failed", e);
} finally {
    try {
        // 關閉消費者之前提交
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

6.5 提交指定的offset

commitSync()和commitAsync()方法是提交當前批次最新的offset,如果想要在中途提交指定的offset,可以使用:

  • commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
  • commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)

以下是示例程式碼:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
int count = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, "no metadata"));
        if (count % 1000 == 0)
            // 每處理完1000條訊息提交一次
            consumer.commitAsync(currentOffsets, null);
        count++;
    }
}

7. 分割槽再均衡監聽器

當消費者新增或移除分割槽時,消費者提供了允許執行自定義程式碼的API:

subscribe(Collection<String> topics, ConsumerRebalanceListener listener)

ConsumerRebalanceListener介面有2個方法需要實現:

  • public void onPartitionsRevoked(Collection<TopicPartition> partitions)
  • public void onPartitionsAssigned(Collection<TopicPartition> partitions)

onPartitionsRevoked方法是在分割槽再均衡開始前和消費者停止讀取訊息後被呼叫,一般用來提交offset;onPartitionsAssigned方法是在分割槽重新被分配到broker後和消費者開始讀取訊息前被呼叫,下面是使用onPartitionsRevoked()方法在丟失分割槽所有權之前提交offset的例子:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 後續會有使用onPartitionsAssigned()的例子
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
        // 提交已經處理的offset
        consumer.commitSync(currentOffsets);
    }
}

try {
    // 指定使用HandleRebalance監聽器
    consumer.subscribe(topics, new HandleRebalance());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}

注意,上例在onPartitionsRevoked()方法提交的是已經處理的offset,不是當前批次的offset。

8. 讀取指定offsets的訊息

poll()方法是從每個分割槽最後提交的offset開始讀取訊息,如果想要從開頭開始讀取訊息,可以使用seekToBeginning(TopicPartition tp)方法;如果想要從結尾開始讀取新訊息,可以使用seekToEnd(TopicPartition tp)方法。

此外,也可以從指定offset開始讀取訊息,下面是使用onPartitionsAssigned()方法在重新分配分割槽後指定offset的例子:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 如果分割槽被刪除,把相關事務儲存在資料庫
        commitDBTransaction();
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            // 重新分配分割槽後,從資料庫獲取儲存的offset
            consumer.seek(partition, getOffsetFromDB(partition));
    }
}

// 指定使用SaveOffsetsOnRebalance監聽器
consumer.subscribe(topics, new SaveOffsetsOnRebalance());
// 呼叫poll()方法確保此消費者被新增到消費者裡面
consumer.poll(0);

// 獲取分割槽資訊並對每一個分割槽呼叫seek方法指定開始讀取訊息的offset
for (TopicPartition partition : consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));
        
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // 處理訊息
        processRecord(record);
        // 儲存訊息
        storRecordInDB(record);
        // 儲存offsets在資料庫
        storeOffsetInDB(record.topic(), record.partition(), record.offset());
    }
    // 處理完當前批次訊息後,提交事務
    commitDBTransaction();
}

9. 退出poll迴圈

如果要退出poll()方法,不再讀取訊息,需要在另外一個執行緒裡面呼叫consumer.wakeup()方法。如果poll()方法是在主執行緒執行,那麼可以通過ShutdownHook來呼叫wakeup()方法,例如:

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        System.out.println("Starting exit...");
        consumer.wakeup();
        try {
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

注意,consumer.wakeup()是唯一可以安全地從其它執行緒呼叫的消費者方法,呼叫此方法會導致poll()方法丟擲WakeupException並退出,或者如果線上程沒有呼叫poll()方法之前呼叫wakeup(),那麼WakeupException會在下次呼叫poll()方法時丟擲。該異常不需要處理,但在退出執行緒之前,必須呼叫consumer.close()方法關閉消費者,用於提交offsets和向組協調器傳送一條消費者離開該組的訊息。該組協調器會立即觸發分割槽再均衡,而不需要等待session超時,對應的分割槽會重新分配給該組的另一個消費者。

try {
    // looping until ctrl-c, the shutdown hook will cleanup on exit
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // 處理訊息
        // ...
    }
} catch (WakeupException e) {
    // 不需要處理WakeupException
} finally {
    // 必須呼叫close()方法
    consumer.close();
    System.out.println("Closed consumer and we are done");
}

10. 反序列化器

根據之前介紹過的,生產者需要序列號器把物件轉為byte陣列然後發給broker,相反地,消費者需要反序列化器把從broker讀取到的byte陣列轉為物件。和生產者一樣,除了預設提供的反序列化器之外還可以實現自定義的反序列化器,例如:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomerDeserializer implements Deserializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        // TODO 把byte[]轉為Customer物件,這裡省略
        return null;
    }

    @Override
    public void close() {
        // nothing to close
    }

}

需要注意的是使用的序列號器和反序列化器必須要一致。

11. 單個的消費者

通常情況下都會配置消費者組,每個組會配置多個消費者。但在某些情況下,如果需要簡單化,只配置了一個消費者,而它總是需要從topic的所有分割槽或特定分割槽讀取訊息。這種情況則不需要消費者組和分割槽再均衡。當確切地知道消費者應該讀取哪些分割槽時,則不需要採用訂閱模式,可以直接分配分割槽。下面是一個消費者如何分配一個topic的所有分割槽並讀取訊息的例子:

List<TopicPartition> partitions = new ArrayList<TopicPartition>();
// 讀取topic的所有分割槽
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    // 分配所有分割槽
    consumer.assign(partitions);
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
}

注意,如果新新增分割槽到這個topic,消費者是不會知道的,需要重新呼叫partitionsFor(String topic)方法才能獲取。

END O(∩_∩)O