1. 程式人生 > >Kafka_生產者、消費者、流API

Kafka_生產者、消費者、流API

  • 消費者客戶端API

    Kafka消費者不是執行緒安全的。所有網路I/O都發生在進行呼叫應用程式的執行緒中。使用者的責任是確保多執行緒訪問正確同步。

    public class KafkaConsumer<K,V> extends Object implements Consumer<K,V>
    
    • 1

    消費者TCP長連線到broker來拉取訊息。故障導致的消費者關閉失敗,將會洩露這些連線,消費者不是執行緒安全的

    ① 偏移量和消費者的位置 kafka為分割槽中的每條訊息儲存一個偏移量(offset),這個偏移量是該分割槽中一條訊息的唯一標示符。也表示消費者在分割槽的位置:

    消費者的位置給出了下一條記錄的偏移量。它在每次消費者在呼叫poll(long)中接收訊息時自動增長。 “已提交”的位置是已安全儲存的最後偏移量,如果程序失敗或重新啟動時,消費者將恢復到這個偏移量。消費者可以選擇定期自動提交偏移量,也可以選擇通過呼叫commit API來手動的控制。

    ② 消費者組和主題訂閱 Kafka的消費者組概念,通過程序池瓜分消費和處理訊息。這些程序可以在同一臺機器執行,也可分佈到多臺機器上,增加可擴充套件性和容錯性,相同group.id的消費者將視為同一個消費者組。

    分組中的每個消費者通過subscribe API動態的訂閱一個topic列表。kafka將topic中的訊息傳送到每個消費者組中。並通過平衡分割槽在消費者分組中所有成員之間來達到平均。因此每個分割槽恰好地分配1個消費者(一個消費者組中)。所以如果一個topic有4個分割槽,並且一個消費者分組有2個消費者。那麼每個消費者消費2個分割槽。

    消費者組的成員是動態維護的:如果一個消費者故障。分配給它的分割槽將重新分配給同一個分組中其他的消費者。同樣的,如果一個新的消費者加入到分組,將從現有消費者中移一個給它,這被稱為重新平衡分組。 當新分割槽新增到訂閱的topic時,或者當建立與訂閱的正則表示式匹配的新topic時,也將重新平衡。將通過定時重新整理自動發現新的分割槽,並將其分配給分組的成員。

    當分組重新分配自動發生時,可以通過ConsumerRebalanceListener通知消費者,這允許他們完成必要的應用程式級邏輯,例如狀態清除,手動偏移提交等。

    它也允許消費者通過使用assign(Collection)手動分配指定分割槽,如果使用手動指定分配分割槽,那麼動態分割槽分配和協調消費者組將失效。

    ③ 發現消費者故障 訂閱一組topic後,當呼叫poll(long)時,消費者將自動加入到組中。只要持續的呼叫poll,消費者將一直保持可用,並繼續從分配的分割槽中接收訊息。此外,消費者向伺服器定時傳送心跳。 如果消費者崩潰或無法在session.timeout.ms配置的時間內傳送心跳,則消費者將被視為死亡,其分割槽將被重新分配。

    還有一種可能,消費可能遇到“活鎖”的情況,它持續的傳送心跳,但是沒有處理。為了預防消費者在這種情況下一直持有分割槽,我們使用max.poll.interval.ms活躍檢測機制。 在此基礎上,如果你呼叫的poll的頻率大於最大間隔,則客戶端將主動地離開組,以便其他消費者接管該分割槽。 發生這種情況時,你會看到offset提交失敗(呼叫commitSync()引發的CommitFailedException)。這是一種安全機制,保障只有活動成員能夠提交offset。所以要留在組中,必須持續呼叫poll。

    消費者提供兩個配置設定來控制poll迴圈: max.poll.interval.ms:增大poll的間隔,可以為消費者提供更多的時間去處理返回的訊息(呼叫poll(long)返回的訊息,通常返回的訊息都是一批)。缺點是此值越大將會延遲組重新平衡。 max.poll.records:此設定限制每次呼叫poll返回的訊息數,這樣可以更容易的預測每次poll間隔要處理的最大值。通過調整此值,可以減少poll間隔

    對於訊息處理時間不可預測的情況,這些選項是不夠的。 處理這種情況的推薦方法是將訊息處理移到另一個執行緒中,讓消費者繼續呼叫poll。 但是必須注意確保已提交的offset不超過實際位置。另外,必須禁用自動提交,並只有在執行緒完成處理後才為記錄手動提交偏移量(取決於你) 【示例】 這是個【自動提交偏移量】的簡單的kafka消費者API。

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records){
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    設定enable.auto.commit,偏移量由auto.commit.interval.ms控制自動提交的頻率。

    叢集是通過配置bootstrap.servers指定一個或多個broker。不用指定全部的broker,它將自動發現叢集中的其餘的borker(最好指定多個,萬一有伺服器故障)。

    broker通過心跳機制自動檢測test組中失敗的程序,消費者會自動ping叢集,告訴進群它還活著。只要消費者能夠做到這一點,它就被認為是活著的,並保留分配給它分割槽的權利,如果它停止心跳的時間超過session.timeout.ms,那麼就會認為是故障的,它的分割槽將被分配到別的程序。

    這個deserializer設定如何把byte轉成object型別,例子中,通過指定string解析器,告訴獲取到的訊息的key和value只是簡單個string型別。

    【手動控制偏移量】 當訊息認為已消費過了,這個時候再去提交它們的偏移量。這個很有用的,當消費的訊息結合了一些處理邏輯,這個訊息就不應該認為是已經消費的,直到它完成了整個處理。

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    consumer.subscribe(Arrays.asList("foo", "bar"));
    final int minBatchSize = 200;
    
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在這個例子中,我們將消費一批訊息並將它們儲存在記憶體中。當我們積累足夠多的訊息後,我們再將它們批量插入到資料庫中。如果我們設定offset自動提交,消費將被認為是已消費的。這樣會出現問題,我們的程序可能在批處理記錄之後,但在它們被插入到資料庫之前失敗了。

    為了避免這種情況,我們將在相應的記錄插入資料庫之後再手動提交偏移量。這樣我們可以準確控制訊息是成功消費的。提出一個相反的可能性:在插入資料庫之後,但是在提交之前,這個過程可能會失敗(即使這可能只是幾毫秒,這是一種可能性)。在這種情況下,程序將獲取到已提交的偏移量,並會重複插入的最後一批資料。這種方式就是所謂的“至少一次”保證,在故障情況下,可以重複。 使用手動偏移控制的優點是,可以直接控制記錄何時被視為“已消耗”。

    上面的例子使用commitSync表示所有收到的訊息為”已提交”,在某些情況下,可能希望更精細的控制,通過指定一個明確訊息的偏移量為“已提交”。在下面,我們處理完每個分割槽中的訊息後,提交偏移量。

    try {
        while(running) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}
        }
     } finally {
       consumer.close();
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    注意:已提交的offset應始終是你的程式將讀取的下一條訊息的offset。因此,呼叫commitSync(offsets)時,你應該加1個到最後處理的訊息的offset。

    ④ 訂閱指定的分割槽 如果消費者程序本身具有高可用性,並且如果它失敗,會自動重新啟動(可能使用叢集管理框架如YARN,Mesos,或者AWS設施,或作為一個流處理框架的一部分)。 在這種情況下,不需要Kafka檢測故障,重新分配分割槽,因為消費者程序將在另一臺機器上重新啟動。 要使用此模式,只需呼叫assign(Collection)消費指定的分割槽即可:

    String topic = "foo";
    TopicPartition partition0 = new TopicPartition(topic, 0);
    TopicPartition partition1 = new TopicPartition(topic, 1);
    consumer.assign(Arrays.asList(partition0, partition1));
    
    • 1
    • 2
    • 3
    • 4

    一旦手動分配分割槽,可以在迴圈中呼叫poll。消費者分組仍需要提交offset,只是現在分割槽的設定只能通過呼叫assign修改,因為手動分配不會進行分組協調,因此消費者故障不會引發分割槽重新平衡。每一個消費者是獨立工作的(即使和其他的消費者共享GroupId)。為了避免offset提交衝突,通常需要確認每一個consumer例項的gorupId都是唯一的。 注意,手動分配分割槽(即,assgin)和動態分割槽分配的訂閱topic模式(即,subcribe)不能混合使用。

    ⑤ offset儲存在其他地方 消費者可以不使用kafka內建的offset倉庫。可以選擇自己來儲存offset。要注意的是,將消費的offset和結果儲存在同一系統中,用原子的方式儲存結果和offset,但這不能保證原子,要想消費是完全原子的,並提供的“正好一次”的消費保證比kafka預設的“至少一次”的語義要更高。需要使用kafka的offset提交功能。

    如果消費的結果儲存在關係資料庫中,儲存在資料庫的offset,讓提交結果和offset在單個事務中。這樣,事務成功,則offset儲存和更新。如果offset沒有儲存,那麼偏移量也不會被更新。 如果offset和消費結果儲存在本地倉庫。例如,可以通過訂閱一個指定的分割槽並將offset和索引資料一起儲存來構建一個搜尋索引。

    每個訊息都有自己的offset,所以要管理自己的偏移,只需要做到以下幾點: 配置 enable.auto.commit=false 使用提供的 ConsumerRecord 來儲存你的位置。 在重啟時用 seek(TopicPartition, long) 恢復消費者的位置。

    當分割槽分配也是手動完成的(像上文搜尋索引的情況),這種型別的使用是最簡單的。 如果分割槽分配是自動完成的,需要特別小心處理分割槽分配變更的情況。可以通過呼叫subscribe(Collection,ConsumerRebalanceListener)和subscribe(Pattern,ConsumerRebalanceListener)中提供的ConsumerRebalanceListener例項來完成的。例如,當分割槽向消費者獲取時,消費者將通過實現ConsumerRebalanceListener.onPartitionsRevoked(Collection)給這些分割槽提交它們的offset。當分割槽分配給消費者時,消費者通過ConsumerRebalanceListener.onPartitionsAssigned(Collection)為新的分割槽正確地將消費者初始化到該位置。 ConsumerRebalanceListener的另一個常見用法是清除應用已移動到其他位置的分割槽的快取。

    ⑥ 控制消費的位置 大多數情況下,消費者只是簡單的從頭到尾的消費訊息,週期性的提交位置(自動或手動)。kafka也支援消費者去手動的控制消費的位置,可以消費之前的訊息也可以跳過最近的訊息。

    有幾種情況,手動控制消費者的位置可能是有用的。 一種場景是對於時間敏感的消費者處理程式,對足夠落後的消費者,直接跳過,從最近的訊息開始消費。 另一個使用場景是本地狀態儲存系統。在這樣的系統中,消費者將要在啟動時初始化它的位置(無論本地儲存是否包含)。同樣,如果本地狀態已被破壞(假設因為磁碟丟失),則可以通過重新消費所有資料並重新建立狀態。 kafka使用seek(TopicPartition, long)指定新的消費位置。用於查詢伺服器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。

    ⑦ 消費者流量控制 如果消費者分配了多個分割槽,並同時消費所有的分割槽,這些分割槽具有相同的優先順序。在一些情況下,消費者需要首先消費一些指定的分割槽,當指定的分割槽有少量或者已經沒有可消費的資料時,則開始消費其他分割槽。 例如流處理,當處理器從2個topic獲取訊息並把這兩個topic的訊息合併,當其中一個topic長時間落後另一個,則暫停消費,以便落後的趕上來。 kafka支援動態控制消費流量,分別在future的poll(long)中使用pause(Collection) 和 resume(Collection) 來暫停消費指定分配的分割槽,重新開始消費指定暫停的分割槽。

    ⑧ 多執行緒處理 Kafka消費者不是執行緒安全的。所有網路I/O都發生在進行呼叫應用程式的執行緒中。使用者的責任是確保多執行緒訪問正確同步。非同步訪問將導致ConcurrentModificationException。 此規則唯一的例外是wakeup(),它可以安全地從外部執行緒來中斷活動操作。在這種情況下,將從操作的執行緒阻塞並丟擲一個WakeupException。這可用於從其他執行緒來關閉消費者。 以下程式碼段顯示了典型模式:

    public class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("topic"));
                while (!closed.get()) {
                    ConsumerRecords records = consumer.poll(10000);
                    // Handle new records
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) throw e;
            } finally {
                consumer.close();
            }
        }
    
        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在單獨的執行緒中,可以通過設定關閉標誌和喚醒消費者來關閉消費者。

    closed.set(true);
    consumer.wakeup();
    
    • 1
    • 2

    我們沒有多執行緒模型的例子。但留下幾個操作可用來實現多執行緒處理訊息。

    每個執行緒一個消費者,這種方法的優點和缺點: 這是最容易實現的;因為它不需要線上程之間協調,所以通常它是最快的;它按順序處理每個分割槽(每個執行緒只處理它接受的訊息)。更多的消費者意味著更多的TCP連線到叢集(每個執行緒一個)。一般kafka處理連線非常的快,所以這是一個小成本;更多的消費者意味著更多的請求被髮送到伺服器,但稍微較少的資料批次可能導致I/O吞吐量的一些下降;所有程序中的執行緒總數受到分割槽總數的限制。

    解耦消費和處理 另一個替代方式是一個或多個消費者執行緒,它來消費所有資料,其消費所有資料並將ConsumerRecords例項切換到由實際處理記錄的處理器執行緒池來消費的阻塞佇列。這個選項同樣有利弊: 可擴充套件消費者和處理程序的數量。這樣單個消費者的資料可分給多個處理器執行緒來執行,避免對分割槽的任何限制;跨多個處理器的順序保證需要特別注意,因為執行緒是獨立的執行,後來的訊息可能比早到的訊息先處理,如果對排序沒有問題,這就不是個問題;手動提交變得更困難,因為它需要協調所有的執行緒以確保對該分割槽的處理完成。 這種方法有多種玩法,例如,每個處理執行緒可以有自己的佇列,消費者執行緒可以使用TopicPartition hash到這些佇列中,以確保按順序消費,並且提交也將簡化。