Architecture Design of Kafka, a Distributed Publish-Subscribe Messaging System

Low-level API

class SimpleConsumer {
	
  /* Send fetch request to a broker and get back a set of messages. */ 
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */ 
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * Get a list of valid offsets (up to maxNumOffsets) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LatestTime(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EarliestTime(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
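The javadoc for getOffsetsBefore is terse, so here is a toy model of the behavior it describes: offsets come back newest first, at most maxNumOffsets of them, and the two sentinel times select the newest or the oldest end of the log. This is a sketch under stated assumptions, not Kafka's code. ToyPartitionLog, addSegment and the segment timestamps are hypothetical. For simplicity it assumes the "valid offsets" are the start offsets of log segments. The sentinel values -1 and -2 are the ones behind OffsetRequest.LatestTime() and EarliestTime() in Kafka 0.8.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of one partition's log, NOT Kafka's implementation.
// Assumption: valid offsets are segment start offsets, and each segment is
// summarized by (baseOffset, lastModifiedMs), oldest segment first.
class ToyPartitionLog {
    // Same values as Kafka 0.8's OffsetRequest.LatestTime() / EarliestTime()
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    private final List<long[]> segments = new ArrayList<>();

    void addSegment(long baseOffset, long lastModifiedMs) {
        segments.add(new long[] {baseOffset, lastModifiedMs});
    }

    // Returns at most maxNumOffsets offsets at or before 'time', newest first.
    long[] getOffsetsBefore(long time, int maxNumOffsets) {
        int start; // index of the newest segment that qualifies
        if (time == LATEST_TIME) {
            start = segments.size() - 1;
        } else if (time == EARLIEST_TIME) {
            start = segments.isEmpty() ? -1 : 0;
        } else {
            start = -1;
            for (int i = 0; i < segments.size(); i++) {
                if (segments.get(i)[1] <= time) {
                    start = i; // keeps the newest segment not modified after 'time'
                }
            }
        }
        int n = Math.max(0, Math.min(maxNumOffsets, start + 1));
        long[] result = new long[n];
        for (int i = 0; i < n; i++) {
            result[i] = segments.get(start - i)[0]; // walk backwards => descending order
        }
        return result;
    }
}

class OffsetLookupDemo {
    public static void main(String[] args) {
        ToyPartitionLog log = new ToyPartitionLog();
        log.addSegment(0L, 1_000L);
        log.addSegment(500L, 2_000L);
        log.addSegment(1_200L, 3_000L);
        System.out.println(Arrays.toString(log.getOffsetsBefore(2_500L, 10)));                       // [500, 0]
        System.out.println(Arrays.toString(log.getOffsetsBefore(ToyPartitionLog.LATEST_TIME, 2)));   // [1200, 500]
        System.out.println(Arrays.toString(log.getOffsetsBefore(ToyPartitionLog.EARLIEST_TIME, 5))); // [0]
    }
}
```

A low-level consumer could take the first (largest) offset in the returned array as its starting position and fetch forward from there.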

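The low-level API is used not only to implement the high-level API, but also directly by some of our offline consumers (such as the Hadoop consumer), which have particular requirements around maintaining their own state.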
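With the low-level API, the consumer rather than the broker owns its read position. Below is a minimal sketch of that idea. It assumes an in-memory partition in which an offset is simply a message index. InMemoryPartition and OfflineBatchConsumer are hypothetical names. A real batch job would presumably persist the checkpoint somewhere durable, for example alongside its output, rather than keep it in a field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one topic partition on a broker: an append-only
// list addressed by offset. It stores no per-consumer state.
class InMemoryPartition {
    private final List<String> messages = new ArrayList<>();

    void append(String message) {
        messages.add(message);
    }

    // Stateless fetch: at most maxMessages messages starting at 'offset'.
    // Assumption: offsets are simply message indexes in this toy.
    List<String> fetch(long offset, int maxMessages) {
        int from = (int) Math.min(offset, messages.size());
        int to = Math.min(messages.size(), from + maxMessages);
        return new ArrayList<>(messages.subList(from, to));
    }
}

// Hypothetical offline (batch) consumer that keeps its own position, so a
// rerun resumes from wherever the last successful run stopped.
class OfflineBatchConsumer {
    private long checkpointedOffset;

    OfflineBatchConsumer(long startOffset) {
        this.checkpointedOffset = startOffset;
    }

    long checkpointedOffset() {
        return checkpointedOffset;
    }

    // Reads everything currently available, then advances the checkpoint once.
    List<String> runOnce(InMemoryPartition partition, int batchSize) {
        List<String> consumed = new ArrayList<>();
        long offset = checkpointedOffset;
        while (true) {
            List<String> batch = partition.fetch(offset, batchSize);
            if (batch.isEmpty()) {
                break; // caught up with the end of the partition
            }
            consumed.addAll(batch);
            offset += batch.size();
        }
        checkpointedOffset = offset; // commit only after the whole run succeeded
        return consumed;
    }

    public static void main(String[] args) {
        InMemoryPartition partition = new InMemoryPartition();
        partition.append("a");
        partition.append("b");
        partition.append("c");
        OfflineBatchConsumer job = new OfflineBatchConsumer(0L);
        System.out.println(job.runOnce(partition, 2)); // [a, b, c]
        partition.append("d");
        System.out.println(job.runOnce(partition, 2)); // [d]
        System.out.println(job.checkpointedOffset());  // 4
    }
}
```

Because the checkpoint moves only after a run completes, a failed run can simply be repeated from the old offset.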

High-level API

/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
	
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Integer> topicCountMap);

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public void commitOffsets();

  /* Shut down the connector */
  public void shutdown();
}

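This API is centered on an iterator implemented by the KafkaStream class. Each KafkaStream represents a stream of messages from one or more partitions on one or more servers. Each stream is processed by a single thread, so a client of the API can request as many streams as it needs in the create call. A stream may therefore represent the merge of several server partitions (so that the number of streams matches the number of processing threads), but each partition delivers its data to exactly one stream.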
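The relationship between partitions, streams and threads can be sketched as follows. Each partition is assigned to exactly one stream, a stream may merge several partitions, and each stream is drained by its own thread. The round-robin assignment and the names StreamAssignment, assign and processWithOneThreadPerStream are assumptions for illustration. Kafka's consumer computes its own assignment during rebalancing and hands back KafkaStream iterators rather than partition lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of how partitions feed streams; the round-robin strategy is an
// assumption for illustration, not necessarily what Kafka's rebalance does.
class StreamAssignment {
    // Partition p goes to stream p % numStreams (numStreams must be positive):
    // every partition lands in exactly one stream, and a stream may merge several.
    static Map<Integer, List<Integer>> assign(int numPartitions, int numStreams) {
        Map<Integer, List<Integer>> streams = new TreeMap<>();
        for (int s = 0; s < numStreams; s++) {
            streams.put(s, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            streams.get(p % numStreams).add(p);
        }
        return streams;
    }

    // Starts one thread per stream and records which thread handled each partition.
    static Map<Integer, String> processWithOneThreadPerStream(Map<Integer, List<Integer>> streams) {
        Map<Integer, String> handledBy = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> stream : streams.entrySet()) {
            Thread t = new Thread(() -> {
                for (int partition : stream.getValue()) {
                    handledBy.put(partition, Thread.currentThread().getName());
                }
            }, "stream-" + stream.getKey());
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for stream threads", e);
            }
        }
        return new TreeMap<>(handledBy);
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> streams = assign(5, 2);
        System.out.println(streams); // {0=[0, 2, 4], 1=[1, 3]}
        System.out.println(processWithOneThreadPerStream(streams));
        // {0=stream-0, 1=stream-1, 2=stream-0, 3=stream-1, 4=stream-0}
    }
}
```

In this sketch, asking for more streams than there are partitions leaves the extra streams empty; for example, assign(2, 3) gives stream 2 no partitions.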

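The createMessageStreams method registers the consumer for the requested topics, which triggers a rebalance of the consumer/broker assignment. To keep rebalancing to a minimum, the API encourages creating streams for many topics in a single call. The createMessageStreamsByFilter method (additionally) registers watchers that discover topics matching its filter. Note that each stream returned by createMessageStreamsByFilter may iterate over messages from more than one topic (for example, when several topics satisfy the filter).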
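The TopicFilter described in the interface comment wraps a whitelist or a blacklist expressed as a standard Java regex. The sketch below shows that matching logic with java.util.regex, how a watcher could re-evaluate the filter when a new topic appears, and why one filtered stream can yield messages from several topics. ToyTopicFilter, ToyWhitelist, ToyBlacklist and FilteredStreamDemo are hypothetical stand-ins, not Kafka classes. The rule that the whole topic name must match is also an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical filter classes, loosely mirroring the whitelist/blacklist idea
// of Kafka's TopicFilter; they are NOT Kafka's own classes.
abstract class ToyTopicFilter {
    protected final Pattern pattern;

    ToyTopicFilter(String regex) {
        this.pattern = Pattern.compile(regex); // a standard Java regex
    }

    abstract boolean isTopicAllowed(String topic);
}

class ToyWhitelist extends ToyTopicFilter {
    ToyWhitelist(String regex) { super(regex); }

    @Override
    boolean isTopicAllowed(String topic) {
        return pattern.matcher(topic).matches(); // allowed only if the whole name matches
    }
}

class ToyBlacklist extends ToyTopicFilter {
    ToyBlacklist(String regex) { super(regex); }

    @Override
    boolean isTopicAllowed(String topic) {
        return !pattern.matcher(topic).matches(); // allowed unless the whole name matches
    }
}

class FilteredStreamDemo {
    // What a topic watcher might do whenever the topic list changes: re-check every topic.
    static List<String> matchingTopics(ToyTopicFilter filter, List<String> allTopics) {
        List<String> allowed = new ArrayList<>();
        for (String topic : allTopics) {
            if (filter.isTopicAllowed(topic)) {
                allowed.add(topic);
            }
        }
        return allowed;
    }

    // One stream may carry messages from every allowed topic (here concatenated per topic).
    static List<String> singleStream(ToyTopicFilter filter, Map<String, List<String>> messagesByTopic) {
        List<String> stream = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : messagesByTopic.entrySet()) {
            if (filter.isTopicAllowed(e.getKey())) {
                for (String message : e.getValue()) {
                    stream.add(e.getKey() + ":" + message);
                }
            }
        }
        return stream;
    }

    public static void main(String[] args) {
        ToyTopicFilter orders = new ToyWhitelist("orders.*");
        List<String> topics = new ArrayList<>(Arrays.asList("clicks", "orders", "audit"));
        System.out.println(matchingTopics(orders, topics)); // [orders]
        topics.add("orders-eu"); // a new topic is created; re-evaluation picks it up
        System.out.println(matchingTopics(orders, topics)); // [orders, orders-eu]

        Map<String, List<String>> messages = new LinkedHashMap<>();
        messages.put("orders", Arrays.asList("o1"));
        messages.put("clicks", Arrays.asList("c1"));
        messages.put("orders-eu", Arrays.asList("e1", "e2"));
        System.out.println(singleStream(orders, messages)); // [orders:o1, orders-eu:e1, orders-eu:e2]
    }
}
```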