
Kafka #2: Message Queue

Kafka series:

Questions

  • Message protocol
  • Message subscription
  • Message storage
  • Message delivery
  • Message ordering
  • Message cleanup
  • Message priority
  • Message filtering
  • Message accumulation
  • Transactional messages?

Message Protocol

## Request Header
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       REQUEST_LENGTH                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|         REQUEST_TYPE          |        TOPIC_LENGTH           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                                                               /
/                    TOPIC (variable length)                    /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           PARTITION                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
## Multi-Request Header
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       REQUEST_LENGTH                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|         REQUEST_TYPE          |    TOPICPARTITION_COUNT       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
## Message
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                             LENGTH                            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|     MAGIC       |  COMPRESSION  |           CHECKSUM          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|      CHECKSUM (cont.)           |           PAYLOAD           /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+                             /
/                         PAYLOAD (variable length)             /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

## LENGTH = int32 // Length in bytes of entire message (excluding this field)
## MAGIC = int8   // 0 = COMPRESSION attribute byte does not exist (v0.6 and below)
                  // 1 = COMPRESSION attribute byte exists (v0.7 and above)
## COMPRESSION = int8 // 0 = none; 1 = gzip; 2 = snappy;
                      // Only exists at all if MAGIC == 1
## CHECKSUM = int32   // CRC32 checksum of the PAYLOAD
## PAYLOAD = Bytes[]  // Message content
## Produce Request
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                         REQUEST HEADER                        /
/                                                               /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         MESSAGES_LENGTH                       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                                                               /
/                            MESSAGES                           /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
## Multi-Produce Request
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 /                   MULTI-REQUEST HEADER                        /
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |               TOPIC-PARTITION/MESSAGES (n times)              |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 
 ## Per Topic-Partition (repeated n times)
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |        TOPIC_LENGTH           |  TOPIC (variable length)      /
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                           PARTITION                           |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                         MESSAGES_LENGTH                       |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 /                            MESSAGES                           /
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
## Fetch Request
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                         REQUEST HEADER                        /
/                                                               /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                             OFFSET                            |
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            MAX_SIZE                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

## OFFSET   = int64 // Offset in topic and partition to start from
## Multi-Fetch Request
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 /                   MULTI-REQUEST HEADER                        /
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |             TOPIC-PARTITION-FETCH-REQUEST (n times)           |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

 ## Per Topic-Partition-Fetch-Request (repeated n times)
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |        TOPIC_LENGTH           |  TOPIC (variable length)      /
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                           PARTITION                           |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                         OFFSET                                |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 /                            MAX_SIZE                           /
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
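
To make the Message layout above concrete, here is a minimal encoding sketch in Scala (illustrative only, not Kafka's own Message class): it builds a MAGIC=1 message whose CHECKSUM is the CRC32 of the payload, exactly as defined above.

import java.nio.ByteBuffer
import java.util.zip.CRC32

object MessageCodec {
  // Encode one message per the layout above (MAGIC = 1, so the COMPRESSION byte is present).
  // LENGTH excludes the 4-byte length field itself.
  def encode(payload: Array[Byte], compression: Byte = 0): ByteBuffer = {
    val crc = new CRC32()
    crc.update(payload)
    val length = 1 + 1 + 4 + payload.length   // magic + compression + checksum + payload
    val buf = ByteBuffer.allocate(4 + length)
    buf.putInt(length)                        // LENGTH
    buf.put(1.toByte)                         // MAGIC
    buf.put(compression)                      // COMPRESSION: 0 = none, 1 = gzip, 2 = snappy
    buf.putInt(crc.getValue.toInt)            // CHECKSUM = CRC32 of PAYLOAD
    buf.put(payload)                          // PAYLOAD
    buf.flip()
    buf
  }
}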

Message Subscription

Because the pull model is used, the broker does not need to maintain subscription state; Kafka stores subscriptions in ZK instead. See Consumer registration:

## /consumers/[groupId]/ids/[consumerId] -> 

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "pattern", "type": "string", "doc": "can be of static, white_list or black_list"},
      {"name": "subscription", "type" : {"type": "map", "values": {"type": "int"},
                                         "doc": "a map from a topic or a wildcard pattern to the number of streams"}      }    ]
}
 
Example:
A static subscription:
{
  "version": 1,
  "pattern": "static",
  "subscription": {"topic1": 1, "topic2": 2}
}

A whitelist subscription:
{
  "version": 1,
  "pattern": "white_list",
  "subscription": {"abc": 1}
}

A blacklist subscription:
{
  "version": 1,
  "pattern": "black_list",
  "subscription": {"abc": 1}
}
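
For reference, the "static" subscription above is what the 0.8 high-level consumer registers when you ask for a fixed number of streams per topic. A minimal sketch (ZooKeeper address, group and topic names are placeholders):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

object SubscriptionExample extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")
  props.put("group.id", "group1")

  // Asking for 1 stream of topic1 and 2 streams of topic2 is what ends up as the
  // "static" subscription {"topic1": 1, "topic2": 2} under /consumers/group1/ids/[consumerId].
  val connector = Consumer.create(new ConsumerConfig(props))
  val streams = connector.createMessageStreams(Map("topic1" -> 1, "topic2" -> 2))
}
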
## /consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

Recently, official documentation explained that consumer offsets (i.e. the /consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset) node above) will be migrated out of ZK and into Kafka itself:

ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count).

The new solution stores offsets as messages in a dedicated topic:

Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets. Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although we provide an in-memory offsets cache for faster access). i.e., offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory look ups.

Message Storage

As already seen in the earlier QuickStart, physically each topic has a number of partition directories, and inside each partition directory are the message files, called log files; a log file is named after the offset of the first message it stores. Each message is a log entry, with the following format:

## On-disk format of a message

message length : 4 bytes (value: 1+4+n) 
magic value    : 1 byte
crc            : 4 bytes
payload        : n bytes

Besides the log files, the directories also contain index files. What are those for? I did not find official documentation, but the explanation is in LogSegment (a small lookup sketch follows the comment below):

/**
 * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
 * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each 
 * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
 * any previous segment.
 * 
 * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. 
 * 
 * @param log The message set containing log entries
 * @param index The offset index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param time The time instance
 */
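
As a small illustration of why naming each file after its base offset is convenient (this is not Kafka's actual Log code, and the base offsets are made up): finding the segment that holds a given consumer offset is just a floor lookup over the sorted base offsets, and the .index file then narrows down the position inside that segment.

import java.util.TreeMap

object SegmentLookup extends App {
  // Hypothetical base offsets of the segment files in one partition directory,
  // i.e. 00000000000000000000.log, 00000000000000368769.log, 00000000000000737337.log
  val baseOffsets = Seq(0L, 368769L, 737337L)
  val segments = new TreeMap[java.lang.Long, String]()
  baseOffsets.foreach(o => segments.put(o, f"$o%020d.log"))

  // A fetch for offset 500000 is served from the segment whose base offset is
  // the greatest one <= 500000.
  println(segments.floorEntry(500000L).getValue)   // prints 00000000000000368769.log
}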

Message Delivery

pull or push

The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its offset in the log with each request and receives back a chunk of log beginning from that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.

The official docs give two reasons for choosing pull over push:

  • Rate control. With push, the broker dictates the transfer rate; if a consumer's processing capacity falls below that rate, the consumer can be overwhelmed (a denial of service attack, in essence). With pull, the consumer can simply wait until its capacity recovers and then catch up.
  • Batch control. With push, the broker either sends one message at a time or waits until enough messages have accumulated to send them in one batch. The former is wasteful; the latter adds latency and complicates the broker design. With pull, the consumer decides for itself how much data to fetch at a time.

A pull model has to address the efficiency of polling. How does Kafka handle that? RTFSC. Start from the official Consumer Example and follow the code: fetch requests are issued in ConsumerFetcherThread (ConsumerFetcherThread extends AbstractFetcherThread extends ShutdownableThread extends java.lang.Thread). Take a look:

override def run(): Unit = {
    info("Starting ")
    try{
      // keep issuing fetch requests in a plain while loop
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    shutdownLatch.countDown()
    info("Stopped ")
  }
  override def doWork() {
    inLock(partitionMapLock) {

      // partitionMap is initially populated by #addPartitions;
      // unless #removePartitions is called, it will not be empty
      if (partitionMap.isEmpty)
        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)

      partitionMap.foreach {
        case((topicAndPartition, offset)) =>
          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                           offset, fetchSize)
      }
    }

    val fetchRequest = fetchRequestBuilder.build()
    if (!fetchRequest.requestInfo.isEmpty)

      // actually request messages from the broker
      processFetchRequest(fetchRequest)
  }

With such a design, isn't it rather inefficient when the broker has no new messages? Don't worry: Kafka does implement a long-polling mechanism,

The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a "long poll" waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).

The parameters mentioned above live in FetchRequest; they are maxWait and minBytes (see the configuration sketch after the case class below).

case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentVersion,
                                        override val correlationId: Int = FetchRequest.DefaultCorrelationId,
                                        clientId: String = ConsumerConfig.DefaultClientId,
                                        replicaId: Int = Request.OrdinaryConsumerId,
                                        maxWait: Int = FetchRequest.DefaultMaxWait,
                                        minBytes: Int = FetchRequest.DefaultMinBytes,
                                        requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
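
On the consumer side, maxWait and minBytes surface as the fetch.wait.max.ms and fetch.min.bytes settings. A minimal configuration sketch (addresses and values are placeholders, not recommendations):

import java.util.Properties
import kafka.consumer.ConsumerConfig

object LongPollConfig extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")
  props.put("group.id", "group1")
  props.put("fetch.wait.max.ms", "1000")   // maxWait: block on the broker for up to 1s if no data is available
  props.put("fetch.min.bytes", "65536")    // minBytes: only respond once at least 64 KB can be returned (or maxWait expires)
  val config = new ConsumerConfig(props)
}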

load balance

One more very important point: "Each broker partition is consumed by a single consumer within a given consumer group." In other words, within a group each partition has exactly one designated consumer, called its owner. If there are more consumers than partitions, some consumers will never receive any message. So within a single consumer group Kafka cannot multicast a message to multiple consumers (you need separate groups for that). The owner is recorded in ZK at /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] -> consumer_node_id. So how are owners assigned?

1. For each topic T that Ci subscribes to 
2.   let PT be all partitions producing topic T
3.   let CG be all consumers in the same group as Ci that consume topic T
4.   sort PT (so partitions on the same broker are clustered together)
5.   sort CG
6.   let i be the index position of Ci in CG and let N = size(PT)/size(CG)
7.   assign partitions from i*N to (i+1)*N - 1 to consumer Ci // i*N to i*N+N-1
8.   remove current entries owned by Ci from the partition owner registry
9.   add newly assigned partitions to the partition owner registry
     (we may need to re-try this until the original partition owner releases its ownership)

For example:

let PT=[0, 1, 2, 3, 4, 5, 6, 7];
let CG=[0, 1, 2, 3];
then N=PT.len/CG.len=2;
then C0=>[0,1], C1=>[2,3], C2=>[4,5], C3=>[6,7];

The corresponding range-assignment code in the consumer rebalance logic:
          for (consumerThreadId <- consumerThreadIdSet) {
            val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
            assert(myConsumerPosition >= 0)
            val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
            val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

            /**
             *   Range-partition the sorted partitions to consumers for better locality.
             *  The first few consumers pick up an extra partition, if any.
             */
            if (nParts <= 0)
              warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
            else {
              for (i <- startPart until startPart + nParts) {
                val partition = curPartitions(i)
                info(consumerThreadId + " attempting to claim partition " + partition)

                // each partition is consumed by exactly one thread, so there is no concurrency issue
                addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)

                // record the partition ownership decision
                partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
              }
            }
          }

The logic that generates consumerThreadId lives in TopicCount:

  protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
                                            topicCountMap: Map[String,  Int]) = {
    val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
    for ((topic, nConsumers) <- topicCountMap) {
      val consumerSet = new mutable.HashSet[String]
      assert(nConsumers >= 1)
      for (i <- 0 until nConsumers)
        consumerSet += consumerIdString + "-" + i
      consumerThreadIdsPerTopicMap.put(topic, consumerSet)
    }
    consumerThreadIdsPerTopicMap
  }

Going one step further, a consumer only pulls messages from the leader broker of each partition assigned to it. The code is in ConsumerFetcherManager:

  def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
    leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
    leaderFinderThread.start()

    inLock(lock) {
      partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
      this.cluster = cluster
      noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
      cond.signalAll()
    }
  }
  private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
    // thread responsible for adding the fetcher to the right broker when leader is available
    override def doWork() {
      val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
      lock.lock()
      try {
        while (noLeaderPartitionSet.isEmpty) {
          trace("No partition for leader election.")
          cond.await()
        }

        trace("Partitions without leader %s".format(noLeaderPartitionSet))
        val brokers = getAllBrokersInCluster(zkClient)
        val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                            brokers,
                                                            config.clientId,
                                                            config.socketTimeoutMs,
                                                            correlationId.getAndIncrement).topicsMetadata
        if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
        topicsMetadata.foreach { tmd =>
          val topic = tmd.topic
          tmd.partitionsMetadata.foreach { pmd =>
            val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
            if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
              val leaderBroker = pmd.leader.get
              leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
              noLeaderPartitionSet -= topicAndPartition
            }
          }
        }
      } catch {
        case t: Throwable => {
            if (!isRunning.get())
              throw t /* If this thread is stopped, propagate this exception to kill the thread. */
            else
              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
          }
      } finally {
        lock.unlock()
      }

      try {
        addFetcherForPartitions(leaderForPartitionsMap.map{
          case (topicAndPartition, broker) =>
            topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
        )
      } catch {
        case t: Throwable => {
          if (!isRunning.get())
            throw t /* If this thread is stopped, propagate this exception to kill the thread. */
          else {
            warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
            lock.lock()
            noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
            lock.unlock()
          }
        }
      }

      shutdownIdleFetcherThreads()
      Thread.sleep(config.refreshLeaderBackoffMs)
    }
  }

Not only the consumer: the producer, too, sends messages only to a partition's leader broker. The code is in DefaultEventHandler#partitionAndCollate:

  def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
    val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
    try {
      for (message <- messages) {
        val topicPartitionsList = getPartitionListForTopic(message)

        // determine the partition for this message
        val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)

        // find the leader broker for that partition
        val brokerPartition = topicPartitionsList(partitionIndex)

        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
        val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

        var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
        ret.get(leaderBrokerId) match {
          case Some(element) =>
            dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
          case None =>
            dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
            ret.put(leaderBrokerId, dataPerBroker)
        }

        val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
        var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
        dataPerBroker.get(topicAndPartition) match {
          case Some(element) =>
            dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
          case None =>
            dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
            dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
        }
        dataPerTopicPartition.append(message)
      }
      Some(ret)
    }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
      case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
      case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
    }
  }

There is one more question that has not been addressed yet: which partition does a message ultimately get sent to?

  /**
   * Retrieves the partition id and throws an UnknownTopicOrPartitionException if
   * the value of partition is not between 0 and numPartitions-1
   * @param topic The topic
   * @param key the partition key
   * @param topicPartitionList the list of available partitions
   * @return the partition id
   */
  private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size
    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) { // a message key is optional
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None =>
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else // use the configured Partitioner
        partitioner.partition(key, numPartitions)
    if(partition < 0 || partition >= numPartitions)
      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
    partition
  }

Seen this way, every Kafka message is a KeyedMessage (a usage sketch follows the class below):

/**
 * A topic, key, and value.
 * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
 */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")
  
  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
  
  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
  
  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null  
  }
  
  def hasKey = key != null
}
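
Tying this back to getPartition above, a short usage sketch with the 0.8 producer API (broker address and topic name are placeholders): messages that share a key are always routed to the same partition via partitioner.partition(key, numPartitions), while messages without a key go to a randomly chosen partition that is cached per topic.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KeyedSendExample extends App {
  val props = new Properties()
  props.put("metadata.broker.list", "localhost:9092")
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  val producer = new Producer[String, String](new ProducerConfig(props))

  // Same key => same partition, so these two messages keep their relative order.
  producer.send(new KeyedMessage[String, String]("topic1", "user-42", "event-1"))
  producer.send(new KeyedMessage[String, String]("topic1", "user-42", "event-2"))

  // No key => a random partition is picked and cached (sendPartitionPerTopicCache above).
  producer.send(new KeyedMessage[String, String]("topic1", "no-key-event"))

  producer.close()
}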

Message Semantics

Clearly there are multiple possible message delivery guarantees that could be provided:

  • At most once—Messages may be lost but are never redelivered.
  • At least once—Messages are never lost but may be redelivered.
  • Exactly once—this is what people actually want, each message is delivered once and only once.

It's worth noting that this breaks down into two problems: the durability guarantees for publishing a message and the guarantees when consuming a message.

Look at the problem from the producer's point of view first. Imagine this scenario: the producer sends a message, the broker stores it, but then the network between producer and broker fails and the producer never receives the broker's ack. What should the producer do? It has no way of knowing whether the broker actually stored the message. The current version of Kafka does not solve this problem. How could it be solved? One approach is for the producer to attach a primary key to every request and make the request idempotent: when the situation above occurs, the producer can either ask the broker whether the request with that PK succeeded, or skip the question entirely and simply resend the request, with the broker deduplicating by PK on its side.

Setting that extreme case aside, achieving exactly-once on the publishing side is relatively easy. Kafka offers configuration options such as request.required.acks and producer.type so users can choose what fits their application. Once the broker's ack is received, the message is committed; as long as the ISRs holding that partition do not all die, the message remains visible to consumers. The meaning of request.required.acks is:

  • 0: do not wait for the broker's ack. Low latency, but reliability cannot be guaranteed.
  • 1: an ack from the leader is enough (does it need to be persisted to disk first?).
  • -1: acks from all ISRs are required.

The blocking send/receive lives in SyncProducer#doSend:
  /**
   * Common functionality for the public send methods
   */
  private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = {
    lock synchronized {
      verifyRequest(request)
      getOrMakeConnection()

      var response: Receive = null
      try {
        blockingChannel.send(request)
        if(readResponse)
          // blocking: wait for the broker's response
          response = blockingChannel.receive()
        else
          trace("Skipping reading response")
      } catch {
        case e: java.io.IOException =>
          // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
          disconnect()
          throw e
        case e: Throwable => throw e
      }
      response
    }
  }

producer.type is actually unrelated to message semantics; it exists purely for performance: "This parameter specifies whether the messages are sent asynchronously in a background thread." It is implemented in Producer (a configuration sketch follows the code below):

  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   * @param messages the producer data object that encapsulates the topic, key and message data
   */
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
      }
    }
  }
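
A minimal configuration sketch that puts the two knobs together (the values shown are just one possible choice, not a recommendation):

import java.util.Properties
import kafka.producer.ProducerConfig

object ProducerSemanticsConfig extends App {
  val props = new Properties()
  props.put("metadata.broker.list", "localhost:9092")
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  props.put("request.required.acks", "-1")   // -1: wait for acks from all ISRs before the message counts as committed
  props.put("producer.type", "sync")         // sync: send() above calls eventHandler.handle on the calling thread
  val config = new ProducerConfig(props)
}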

Now look at the problem from the consumer's point of view. Essentially this is a transaction problem: there are two steps, consuming the message and recording the consumption progress, and they need to succeed or fail together. Transactions are familiar territory, so let's go straight to how Kafka handles it. Kafka again offers a fairly flexible mechanism: you can call ConsumerConnector#commitOffsets manually, or set auto.commit.enable and auto.commit.interval.ms to commit automatically. But as the official ConsumerGroupExample points out, with auto commit messages may be replayed, i.e. at least once.

The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.

Both the fetchedOffset used when issuing fetch requests and the consumedOffset of messages already handed to the application are kept in PartitionTopicInfo; committing offsets writes consumedOffset to ZK. Below is a simplified diagram of the consumption flow, followed by a small commit sketch.

                             +--------------------+
                             |PartitionTopicInfo  |
  +----------------+   poll  | +---------------+  |  enqueue  +-----------------------+
  |ConsumerIterator| --------->| BlockingQueue |  |  <------  | ConsumerFetcherThread |
  +----------------+         | +---------------+  |           +-----------------------+
                             +--------------------+                       ||
                                                            BlockingChannel||
                                                                          ||
                                                                    +-------------+
                                                                    | KafkaServer |
                                                                    +-------------+
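
A minimal sketch of the two commit styles discussed above, using the 0.8 high-level consumer (addresses, group and topic names are placeholders, and process stands in for your handler):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

object CommitStyles extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")
  props.put("group.id", "group1")
  // Style 1: auto commit. Offsets are flushed to ZK on a timer, so a crash between
  // flushes replays messages -- at-least-once semantics.
  props.put("auto.commit.enable", "true")
  props.put("auto.commit.interval.ms", "60000")

  val connector = Consumer.create(new ConsumerConfig(props))
  val stream = connector.createMessageStreams(Map("topic1" -> 1))("topic1").head

  def process(payload: Array[Byte]): Unit = ()   // stand-in for real business logic

  val it = stream.iterator()
  while (it.hasNext()) {
    process(it.next().message())
    // Style 2: with auto.commit.enable=false, commit manually only after the
    // message has really been handled:
    // connector.commitOffsets
  }
}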

Message Ordering

Kafka only provides "a total order over messages within a partition", i.e. partial ordering overall. If you need a total order across a topic, you can achieve it by giving that topic a single partition.

Message Cleanup

Kafka offers two strategies for dealing with old log files: delete or compact. They are configured via cleanup.policy (per topic) or log.cleanup.policy (the broker-wide default); a topic-level setting overrides the broker default. When cleanup kicks in is controlled by settings such as log.retention.{minutes,hours} and log.retention.bytes. Note that cleanup pays no attention to whether messages have been consumed.

Message Accumulation

As described above, messages get cleaned up, so unbounded message accumulation is not a problem.

Message Priority

Not supported. Message priority feels like a rather pathological requirement anyway.

Message Filtering

Not supported. It could be implemented by adding a messageTag: the fetch request carries a tag, and the broker filters messages by that tag before returning them to the consumer.
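
Stock Kafka has no such broker-side hook, so in practice this kind of filtering ends up on the consumer side, at the cost of still shipping unwanted messages over the wire. A minimal client-side sketch, assuming (hypothetically) that producers prefix every payload with a "tag:" marker:

object TagFilter {
  // Keep only payloads carrying the wanted tag. Unlike the broker-side filtering
  // described above, unwanted messages are discarded only after being fetched.
  def filterByTag(payloads: Iterator[Array[Byte]], tag: String): Iterator[Array[Byte]] =
    payloads.filter(p => new String(p, "UTF-8").startsWith(tag + ":"))
}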

Transactional Messages

Not supported. A transactional-message implementation generally needs to expose at least two operations to the producer, precommitMessage and commitMessage (two-phase commit); the producer flow is typically 1. precommitMessage 2. do the business logic 3. commitMessage. Kafka only offers the equivalent of commitMessage, so it cannot support this.

