Kafka #2: Message Queues




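  • Message protocol
  • Message subscription
  • Message storage
  • Message delivery
  • Message ordering
  • Message cleanup
  • Message priority
  • Message filtering
  • Message backlog
  • Transactional messages?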


## Request Header
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|                       REQUEST_LENGTH                          |
|         REQUEST_TYPE          |        TOPIC_LENGTH           |
/                                                               /
/                    TOPIC (variable length)                    /
|                           PARTITION                           |
## Multi-Request Header
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|                       REQUEST_LENGTH                          |
|         REQUEST_TYPE          |    TOPICPARTITION_COUNT       |
## Message
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|                             LENGTH                            |
|     MAGIC       |  COMPRESSION  |           CHECKSUM          |
|      CHECKSUM (cont.)           |           PAYLOAD           /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+                             /
/                         PAYLOAD (variable length)             /

## LENGTH = int32 // Length in bytes of entire message (excluding this field)
## MAGIC = int8   // 0 = COMPRESSION attribute byte does not exist (v0.6 and below)
                  // 1 = COMPRESSION attribute byte exists (v0.7 and above)
## COMPRESSION = int8 // 0 = none; 1 = gzip; 2 = snappy;
                      // Only exists at all if MAGIC == 1
## CHECKSUM = int32   // CRC32 checksum of the PAYLOAD
## PAYLOAD = Bytes[]  // Message content
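
The Message layout above can be exercised with a few lines of code. This is only a sketch, assuming MAGIC = 1 (so the COMPRESSION byte is present) and big-endian integers; `MessageCodec`, `encode` and `decode` are hypothetical names for illustration, not Kafka classes.

```scala
import java.nio.ByteBuffer
import java.util.zip.CRC32

object MessageCodec {
  private val Magic: Byte = 1
  private val HeaderSize = 1 + 1 + 4 // MAGIC + COMPRESSION + CHECKSUM

  // LENGTH(4) | MAGIC(1) | COMPRESSION(1) | CHECKSUM(4) | PAYLOAD(n)
  def encode(payload: Array[Byte], compression: Byte = 0): Array[Byte] = {
    val crc = new CRC32()
    crc.update(payload)                            // CRC32 of the payload only
    val bodyLength = HeaderSize + payload.length   // LENGTH excludes itself
    val buf = ByteBuffer.allocate(4 + bodyLength)  // ByteBuffer is big-endian by default
    buf.putInt(bodyLength)
    buf.put(Magic)
    buf.put(compression)
    buf.putInt(crc.getValue().toInt)               // keep the low 32 bits
    buf.put(payload)
    buf.array()
  }

  // Returns the payload, or throws if the magic byte or checksum is wrong.
  def decode(bytes: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.wrap(bytes)
    val bodyLength = buf.getInt()
    val magic = buf.get()
    require(magic == Magic, s"unsupported magic $magic")
    buf.get()                                      // compression codec, ignored in this sketch
    val expectedCrc = buf.getInt()
    val payload = new Array[Byte](bodyLength - HeaderSize)
    buf.get(payload)
    val crc = new CRC32()
    crc.update(payload)
    require(crc.getValue().toInt == expectedCrc, "checksum mismatch")
    payload
  }
}
```

Encoding a payload and decoding the result should return the original bytes; flipping any payload byte makes `decode` fail on the checksum.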
## Produce Request
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/                         REQUEST HEADER                        /
/                                                               /
|                         MESSAGES_LENGTH                       |
/                                                               /
/                            MESSAGES                           /
## Multi-Produce Request
 /                   MULTI-REQUEST HEADER                        /
 |                   TOPIC-PARTITION/MESSAGES (n times)           |
 ## Per Topic-Partition (repeated n times)
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 |        TOPIC_LENGTH           |  TOPIC (variable length)      /
 |                           PARTITION                           |
 |                         MESSAGES_LENGTH                       |
 /                            MESSAGES                           /
## Fetch Request
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/                         REQUEST HEADER                        /
/                                                               /
|                             OFFSET                            |
|                                                               |
|                            MAX_SIZE                           |

## OFFSET   = int64 // Offset in topic and partition to start from
## Multi-Fetch Request
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 /                   MULTI-REQUEST HEADER                        /
 |            TOPIC-PARTITION-FETCH-REQUEST (n times)            |

 ## Per Topic-Partition-Fetch-Request (repeated n times)
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 |        TOPIC_LENGTH           |  TOPIC (variable length)      /
 |                           PARTITION                           |
 |                         OFFSET                                |
 /                            MAX_SIZE                           /


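Because consumers pull, the broker does not need to maintain subscription state. Kafka stores the subscriptions in ZooKeeper (ZK) instead; see "Consumer registration" in the official documentation.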

## /consumers/[groupId]/ids/[consumerId] -> 

{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "pattern", "type": "string", "doc": "can be of static, white_list or black_list"},
      {"name": "subscription", "type" : {"type": "map", "values": {"type": "int"},
                                         "doc": "a map from a topic or a wildcard pattern to the number of streams"}      }    ]
}

A static subscription:
{
  "version": 1,
  "pattern": "static",
  "subscription": {"topic1": 1, "topic2": 2}
}

A whitelist subscription:
{
  "version": 1,
  "pattern": "white_list",
  "subscription": {"abc": 1}
}

A blacklist subscription:
{
  "version": 1,
  "pattern": "black_list",
  "subscription": {"abc": 1}
}
## /consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

A recent official document explains why consumer offsets (the /consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset) node above) are being migrated from ZK into Kafka itself:

ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count).


Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets. Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although we provide an in-memory offsets cache for faster access). i.e., offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory look ups.
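
To make the idea in that quote concrete, here is a toy model: commits are appends to a log (standing in for the replicated offsets topic) and fetches are lookups in an in-memory cache that can be rebuilt by replaying the log. All names (`OffsetStoreSketch`, `OffsetKey`, `ToyOffsetStore`) are made up for this sketch; nothing here is Kafka's actual offset manager.

```scala
import scala.collection.mutable

object OffsetStoreSketch {
  // Same coordinates as the ZK path: group, topic, partition.
  final case class OffsetKey(group: String, topic: String, partition: Int)

  class ToyOffsetStore {
    // Stands in for the durable, replicated offsets topic.
    private val log = mutable.ArrayBuffer.empty[(OffsetKey, Long)]
    // In-memory cache that serves offset fetches.
    private val cache = mutable.Map.empty[OffsetKey, Long]

    // A commit is just an append (a cheap "produce") plus a cache update.
    def commit(key: OffsetKey, offset: Long): Unit = {
      log += (key -> offset)
      cache(key) = offset
    }

    // A fetch is a memory lookup.
    def fetch(key: OffsetKey): Option[Long] = cache.get(key)

    // After a restart the cache can be rebuilt by replaying the log;
    // the last entry per key wins.
    def rebuildCache(): Unit = {
      cache.clear()
      log.foreach { case (k, o) => cache(k) = o }
    }
  }
}
```

The point of the design is that writes become sequential appends, which Kafka is good at, instead of per-offset ZK writes.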


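As the QuickStart already showed, each topic is physically laid out as several partition directories. Inside a partition directory are the message files, called log files, and each log file is named after the offset of the first message it stores. Every message is one log entry, with the following format: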

## On-disk format of a message

message length : 4 bytes (value: 1+4+n) 
magic value    : 1 byte
crc            : 4 bytes
payload        : n bytes


 * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
 * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each 
 * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
 * any previous segment.
 * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. 
 * @param log The message set containing log entries
 * @param index The offset index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param time The time instance
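
Since every segment is named after its base offset, finding the segment that holds a given offset is a floor lookup over the base offsets. A minimal sketch, assuming file names are the base offset zero-padded to 20 digits; `SegmentLookup`, `logFileName` and `segmentFor` are hypothetical helpers, not Kafka APIs.

```scala
import java.util.TreeMap

object SegmentLookup {
  // Assumed naming convention: base offset zero-padded to 20 digits.
  def logFileName(baseOffset: Long): String = f"$baseOffset%020d.log"

  // The segment containing `offset` is the one with the greatest
  // base offset that is <= offset (a floor lookup).
  def segmentFor(baseOffsets: Seq[Long], offset: Long): Option[String] = {
    val segments = new TreeMap[java.lang.Long, String]()
    baseOffsets.foreach(b => segments.put(b, logFileName(b)))
    Option(segments.floorEntry(offset)).map(_.getValue)
  }
}
```

For base offsets 0, 1000 and 2000, offset 1500 resolves to the segment whose base offset is 1000; an offset below every base offset resolves to nothing. Within the segment, the index file then narrows the search to a physical file position.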


pull or push

The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.


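  • Consumption rate control. With push, the broker controls the transfer rate; when the consumer cannot keep up with the rate at which the broker sends, the consumer may be overwhelmed (a denial of service attack, in essence). With pull, the consumer can simply fall behind and catch up once it has capacity again.
  • Batch size control. With push, the broker must either send each message on its own or accumulate a certain amount and send it in one go. The former is wasteful; the latter adds latency and complicates the broker's design. With pull, the consumer decides how much data to fetch per request based on its own situation.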

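Pull raises the question of polling efficiency. How does Kafka handle it? RTFSC: start from the official Consumer Example and follow the code. Fetch requests are sent from ConsumerFetcherThread, and the class hierarchy is ConsumerFetcherThread extends AbstractFetcherThread extends ShutdownableThread extends java.lang.Thread. Take a look at run() and doWork():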

  override def run(): Unit = {
    info("Starting ")
    try {
      // simply keeps sending requests in a while loop
      while (isRunning.get()) {
        doWork()
      }
    } catch{
      case e: Throwable =>
        if (isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }

  override def doWork() {
    inLock(partitionMapLock) {

      // partitionMap is first populated by #addPartitions;
      // it does not become empty unless #removePartitions is called
      if (partitionMap.isEmpty)
        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)

      partitionMap.foreach {
        case((topicAndPartition, offset)) =>
          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                           offset, fetchSize)
      }
    }

    val fetchRequest = fetchRequestBuilder.build()
    if (!fetchRequest.requestInfo.isEmpty)

      // this is where messages are actually requested from the broker
      processFetchRequest(fetchRequest)
  }

With this design, isn't polling terribly inefficient when the broker has no new messages? Don't panic: Kafka does implement a long-polling mechanism.

The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a "long poll" waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).


case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
                                        override val correlationId: Int = FetchRequest.DefaultCorrelationId,
                                        clientId: String = ConsumerConfig.DefaultClientId,
                                        replicaId: Int = Request.OrdinaryConsumerId,
                                        maxWait: Int = FetchRequest.DefaultMaxWait,
                                        minBytes: Int = FetchRequest.DefaultMinBytes,
                                        requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
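
The effect of maxWait and minBytes can be illustrated with a toy broker-side log: a fetch blocks until at least minBytes are available or maxWait has elapsed, then returns whatever is there. This is a sketch using a plain lock and condition; `ToyLog` and its methods are invented for illustration and say nothing about how the real broker parks delayed requests.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

class ToyLog {
  private val lock = new ReentrantLock()
  private val dataArrived = lock.newCondition()
  private var availableBytes = 0

  // Producer side: make more bytes available and wake up waiting fetches.
  def append(bytes: Int): Unit = {
    lock.lock()
    try {
      availableBytes += bytes
      dataArrived.signalAll()
    } finally lock.unlock()
  }

  // Consumer side: long poll. Returns the number of bytes served,
  // which may be less than minBytes (even 0) if maxWaitMs runs out.
  def fetch(minBytes: Int, maxWaitMs: Long): Int = {
    lock.lock()
    try {
      var remainingNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitMs)
      while (availableBytes < minBytes && remainingNanos > 0)
        remainingNanos = dataArrived.awaitNanos(remainingNanos)
      val served = availableBytes
      availableBytes = 0
      served
    } finally lock.unlock()
  }
}
```

With this shape, an idle consumer makes one request per maxWait instead of spinning, and a busy one gets data as soon as minBytes are ready.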

load balance

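One more very important point: "Each broker partition is consumed by a single consumer within a given consumer group". In other words, within a group every partition has exactly one designated consumer, called its owner. If there are more consumers than partitions, some consumers will not receive any messages. So, clearly, Kafka cannot multicast a partition's messages to several consumers of the same group (delivering the same messages to more than one consumer requires separate consumer groups). The owner is recorded in ZK at /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] -> consumer_node_id. So how are owners assigned?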

1. For each topic T that Ci subscribes to 
2.   let PT be all partitions producing topic T
3.   let CG be all consumers in the same group as Ci that consume topic T
4.   sort PT (so partitions on the same broker are clustered together)
5.   sort CG
6.   let i be the index position of Ci in CG and let N = size(PT)/size(CG)
7.   assign partitions from i*N to (i+1)*N - 1 to consumer Ci // i*N to i*N+N-1
8.   remove current entries owned by Ci from the partition owner registry
9.   add newly assigned partitions to the partition owner registry
     (we may need to re-try this until the original partition owner releases its ownership)


let PT=[0, 1, 2, 3, 4, 5, 6, 7];
let CG=[0, 1, 2, 3];
then N=PT.len/CG.len=2;
then C0=>[0,1], C1=>[2,3], C2=>[4,5], C3=>[6,7];
          for (consumerThreadId <- consumerThreadIdSet) {
            val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
            assert(myConsumerPosition >= 0)
            val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
            val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

             *   Range-partition the sorted partitions to consumers for better locality.
             *  The first few consumers pick up an extra partition, if any.
            if (nParts <= 0)
              warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
            else {
              for (i <- startPart until startPart + nParts) {
                val partition = curPartitions(i)
                info(consumerThreadId + " attempting to claim partition " + partition)

                // each partition is consumed by exactly one thread, so there is no concurrency problem
                addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)

                // record the partition ownership decision
                partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
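
The range assignment can be tried out in isolation. The sketch below reuses the arithmetic of the excerpt above (nPartsPerConsumer, nConsumersWithExtraPart) on plain collections; `RangeAssignment.assign` is a hypothetical helper, and partitions and consumers are reduced to integers and strings.

```scala
object RangeAssignment {
  // Returns, for every consumer, the contiguous range of partitions it owns.
  def assign(partitions: Seq[Int], consumers: Seq[String]): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "need at least one consumer")
    val sortedPartitions = partitions.sorted
    val sortedConsumers = consumers.sorted
    val nPartsPerConsumer = sortedPartitions.size / sortedConsumers.size
    // The first few consumers pick up one extra partition each, if any are left over.
    val nConsumersWithExtraPart = sortedPartitions.size % sortedConsumers.size

    sortedConsumers.zipWithIndex.map { case (consumer, position) =>
      val startPart = nPartsPerConsumer * position + position.min(nConsumersWithExtraPart)
      val nParts = nPartsPerConsumer + (if (position + 1 > nConsumersWithExtraPart) 0 else 1)
      consumer -> sortedPartitions.slice(startPart, startPart + nParts)
    }.toMap
  }
}
```

With 8 partitions and 4 consumers this reproduces the C0=>[0,1] ... C3=>[6,7] example. With 10 partitions and 3 consumers the first consumer gets 4 partitions and the others 3 each, and with 2 partitions and 3 consumers the last consumer gets none, which is the "no broker partitions consumed" warning case.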


  protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
                                            topicCountMap: Map[String,  Int]) = {
    val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
    for ((topic, nConsumers) <- topicCountMap) {
      val consumerSet = new mutable.HashSet[String]
      assert(nConsumers >= 1)
      for (i <- 0 until nConsumers)
        consumerSet += consumerIdString + "-" + i
      consumerThreadIdsPerTopicMap.put(topic, consumerSet)

Going one step further, a consumer pulls messages only from the leader broker of each partition assigned to it. The code is in ConsumerFetcherManager:

  def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
    leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")

    inLock(lock) {
      partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
      this.cluster = cluster
      noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
  private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
    // thread responsible for adding the fetcher to the right broker when leader is available
    override def doWork() {
      val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
      try {
        while (noLeaderPartitionSet.isEmpty) {
          trace("No partition for leader election.")

        trace("Partitions without leader %s".format(noLeaderPartitionSet))
        val brokers = getAllBrokersInCluster(zkClient)
        val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
        if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
        topicsMetadata.foreach { tmd =>
          val topic = tmd.topic
          tmd.partitionsMetadata.foreach { pmd =>
            val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
            if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
              val leaderBroker = pmd.leader.get
              leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
              noLeaderPartitionSet -= topicAndPartition
      } catch {
        case t: Throwable => {
            if (!isRunning.get())
              throw t /* If this thread is stopped, propagate this exception to kill the thread. */
              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
      } finally {

      try {
          case (topicAndPartition, broker) =>
            topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
      } catch {
        case t: Throwable => {
          if (!isRunning.get())
            throw t /* If this thread is stopped, propagate this exception to kill the thread. */
          else {
            warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
            noLeaderPartitionSet ++= leaderForPartitionsMap.keySet


It is not just consumers: producers, too, send messages only to the partition's leader broker. The code is in DefaultEventHandler#partitionAndCollate:

  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
    val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
    try {
      for (message <- messages) {
        val topicPartitionsList = getPartitionListForTopic(message)

        // find the partition for this message
        val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)

        // find the leader for that partition
        val brokerPartition = topicPartitionsList(partitionIndex)

        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
        val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

        var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
        ret.get(leaderBrokerId) match {
          case Some(element) =>
            dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
          case None =>
            dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
            ret.put(leaderBrokerId, dataPerBroker)

        val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
        var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
        dataPerBroker.get(topicAndPartition) match {
          case Some(element) =>
            dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
          case None =>
            dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
            dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
    }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
      case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
      case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None


   * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
   * the value of partition is not between 0 and numPartitions-1
   * @param topic The topic
   * @param key the partition key
   * @param topicPartitionList the list of available partitions
   * @return the partition id
  private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size
    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) { // messages without a key are supported
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None =>
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else // use the Partitioner
        partitioner.partition(key, numPartitions)
    if(partition < 0 || partition >= numPartitions)
      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
    partition
  }
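
The two branches of getPartition can be modelled without any Kafka classes. In this sketch a partition list is just a sequence of optional leader ids, and the keyed branch uses a hash-modulo partitioner, which is an assumption made for illustration; `PartitionChooser` is a hypothetical name.

```scala
import scala.collection.mutable
import scala.util.Random

class PartitionChooser(random: Random = new Random) {
  // Per-topic cache of the partition picked for key-less messages.
  private val stickyPartition = mutable.HashMap.empty[String, Int]

  // leaders(i) is the leader broker id of partition i, or None if it has no leader.
  def choose(topic: String, key: Any, leaders: IndexedSeq[Option[Int]]): Int = {
    require(leaders.nonEmpty, s"Topic $topic doesn't exist")
    if (key == null) {
      // No key: pick a random partition that currently has a leader,
      // then keep using it for this topic.
      stickyPartition.getOrElseUpdate(topic, {
        val available = leaders.indices.filter(leaders(_).isDefined)
        require(available.nonEmpty, s"No leader for any partition in topic $topic")
        available(random.nextInt(available.size))
      })
    } else {
      // Keyed: same key, same partition. Masking the sign bit keeps the result non-negative.
      (key.hashCode & 0x7fffffff) % leaders.size
    }
  }
}
```

Note the asymmetry: the key-less branch only considers partitions that have a leader, while the keyed branch may land on a leaderless partition and fail later at send time, as the comments in the excerpt point out.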


 * A topic, key, and value.
 * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")
  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
  // an explicit partition key wins; otherwise fall back to the message key
  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null
  }
  def hasKey = key != null
}


Clearly there are multiple possible message delivery guarantees that could be provided:

  • At most once—Messages may be lost but are never redelivered.
  • At least once—Messages are never lost but may be redelivered.
  • Exactly once—this is what people actually want, each message is delivered once and only once.

It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message.


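Setting extreme cases aside, exactly-once on the sending side is relatively easy to deal with. Kafka provides the request.required.acks and producer.type settings so that users can choose flexibly according to their application. Once the broker's ack has been received, the message is committed; as long as the in-sync replicas (ISR) of that partition have not all failed, the message stays visible to consumers. request.required.acks has the following meanings: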

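  • 0: do not wait for an ack from the broker. Low latency, but reliability cannot be guaranteed.
  • 1: an ack from the leader is enough (does the message have to be persisted to disk first?).
  • -1: acks from all in-sync replicas (ISR) are required.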
   * Common functionality for the public send methods
  private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = {
    lock synchronized {

      var response: Receive = null
      try {
          // blocking
          response = blockingChannel.receive()
          trace("Skipping reading response")
      } catch {
        case e: java.io.IOException =>
          // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
          throw e
        case e: Throwable => throw e

producer.type actually has nothing to do with delivery semantics; it exists purely for performance reasons. "This parameter specifies whether the messages are sent asynchronously in a background thread." It is implemented in Producer:

   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   * @param messages the producer data object that encapsulates the topic, key and message data
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
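
On the configuration side, the two settings discussed here end up as plain producer properties. The helper below is only a sketch: the keys request.required.acks and producer.type come from the text, the values "sync" and "async" are the ones the 0.8 producer accepts, and `ProducerSettings.build` itself is a hypothetical convenience that does not talk to Kafka. A real producer also needs connection and serializer settings, which are left out.

```scala
import java.util.Properties

object ProducerSettings {
  // acks: 0 = no ack, 1 = leader ack, -1 = ack from all in-sync replicas.
  // This sketch only accepts the three values listed in the text.
  def build(acks: Int, async: Boolean): Properties = {
    require(Set(-1, 0, 1).contains(acks), "acks must be -1, 0 or 1")
    val props = new Properties()
    props.setProperty("request.required.acks", acks.toString)
    props.setProperty("producer.type", if (async) "async" else "sync")
    props
  }
}
```

A latency-sensitive, loss-tolerant application might use `build(0, async = true)`, while one that must not lose committed messages would lean towards `build(-1, async = false)`.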

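Now look at the problem from the consumer's side. In essence this is a transaction problem: there are two steps, consuming the message and recording the consumption progress. Transactions are presumably familiar to everyone, so I won't go into them; let's see how Kafka handles it. Kafka again offers a fairly flexible mechanism: you can call ConsumerConnector#commitOffsets manually, or set auto.commit.enable and auto.commit.interval.ms to commit automatically. But as the official ConsumerGroupExample says, with auto commit messages may be replayed, i.e. at-least-once delivery: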

The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.
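
Why the order of the two steps decides the semantics is easy to see in a small simulation. The consumer below crashes once, exactly between its two steps at offset `crashAt`, and then restarts from the last committed offset. `DeliverySemantics.simulate` is a toy model written for this post, not Kafka code.

```scala
object DeliverySemantics {
  // Returns every message the application processed, in order,
  // across the crashed run and the restarted run.
  def simulate(log: Vector[String], crashAt: Int, commitFirst: Boolean): Vector[String] = {
    val processed = Vector.newBuilder[String]
    var committed = 0
    var offset = 0
    var crashed = false

    // First run: dies between the two steps when it reaches `crashAt`.
    while (offset < log.size && !crashed) {
      if (commitFirst) {
        committed = offset + 1                 // step 1: record progress
        if (offset == crashAt) { crashed = true }
        else { processed += log(offset) }      // step 2: process
      } else {
        processed += log(offset)               // step 1: process
        if (offset == crashAt) { crashed = true }
        else { committed = offset + 1 }        // step 2: record progress
      }
      offset += 1
    }

    // Restart: resume from the last committed offset.
    for (i <- committed until log.size) processed += log(i)
    processed.result()
  }
}
```

For the log a, b, c with a crash at offset 1, processing before committing yields a, b, b, c (b is replayed: at least once), while committing before processing yields a, c (b is lost: at most once). Time-based auto commit behaves like the first variant, because the commit always trails the processing.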


                             +--------------------+
                             |PartitionTopicInfo  |
  +----------------+   poll  | +---------------+  |  enqueue  +-----------------------+
  |ConsumerIterator| --------->| BlockingQueue |  |  <------  | ConsumerFetcherThread |
  +----------------+         | +---------------+  |           +-----------------------+
                             +--------------------+                       ||
                                                                    +-------------+
                                                                    | KafkaServer |
                                                                    +-------------+


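Kafka only provides "a total order over messages within a partition", i.e. partial ordering. When you need a total order over all messages of a topic, you can get it by configuring that topic with a single partition.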








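Message filtering is not supported. It could be implemented by adding a messageTag: the fetch request carries a tag, and the broker filters by that tag before returning messages to the consumer.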
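
The messageTag idea could look roughly like this on the broker side. Everything in the sketch is hypothetical, since Kafka has no such feature: `TaggedMessage`, the `tag` parameter and `TagFilterSketch.fetch` are invented to show the shape of a tag-aware fetch.

```scala
object TagFilterSketch {
  final case class TaggedMessage(offset: Long, tag: String, payload: String)

  // Broker-side handling of a hypothetical fetch request that carries an optional tag.
  // Messages keep their offsets, so the consumer still knows where to continue from.
  def fetch(log: Seq[TaggedMessage], fromOffset: Long, maxMessages: Int,
            tag: Option[String]): Seq[TaggedMessage] =
    log.iterator
      .filter(_.offset >= fromOffset)
      .filter(m => tag.forall(_ == m.tag)) // no tag in the request means no filtering
      .take(maxMessages)
      .toList
}
```

One cost of this design is visible even in the toy version: the broker has to look inside every message, which gives up the cheap "hand over a chunk of the log" behaviour of a plain fetch.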


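Transactional messages are not supported. Implementing them generally requires giving the producer at least two interfaces, precommitMessage and commitMessage (a two-phase commit), and the producer's flow is typically 1. precommitMessage 2. doBiz 3. commitMessage. Kafka only provides the commitMessage interface, so it cannot support this.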
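
The precommitMessage / doBiz / commitMessage flow can be sketched with a toy broker that keeps pre-committed messages invisible until they are committed. `ToyTxBroker` and `ToyTxProducer` are invented for illustration, and `rollbackMessage` is an extra assumption of this sketch that the text does not mention.

```scala
import scala.collection.mutable

class ToyTxBroker {
  private val pending = mutable.Map.empty[Long, String]     // pre-committed, not yet visible
  private val visible = mutable.ArrayBuffer.empty[String]   // what consumers can read
  private var nextId = 0L

  // Phase 1: store the message but keep it hidden from consumers.
  def precommitMessage(payload: String): Long = {
    val id = nextId
    nextId += 1
    pending(id) = payload
    id
  }

  // Phase 2: make a pre-committed message visible.
  def commitMessage(id: Long): Unit =
    pending.remove(id).foreach(payload => visible += payload)

  // Hypothetical counterpart to commitMessage: drop a pre-committed message.
  def rollbackMessage(id: Long): Unit = {
    pending.remove(id)
    ()
  }

  def visibleMessages: List[String] = visible.toList
}

object ToyTxProducer {
  // 1. precommitMessage  2. doBiz  3. commitMessage (or roll back if doBiz fails)
  def send(broker: ToyTxBroker, payload: String)(doBiz: () => Unit): Boolean = {
    val id = broker.precommitMessage(payload)
    try {
      doBiz()
      broker.commitMessage(id)
      true
    } catch {
      case _: Exception =>
        broker.rollbackMessage(id)
        false
    }
  }
}
```

A real implementation would also have to resolve messages whose producer died between the two phases, which this sketch ignores.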




