Architecture Design of Kafka, a Distributed Publish-Subscribe Messaging System
阿新 • Published: 2019-01-31
Low-level API
    class SimpleConsumer {

        /* Send fetch request to a broker and get back a set of messages. */
        public ByteBufferMessageSet fetch(FetchRequest request);

        /* Send a list of fetch requests to a broker and get back a response set. */
        public MultiFetchResponse multifetch(List<FetchRequest> fetches);

        /**
         * Get a list of valid offsets (up to maxNumOffsets) before the given time.
         * The result is a list of offsets, in descending order.
         * @param time: time in millisecs,
         *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
         *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
         */
        public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    }
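The low-level API is used to implement the high-level API, and it is also used directly by some of our offline consumers (such as the Hadoop consumer), which have particular requirements around maintaining state.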
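To make "maintaining state" concrete, here is a minimal sketch of a consumer that owns its read position instead of relying on the broker. It does not use the Kafka client library: `InMemoryPartitionLog` and `OffsetTrackingConsumer` are hypothetical names invented for illustration, and offsets are modeled as simple message indexes, which is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one broker partition; not a Kafka class. */
class InMemoryPartitionLog {
    private final List<String> messages = new ArrayList<>();

    void append(String message) {
        messages.add(message);
    }

    /** Returns up to maxMessages messages starting at the given offset. */
    List<String> fetch(long offset, int maxMessages) {
        int from = (int) Math.min(offset, messages.size());
        int to = Math.min(from + maxMessages, messages.size());
        return new ArrayList<>(messages.subList(from, to));
    }
}

/** Consumer that keeps its own position, as an offline job might. */
class OffsetTrackingConsumer {
    private final InMemoryPartitionLog log;
    private long nextOffset;

    OffsetTrackingConsumer(InMemoryPartitionLog log, long startOffset) {
        if (startOffset < 0) {
            throw new IllegalArgumentException("startOffset must be >= 0");
        }
        this.log = log;
        this.nextOffset = startOffset;
    }

    /** Fetches one batch and advances the locally held offset. */
    List<String> poll(int maxMessages) {
        List<String> batch = log.fetch(nextOffset, maxMessages);
        nextOffset += batch.size();
        return batch;
    }

    /** The value a job would persist alongside its output to resume later. */
    long checkpoint() {
        return nextOffset;
    }

    public static void main(String[] args) {
        InMemoryPartitionLog log = new InMemoryPartitionLog();
        for (int i = 0; i < 5; i++) {
            log.append("m" + i);
        }
        OffsetTrackingConsumer first = new OffsetTrackingConsumer(log, 0);
        System.out.println(first.poll(2));        // prints [m0, m1]
        long saved = first.checkpoint();          // 2

        // A restarted job resumes from the offset it saved itself.
        OffsetTrackingConsumer resumed = new OffsetTrackingConsumer(log, saved);
        System.out.println(resumed.poll(10));     // prints [m2, m3, m4]
    }
}
```

Because the position lives with the consumer, a batch job can store the checkpoint together with its output and re-read from any earlier offset after a failure.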
High-level API
    /* create a connection to the cluster */
    ConsumerConnector connector = Consumer.create(consumerConfig);

    interface ConsumerConnector {

        /**
         * This method is used to get a list of KafkaStreams, which are iterators over
         * MessageAndMetadata objects from which you can obtain messages and their
         * associated metadata (currently only topic).
         *  Input: a map of <topic, #streams>
         *  Output: a map of <topic, list of message streams>
         */
        public Map<String, List<KafkaStream>> createMessageStreams(Map<String, Integer> topicCountMap);

        /**
         * You can also obtain a list of KafkaStreams, that iterate over messages
         * from topics that match a TopicFilter. (A TopicFilter encapsulates a
         * whitelist or a blacklist which is a standard Java regex.)
         */
        public List<KafkaStream> createMessageStreamsByFilter(
            TopicFilter topicFilter, int numStreams);

        /* Commit the offsets of all messages consumed so far. */
        public void commitOffsets();

        /* Shut down the connector */
        public void shutdown();
    }
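This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is meant to be processed by a single thread, so the client specifies the number of streams it wants in the create call. A stream may therefore be the merge of several server partitions (so that the number of streams matches the number of processing threads), but each partition delivers its data to exactly one stream.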
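The partition-to-stream rule described above can be sketched as follows. This is an illustration only: `StreamAssignmentSketch` is a hypothetical class, and round-robin is an assumed policy chosen for simplicity, not necessarily how Kafka distributes partitions. The point it shows is that one stream may hold several partitions while every partition lands in exactly one stream.

```java
import java.util.ArrayList;
import java.util.List;

class StreamAssignmentSketch {

    /**
     * Distributes partition ids 0..numPartitions-1 across numStreams streams
     * round-robin (an illustrative policy, assumed for this sketch).
     * Element i of the result lists the partitions merged into stream i.
     */
    static List<List<Integer>> assign(int numPartitions, int numStreams) {
        if (numStreams <= 0) {
            throw new IllegalArgumentException("numStreams must be positive");
        }
        List<List<Integer>> streams = new ArrayList<>();
        for (int s = 0; s < numStreams; s++) {
            streams.add(new ArrayList<Integer>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // Each partition is added to one stream and never to another.
            streams.get(p % numStreams).add(p);
        }
        return streams;
    }

    public static void main(String[] args) {
        // Five partitions consumed by two processing threads.
        System.out.println(assign(5, 2)); // prints [[0, 2, 4], [1, 3]]
    }
}
```

If more streams are requested than there are partitions, some streams in this sketch simply stay empty, which mirrors the constraint that a partition cannot be split across streams.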
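The createMessageStreams call registers the consumer for the given topics, which triggers a rebalance of the consumer/broker assignment. To keep rebalancing to a minimum, the API encourages creating many topic streams in a single call. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream returned by createMessageStreamsByFilter may iterate over messages from multiple topics (for example, when several topics satisfy the filter).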
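As a rough illustration of why a filtered stream can span several topics, the sketch below applies a whitelist or blacklist regex to a list of known topic names. `TopicFilterSketch` is a hypothetical class written for this example and is not Kafka's TopicFilter; the topic names are made up, and full-string regex matching is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TopicFilterSketch {
    private final Pattern pattern;
    private final boolean whitelist;

    /**
     * @param regex     standard Java regex matched against the whole topic name
     * @param whitelist true to accept matching topics, false to reject them
     */
    TopicFilterSketch(String regex, boolean whitelist) {
        this.pattern = Pattern.compile(regex);
        this.whitelist = whitelist;
    }

    boolean isAllowed(String topic) {
        boolean matches = pattern.matcher(topic).matches();
        // Whitelist: allowed when it matches. Blacklist: allowed when it does not.
        return whitelist == matches;
    }

    /** Returns the known topics that one filtered stream would cover. */
    List<String> select(List<String> knownTopics) {
        List<String> selected = new ArrayList<>();
        for (String topic : knownTopics) {
            if (isAllowed(topic)) {
                selected.add(topic);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders.eu", "orders.us", "clicks");
        TopicFilterSketch onlyOrders = new TopicFilterSketch("orders\\..*", true);
        System.out.println(onlyOrders.select(topics)); // prints [orders.eu, orders.us]
    }
}
```

Here a single whitelist selects two topics, so a stream built from it would carry messages from both. A watcher would rerun the same check whenever a new topic appears.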