1. 程式人生 > >Kafka使用入門教程

Kafka使用入門教程

接下來一步一步搭建Kafka執行環境。

Step 1: 下載Kafka

點選下載最新的版本並解壓.

> tar -xzf kafka_2.9.2-0.8.1.1.tgz
> cd kafka_2.9.2-0.8.1.1

Step 2: 啟動服務

Kafka用到了Zookeeper,所有首先啟動Zookper,下面簡單的啟用一個單例項的Zookkeeper服務。可以在命令的結尾加個&符號,這樣就可以啟動後離開控制檯。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
現在啟動Kafka:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: 建立 topic

建立一個叫做“test”的topic,它只有一個分割槽,一個副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以通過list命令檢視建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
除了手動建立topic,還可以配置broker讓它自動建立topic.

Step 4:傳送訊息.

Kafka 使用一個簡單的命令列producer,從檔案中或者從標準輸入中讀取訊息併發送到服務端。預設的每條命令將傳送一條訊息。
執行producer並在控制檯中輸一些訊息,這些訊息將被髮送到服務端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a messageThis is another message
ctrl+c可以退出傳送。

Step 5: 啟動consumer

Kafka also has a command line consumer that will dump out messages to standard output. Kafka也有一個命令列consumer可以讀取訊息並輸出到標準輸出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
你在一個終端中執行consumer命令列,另一個終端中執行producer命令列,就可以在一個終端輸入訊息,另一個終端讀取訊息。
這兩個命令都有自己的可選引數,可以在執行的時候不加任何引數可以看到幫助資訊。

Step 6: 搭建一個多個broker的叢集

剛才只是啟動了單個broker,現在啟動有3個broker組成的叢集,這些broker節點也都是在本機上的: 首先為每個節點編寫配置檔案: > cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
在拷貝出的新檔案中新增以下引數:
config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2
broker.id在叢集中唯一的標註一個節點,因為在同一個機器上,所以必須制定不同的埠和日誌檔案,避免資料被覆蓋。 We already have Zookeeper and our single node started, so we just need to start the two new nodes: 剛才已經啟動可Zookeeper和一個節點,現在啟動另外兩個節點:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
建立一個擁有3個副本的topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
現在我們搭建了一個叢集,怎麼知道每個節點的資訊呢?執行“"describe topics”命令就可以了:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
下面解釋一下這些輸出。第一行是對所有分割槽的一個描述,然後每個分割槽都會對應一行,因為我們只有一個分割槽所以下面就只加了一行。
  • leader:負責處理訊息的讀和寫,leader是從所有節點中隨機選擇的.
  • replicas:列出了所有的副本節點,不管節點是否在服務中.
  • isr:是正在服務中的節點.
在我們的例子中,節點1是作為leader執行。

向topic傳送訊息:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1my test message 2^C 
消費這些訊息:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
測試一下容錯能力.Broker 1作為leader執行,現在我們kill掉它:
> ps | grep server-1.properties7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564
另外一個節點被選做了leader,node 1 不再出現在 in-sync 副本列表中:
> bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 1,2,0 Isr: 2,0
雖然最初負責續寫訊息的leader down掉了,但之前的訊息還是可以消費的:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
看來Kafka的容錯機制還是不錯的。
下面我們來搭建kafka的開發環境。 新增依賴 搭建開發環境需要引入kafka的jar包,一種方式是將Kafka安裝包中lib下的jar包加入到專案的classpath中,這種比較簡單了。不過我們使用另一種更加流行的方式:使用maven管理jar包依賴。 建立好maven專案後,在pom.xml中新增以下依賴:

<dependency>
        <groupId> org.apache.kafka</groupId >
        <artifactId> kafka_2.10</artifactId >
        <version> 0.8.0</ version>
</dependency>

新增依賴後你會發現有兩個jar包的依賴找不到。沒關係我都幫你想好了,點選這裡下載這兩個jar包,解壓後你有兩種選擇,第一種是使用mvn的install命令將jar包安裝到本地倉庫,另一種是直接將解壓後的資料夾拷貝到mvn本地倉庫的com資料夾下,比如我的本地倉庫是d:\mvn,完成後我的目錄結構是這樣的: 目錄結構 配置程式
首先是一個充當配置檔案作用的介面,配置了Kafka的各種連線引數:

package com.sohu.kafkademon;

public interface KafkaProperties
{
    final static String zkConnect = "10.22.10.139:2181";
    final static String groupId = "group1";
    final static String topic = "topic1";
    final static String kafkaServerURL = "10.22.10.139";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    final static String topic2 = "topic2";
    final static String topic3 = "topic3";
    final static String clientId = "SimpleConsumerDemoClient";
}

producer

package com.sohu.kafkademon;

import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @author leicui [email protected]
 */
public class KafkaProducer extends Thread
{
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public KafkaProducer(String topic)
    {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.22.10.139:9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true)
        {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

consumer

package com.sohu.kafkademon;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * @author leicui [email protected]
 */
public class KafkaConsumer extends Thread
{
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic)
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig()
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

簡單的傳送接收 執行下面這個程式,就可以進行簡單的傳送接收訊息了:

package com.sohu.kafkademon;

/**
 * @author leicui [email protected]
 */
public class KafkaConsumerProducerDemo
{
    public static void main(String[] args)
    {
        KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
        producerThread.start();

        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}

高級別的consumer 下面是比較負載的傳送接收的程式:

package com.sohu.kafkademon;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * @author leicui [email protected]
 */
public class KafkaConsumer extends Thread
{
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic)
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig()
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

不要畏懼檔案系統!

Kafka大量依賴檔案系統去儲存和快取訊息。對於硬碟有個傳統的觀念是硬碟總是很慢,這使很多人懷疑基於檔案系統的架構能否提供優異的效能。實際上硬碟的快慢完全取決於使用它的方式。設計良好的硬碟架構可以和記憶體一樣快。

在6塊7200轉的SATA RAID-5磁碟陣列的線性寫速度差不多是600MB/s,但是隨即寫的速度卻是100k/s,差了差不多6000倍。現代的作業系統都對次做了大量的優化,使用了 read-ahead 和 write-behind的技巧,讀取的時候成塊的預讀取資料,寫的時候將各種微小瑣碎的邏輯寫入組織合併成一次較大的物理寫入。對此的深入討論可以檢視這裡,它們發現線性的訪問磁碟,很多時候比隨機的記憶體訪問快得多。

為了提高效能,現代作業系統往往使用記憶體作為磁碟的快取,現代作業系統樂於把所有空閒記憶體用作磁碟快取,雖然這可能在快取回收和重新分配時犧牲一些效能。所有的磁碟讀寫操作都會經過這個快取,這不太可能被繞開除非直接使用I/O。所以雖然每個程式都在自己的執行緒裡只快取了一份資料,但在作業系統的快取裡還有一份,這等於存了兩份資料。

另外再來討論一下JVM,以下兩個事實是眾所周知的:

•Java物件佔用空間是非常大的,差不多是要儲存的資料的兩倍甚至更高。

•隨著堆中資料量的增加,垃圾回收回變的越來越困難。

基於以上分析,如果把資料快取在記憶體裡,因為需要儲存兩份,不得不使用兩倍的記憶體空間,Kafka基於JVM,又不得不將空間再次加倍,再加上要避免GC帶來的效能影響,在一個32G記憶體的機器上,不得不使用到28-30G的記憶體空間。並且當系統重啟的時候,又必須要將資料刷到記憶體中( 10GB 記憶體差不多要用10分鐘),就算使用冷重新整理(不是一次性刷進記憶體,而是在使用資料的時候沒有就刷到記憶體)也會導致最初的時候新能非常慢。但是使用檔案系統,即使系統重啟了,也不需要重新整理資料。使用檔案系統也簡化了維護資料一致性的邏輯。

所以與傳統的將資料快取在記憶體中然後刷到硬碟的設計不同,Kafka直接將資料寫到了檔案系統的日誌中。

常量時間的操作效率

在大多數的訊息系統中,資料持久化的機制往往是為每個cosumer提供一個B樹或者其他的隨機讀寫的資料結構。B樹當然是很棒的,但是也帶了一些代價:比如B樹的複雜度是O(log N),O(log N)通常被認為就是常量複雜度了,但對於硬碟操作來說並非如此。磁碟進行一次搜尋需要10ms,每個硬碟在同一時間只能進行一次搜尋,這樣併發處理就成了問題。雖然儲存系統使用快取進行了大量優化,但是對於樹結構的效能的觀察結果卻表明,它的效能往往隨著資料的增長而線性下降,資料增長一倍,速度就會降低一倍。

直觀的講,對於主要用於日誌處理的訊息系統,資料的持久化可以簡單的通過將資料追加到檔案中實現,讀的時候從檔案中讀就好了。這樣做的好處是讀和寫都是 O(1) 的,並且讀操作不會阻塞寫操作和其他操作。這樣帶來的效能優勢是很明顯的,因為效能和資料的大小沒有關係了。

既然可以使用幾乎沒有容量限制(相對於記憶體來說)的硬碟空間建立訊息系統,就可以在沒有效能損失的情況下提供一些一般訊息系統不具備的特性。比如,一般的訊息系統都是在訊息被消費後立即刪除,Kafka卻可以將訊息儲存一段時間(比如一星期),這給consumer提供了很好的機動性和靈活性,這點在今後的文章中會有詳述。

之前討論了consumer和producer是怎麼工作的,現在來討論一下資料傳輸方面。資料傳輸的事務定義通常有以下三種級別:

  1. 最多一次: 訊息不會被重複傳送,最多被傳輸一次,但也有可能一次不傳輸。
  2. 最少一次: 訊息不會被漏傳送,最少被傳輸一次,但也有可能被重複傳輸.
  3. 精確的一次(Exactly once): 不會漏傳輸也不會重複傳輸,每個訊息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的。

大多數訊息系統聲稱可以做到“精確的一次”,但是仔細閱讀它們的的文件可以看到裡面存在誤導,比如沒有說明當consumer或producer失敗時怎麼樣,或者當有多個consumer並行時怎麼樣,或寫入硬碟的資料丟失時又會怎麼樣。kafka的做法要更先進一些。當釋出訊息時,Kafka有一個“committed”的概念,一旦訊息被提交了,只要訊息被寫入的分割槽的所在的副本broker是活動的,資料就不會丟失。關於副本的活動的概念,下節文件會討論。現在假設broker是不會down的。
如果producer釋出訊息時發生了網路錯誤,但又不確定實在提交之前發生的還是提交之後發生的,這種情況雖然不常見,但是必須考慮進去,現在Kafka版本還沒有解決這個問題,將來的版本正在努力嘗試解決。
並不是所有的情況都需要“精確的一次”這樣高的級別,Kafka允許producer靈活的指定級別。比如producer可以指定必須等待訊息被提交的通知,或者完全的非同步傳送訊息而不等待任何通知,或者僅僅等待leader宣告它拿到了訊息(followers沒有必要)。

現在從consumer的方面考慮這個問題,所有的副本都有相同的日誌檔案和相同的offset,consumer維護自己消費的訊息的offset,如果consumer不會崩潰當然可以在記憶體中儲存這個值,當然誰也不能保證這點。如果consumer崩潰了,會有另外一個consumer接著消費訊息,它需要從一個合適的offset繼續處理。這種情況下可以有以下選擇:

  • consumer可以先讀取訊息,然後將offset寫入日誌檔案中,然後再處理訊息。這存在一種可能就是在儲存offset後還沒處理訊息就crash了,新的consumer繼續從這個offset處理,那麼就會有些訊息永遠不會被處理,這就是上面說的“最多一次”。
  • consumer可以先讀取訊息,處理訊息,最後記錄offset,當然如果在記錄offset之前就crash了,新的consumer會重複的消費一些訊息,這就是上面說的“最少一次”。
  • “精確一次”可以通過將提交分為兩個階段來解決:儲存了offset後提交一次,訊息處理成功之後再提交一次。但是還有個更簡單的做法:將訊息的offset和訊息被處理後的結果儲存在一起。比如用Hadoop ETL處理訊息時,將處理後的結果和offset同時儲存在HDFS中,這樣就能保證訊息和offser同時被處理了。

Kafka在提高效率方面做了很大努力。Kafka的一個主要使用場景是處理網站活動日誌,吞吐量是非常大的,每個頁面都會產生好多次寫操作。讀方面,假設每個訊息只被消費一次,讀的量的也是很大的,Kafka也儘量使讀的操作更輕量化。

我們之前討論了磁碟的效能問題,線性讀寫的情況下影響磁碟效能問題大約有兩個方面:太多的瑣碎的I/O操作和太多的位元組拷貝。I/O問題發生在客戶端和服務端之間,也發生在服務端內部的持久化的操作中。
訊息集(message set)
為了避免這些問題,Kafka建立了“訊息集(message set)”的概念,將訊息組織到一起,作為處理的單位。以訊息集為單位處理訊息,比以單個的訊息為單位處理,會提升不少效能。Producer把訊息集一塊傳送給服務端,而不是一條條的傳送;服務端把訊息集一次性的追加到日誌檔案中,這樣減少了瑣碎的I/O操作。consumer也可以一次性的請求一個訊息集。
另外一個性能優化是在位元組拷貝方面。在低負載的情況下這不是問題,但是在高負載的情況下它的影響還是很大的。為了避免這個問題,Kafka使用了標準的二進位制訊息格式,這個格式可以在producer,broker和producer之間共享而無需做任何改動。
zero copy
Broker維護的訊息日誌僅僅是一些目錄檔案,訊息集以固定隊的格式寫入到日誌檔案中,這個格式producer和consumer是共享的,這使得Kafka可以一個很重要的點進行優化:訊息在網路上的傳遞。現代的unix作業系統提供了高效能的將資料從頁面快取傳送到socket的系統函式,在linux中,這個函式是sendfile.
為了更好的理解sendfile的好處,我們先來看下一般將資料從檔案傳送到socket的資料流向:

  1. 作業系統把資料從檔案拷貝核心中的頁快取中
  2. 應用程式從頁快取從把資料拷貝自己的記憶體快取中
  3. 應用程式將資料寫入到核心中socket快取中
  4. 作業系統把資料從socket快取中拷貝到網絡卡介面快取,從這裡傳送到網路上。


這顯然是低效率的,有4次拷貝和2次系統呼叫。Sendfile通過直接將資料從頁面快取傳送網絡卡介面快取,避免了重複拷貝,大大的優化了效能。
在一個多consumers的場景裡,資料僅僅被拷貝到頁面快取一次而不是每次消費訊息的時候都重複的進行拷貝。這使得訊息以近乎網路頻寬的速率傳送出去。這樣在磁碟層面你幾乎看不到任何的讀操作,因為資料都是從頁面快取中直接傳送到網路上去了。
這篇文章詳細介紹了sendfile和zero-copy技術在Java方面的應用。
資料壓縮
很多時候,效能的瓶頸並非CPU或者硬碟而是網路頻寬,對於需要在資料中心之間傳送大量資料的應用更是如此。當然使用者可以在沒有Kafka支援的情況下各自壓縮自己的訊息,但是這將導致較低的壓縮率,因為相比於將訊息單獨壓縮,將大量檔案壓縮在一起才能起到最好的壓縮效果。
Kafka採用了端到端的壓縮:因為有“訊息集”的概念,客戶端的訊息可以一起被壓縮後送到服務端,並以壓縮後的格式寫入日誌檔案,以壓縮的格式傳送到consumer,訊息從producer發出到consumer拿到都被是壓縮的,只有在consumer使用的時候才被解壓縮,所以叫做“端到端的壓縮”。
Kafka支援GZIP和Snappy壓縮協議。更詳細的內容可以檢視這裡

Kafka Producer

訊息傳送

producer直接將資料傳送到broker的leader(主節點),不需要在多個節點進行分發。為了幫助producer做到這點,所有的Kafka節點都可以及時的告知:哪些節點是活動的,目標topic目標分割槽的leader在哪。這樣producer就可以直接將訊息傳送到目的地了。

客戶端控制訊息將被分發到哪個分割槽。可以通過負載均衡隨機的選擇,或者使用分割槽函式。Kafka允許使用者實現分割槽函式,指定分割槽的key,將訊息hash到不同的分割槽上(當然有需要的話,也可以覆蓋這個分割槽函式自己實現邏輯).比如如果你指定的key是user id,那麼同一個使用者傳送的訊息都被髮送到同一個分割槽上。經過分割槽之後,consumer就可以有目的的消費某個分割槽的訊息。

非同步傳送

批量傳送可以很有效的提高發送效率。Kafka producer的非同步傳送模式允許進行批量傳送,先將訊息快取在記憶體中,然後一次請求批量傳送出去。這個策略可以配置的,比如可以指定快取的訊息達到某個量的時候就發出去,或者快取了固定的時間後就傳送出去(比如100條訊息就傳送,或者每5秒傳送一次)。這種策略將大大減少服務端的I/O次數。

既然快取是在producer端進行的,那麼當producer崩潰時,這些訊息就會丟失。Kafka0.8.1的非同步傳送模式還不支援回撥,就不能在傳送出錯時進行處理。Kafka 0.9可能會增加這樣的回撥函式。見Proposed Producer API.

Kafka Consumer

Kafa consumer消費訊息時,向broker發出"fetch"請求去消費特定分割槽的訊息。consumer指定訊息在日誌中的偏移量(offset),就可以消費從這個位置開始的訊息。customer擁有了offset的控制權,可以向後回滾去重新消費之前的訊息,這是很有意義的。

推還是拉?

Kafka最初考慮的問題是,customer應該從brokes拉取訊息還是brokers將訊息推送到consumer,也就是pull還push。在這方面,Kafka遵循了一種大部分訊息系統共同的傳統的設計:producer將訊息推送到broker,consumer從broker拉取訊息。
一些訊息系統比如Scribe和Apache Flume採用了push模式,將訊息推送到下游的consumer。這樣做有好處也有壞處:由broker決定訊息推送的速率,對於不同消費速率的consumer就不太好處理了。訊息系統都致力於讓consumer以最大的速率最快速的消費訊息,但不幸的是,push模式下,當broker推送的速率遠大於consumer消費的速率時,consumer恐怕就要崩潰了。最終Kafka還是選取了傳統的pull模式。
Pull模式的另外一個好處是consumer可以自主決定是否批量的從broker拉取資料。Push模式必須在不知道下游consumer消費能力和消費策略的情況下決定是立即推送每條訊息還是快取之後批量推送。如果為了避免consumer崩潰而採用較低的推送速率,將可能導致一次只推送較少的訊息而造成浪費。Pull模式下,consumer就可以根據自己的消費能力去決定這些策略。
Pull有個缺點是,如果broker沒有可供消費的訊息,將導致consumer不斷在迴圈中輪詢,直到新訊息到t達。為了避免這點,Kafka有個引數可以讓consumer阻塞知道新訊息到達(當然也可以阻塞知道訊息的數量達到某個特定的量這樣就可以批量傳送)。

消費狀態跟蹤

對消費訊息狀態的記錄也是很重要的。
大部分訊息系統在broker端的維護訊息被消費的記錄:一個訊息被分發到consumer後broker就馬上進行標記或者等待customer的通知後進行標記。這樣也可以在訊息在消費後立馬就刪除以減少空間佔用。
但是這樣會不會有什麼問題呢?如果一條訊息傳送出去之後就立即被標記為消費過的,一旦consumer處理訊息時失敗了(比如程式崩潰)訊息就丟失了。為了解決這個問題,很多訊息系統提供了另外一個個功能:當訊息被髮送出去之後僅僅被標記為已傳送狀態,當接到consumer已經消費成功的通知後才標記為已被消費的狀態。這雖然解決了訊息丟失的問題,但產生了新問題,首先如果consumer處理訊息成功了但是向broker傳送響應時失敗了,這條訊息將被消費兩次。第二個問題時,broker必須維護每條訊息的狀態,並且每次都要先鎖住訊息然後更改狀態然後釋放鎖。這樣麻煩又來了,且不說要維護大量的狀態資料,比如如果訊息傳送出去但沒有收到消費成功的通知,這條訊息將一直處於被鎖定的狀態,
Kafka採用了不同的策略。Topic被分成了若干分割槽,每個分割槽在同一時間只被一個consumer消費。這意味著每個分割槽被消費的訊息在日誌中的位置僅僅是一個簡單的整數:offset。這樣就很容易標記每個分割槽消費狀態就很容易了,僅僅需要一個整數而已。這樣消費狀態的跟蹤就很簡單了。
這帶來了另外一個好處:consumer可以把offset調成一個較老的值,去重新消費老的訊息。這對傳統的訊息系統來說看起來有些不可思議,但確實是非常有用的,誰規定了一條訊息只能被消費一次呢?consumer發現解析資料的程式有bug,在修改bug後再來解析一次訊息,看起來是很合理的額呀!

離線處理訊息

高階的資料持久化允許consumer每個隔一段時間批量的將資料載入到線下系統中比如Hadoop或者資料倉庫。這種情況下,Hadoop可以將載入任務分拆,拆成每個broker或每個topic或每個分割槽一個載入任務。Hadoop具有任務管理功能,當一個任務失敗了就可以重啟而不用擔心資料被重新載入,只要從上次載入的位置繼續載入訊息就可以了。

Kafka允許topic的分割槽擁有若干副本,這個數量是可以配置的,你可以為每個topci配置副本的數量。Kafka會自動在每個個副本上備份資料,所以當一個節點down掉時資料依然是可用的。

Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當於只有一份資料。

建立副本的單位是topic的分割槽,每個分割槽都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分割槽的數量都比broker的數量多的多,各分割槽的leader均勻的分佈在brokers中。所有的followers都複製leader的日誌,日誌中的訊息和順序都和leader中的一致。flowers向普通的consumer那樣從leader那裡拉取訊息並儲存在自己的日誌檔案中。
許多分散式的訊息系統自動的處理失敗的請求,它們對一個節點是否
著(alive)”有著清晰的定義。Kafka判斷一個節點是否活著有兩個條件:

  1. 節點必須可以維護和ZooKeeper的連線,Zookeeper通過心跳機制檢查每個節點的連線。
  2. 如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。

符合以上條件的節點準確的說應該是“同步中的(in sync)”,而不是模糊的說是“活著的”或是“失敗的”。Leader會追蹤所有“同步中”的節點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至於延時多久算是“太久”,是由引數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由引數replica.lag.time.max.ms決定的。 
只有當訊息被所有的副本加入到日誌中時,才算是“committed”,只有committed的訊息才會傳送給consumer,這樣就不用擔心一旦leader down掉了訊息會丟失。Producer也可以選擇是否等待訊息被提交的通知,這個是由引數request.required.acks決定的。

Kafka保證只要有一個“同步中”的節點,“committed”的訊息就不會丟失。

Leader的選擇

Kafka的核心是日誌檔案,日誌檔案在叢集中的同步是分散式資料系統最基礎的要素。

如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個訊息被提交了,但是leader down掉了,新選出的leader必須可以提供這條訊息。大部分的分散式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka並不是使用這種方法。

Kafaka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條訊息必須被這個集合中的每個節點讀取並追加到日誌中了,才回通知外部這個訊息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失訊息並正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。

一個邪惡的想法:如果所有節點都down掉了怎麼辦?Kafka對於資料不會丟失的保證,是基於至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。
實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇:

  1. 等待ISR中的任何一個節點恢復並擔任leader。
  2. 選擇所有節點中(不只是ISR)第一個恢復的節點作為leader.

這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者資料都是了,那叢集就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的資料就會被作為線上資料,有可能和真實的資料有所出入,因為有些資料它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。

這種窘境不只Kafka會遇到,幾乎所有的分散式資料系統都會遇到。

副本管理

以上僅僅以一個topic一個分割槽為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分割槽.Kafka儘量的使所有分割槽均勻的分佈到叢集所有的節點上而不是集中在某些節點上,另外主從關係也儘量均衡這樣每個幾點都會擔任一定比例的分割槽的leader.

優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點作為“controller”,當發現有節點down掉的時候它負責在游泳分割槽的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分割槽節點的主從關係。如果controller down掉了,活著的節點中的一個會備切換為新的controller.

Kafka Producer APIs

Procuder API有兩種:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它們都實現了同一個介面:

  1. class Producer {
  2. /* 將訊息傳送到指定分割槽 */
  3. publicvoid send(kafka.javaapi.producer.ProducerData<K,V> producerData);
  4. /* 批量傳送一批訊息 */
  5. publicvoid send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
  6. /* 關閉producer */
  7. publicvoid close();
  8. }

Producer API提供了以下功能:

  1. 可以將多個訊息快取到本地佇列裡,然後非同步的批量傳送到broker,可以通過引數producer.type=async做到。快取的大小可以通過一些引數指定:queue.timebatch.size。一個後臺執行緒((kafka.producer.async.ProducerSendThread)從佇列中取出資料並讓kafka.producer.EventHandler將訊息傳送到broker,也可以通過引數event.handler定製handler,在producer端處理資料的不同的階段註冊處理器,比如可以對這一過程進行日誌追蹤,或進行一些監控。只需實現kafka.producer.async.CallbackHandler介面,並在callback.handler中配置。
  2. 自己編寫Encoder來序列化訊息,只需實現下面這個介面。預設的Encoder是kafka.serializer.DefaultEncoder
    1. interface Encoder<T> {
    2. public Message toMessage(T data);
    3. }
  3. 提供了基於Zookeeper的broker自動感知能力,可以通過引數zk.connect實現。如果不使用Zookeeper,也可以使用broker.list引數指定一個靜態的brokers列表,這樣訊息將被隨機的傳送到一個broker上,一旦選中的broker失敗了,訊息傳送也就失敗了。
  4. 通過分割槽函式kafka.producer.Partitioner類對訊息分割槽
    1. interface Partitioner<T> {
    2. int partition(T key, int numPartitions);
    3. }
    分割槽函式有兩個引數:key和可用的分割槽數量,從分割槽列表中選擇一個分割槽並返回id。預設的分割槽策略是hash(key)%numPartitions.如果key是null,就隨機的選擇一個。可以通過引數partitioner.class定製分割槽函式。

KafKa Consumer APIs

Consumer API有兩個級別。低級別的和一個指定的broker保持連線,並在接收完訊息後關閉連線,這個級別是無狀態的,每次讀取訊息都帶著offset。

高級別的API隱藏了和brokers連線的細節,在不必關心服務端架構的情況下和服務端通訊。還可以自己維護消費狀態,並可以通過一些條件指定訂閱特定的topic,比如白名單黑名單或者正則表示式。

低級別的API

  1. class SimpleConsumer {
  2. /*向一個broker傳送讀取請求並得到訊息集 */
  3. public ByteBufferMessageSet fetch(FetchRequest request);
  4. /*向一個broker傳送讀取請求並得到一個相應集 */
  5. public MultiFetchResponse multifetch(List<FetchRequest> fetches);
  6. /**
  7. * 得到指定時間之前的offsets
  8. * 返回值是offsets列表,以倒序排序
  9. * @param time: 時間,毫秒,
  10. * 如果指定為OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
  11. * 如果指定為OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
  12. */
  13. publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
  14. }

低級別的API是高級別API實現的基礎,也是為了一些對維持消費狀態有特殊需求的場景,比如Hadoop consumer這樣的離線consumer。

高級別的API

  1. /* 建立連線 */
  2. ConsumerConnector connector = Consumer.create(consumerConfig);
  3. interface ConsumerConnector {
  4. /**
  5. * 這個方法可以得到一個流的列表,每個流都是MessageAndMetadata的迭代,通過MessageAndMetadata可以拿到訊息和其他的元資料(目前之後topic)
  6. * Input: a map of <topic, #streams>
  7. * Output: a map of <topic, list of message streams>
  8. */
  9. public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
  10. /**
  11. * 你也可以得到一個流的列表,它包含了符合TopicFiler的訊息的迭代,
  12. * 一個TopicFilter是一個封裝了白名單或黑名單的正則表示式。
  13. */
  14. public List<KafkaStream> createMessageStreamsByFilter(
  15. TopicFilter topicFilter, int numStreams);
  16. /* 提交目前消費到的offset */
  17. public commitOffsets()
  18. /* 關閉連線 */
  19. public shutdown()
  20. }

這個API圍繞著由KafkaStream實現的迭代器展開,每個流代表一系列從一個或多個分割槽多和broker上匯聚來的訊息,每個流由一個執行緒處理,所以客戶端可以在建立的時候通過引數指定想要幾個流。一個流是多個分割槽多個broker的合併,但是每個分割槽的訊息只會流向一個流。

每呼叫一次createMessageStreams都會將consumer註冊到topic上,這樣consumer和brokers之間的負載均衡就會進行調整。API鼓勵每次呼叫建立更多的topic流以減少這種調整。createMessageStreamsByFilter方法註冊監聽可以感知新的符合filter的tipic。

訊息格式

訊息由一個固定長度的頭部和可變長度的位元組陣列組成。頭