kafka文件(3)----0.8.2-kafka API(java版本)
Apache Kafka包含新的java客戶端,這些新的的客戶端將取代現存的Scala客戶端,但是為了相容性,它們仍將存在一段時間。可以通過一些單獨的jar包呼叫這些客戶端,這些包的依賴性都比較小,同時老的Scala客戶端仍會存在。
一、Producer API
我們鼓勵所有新開發都使用新的java版本producer。這個客戶端是經過生產環境測試的,並且一般情況下會比先前的Scala客戶端要更快而且具有更多的特性。你可以通過新增對客戶端jar包的依賴來呼叫這個客戶端,如下所示,使用maven配置:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency>
可以通過javadoc檔案檢視如何使用producer。
二、Consumer API
在0.9.0釋出版本中,增加了新的java版本的consumer,用來替代已有的high-level的基於zookeeper的consumer,以及low-level的consumer APIs。
這個客戶端認為是beta版本。為了保證使用者獲得平穩的升級,我們會繼續維護0.8版本的consumer客戶端,此版本客戶端會在0.9版本的kafka叢集中
依然生效。下面的章節中,我們會介紹老的0.8版本的consumer APIs(包括high-level的ConusmerConnector以及low-level SimpleConsumer)以及
新的Java版本的consumer API。
1、Old High Level Consumer API
class Consumer{
/**
* Create a ConsumerConnector:建立consumer connector
*
* @param config at the minimum, need to specify the groupid of the consumer and the zookeeper connection string zookeeper.connect.config引數作用:需要置頂consumer的groupid以及zookeeper連線字串zookeeper.connect
*/
public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}
/**
* V: type of the message: 訊息型別
* K: type of the optional key assciated with the message: 訊息攜帶的可選關鍵字型別
*/
public interface kafka.javaapi.consumer.ConsumerConnector {
/**
* Create a list of message streams of type T for each topic.:為每個topic建立T型別的訊息流的列表
*
* @param topicCountMap a map of (topic, #streams) pair : topic與streams的鍵值對
* @param decoder a decoder that converts from Message to T : 轉換Message到T的解碼器
* @return a map of (topic, list of KafakStream) pairs. : topic與KafkaStream列表的鍵值對
* The number of items in the list is #streams . Each stream supports
* an iterator over message/metadata pairs .:列表中專案的數量是#streams。每個stream都支援基於message/metadata 對的迭代器
*/
public <K,V> Map<String, List<KafkaStream<K,V> > >
createMessageStreams( Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
* Create a list of message streams of type T for each topic, using the default decoder.為每個topic建立T型別的訊息列表。使用預設解碼器
*/
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
/**
* Create a list of message streams for topics matching a wildcard.為匹配wildcard的topics建立訊息流的列表
*
* @param topicFilter a TopicFilter that specifies which topics to
* subscribe to (encapsulates a whitelist or a blacklist).指定將要訂閱的topics的TopicFilter(封裝了whitelist或者黑名單)
* @param numStreams the number of message streams to return.將要返回的流的數量
* @param keyDecoder a decoder that decodes the message key 可以解碼關鍵字key的解碼器
* @param valueDecoder a decoder that decodes the message itself 可以解碼訊息本身的解碼器
* @return a list of KafkaStream. Each stream supports an
* iterator over its MessageAndMetadata elements. 返回KafkaStream的列表。每個流都支援基於MessagesAndMetadata 元素的迭代器。
*/
public <K,V> List<KafkaStream<K,V>>
createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
* Create a list of message streams for topics matching a wildcard, using the default decoder.使用預設解碼器,為匹配wildcard的topics建立訊息流列表
*/
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
/**
* Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.使用預設解碼器,為匹配wildcard的topics建立訊息流列表
*/
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
/**
* Commit the offsets of all topic/partitions connected by this connector.通過connector提交所有topic/partitions的offsets
*/
public void commitOffsets();
/**
* Shut down the connector: 關閉connector
*/
public void shutdown();
}
你可以根據這個例子學習怎樣使用high level consumer api。2、Old Simple Consumer API
class kafka.javaapi.consumer.SimpleConsumer {
/**
* Fetch a set of messages from a topic.從topis抓取訊息序列
*
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.指定topic 名字,topic partition,開始的位元組offset,抓取的最大位元組數
* @return a set of fetched messages
*/
public FetchResponse fetch(kafka.javaapi.FetchRequest request);
/**
* Fetch metadata for a sequence of topics.抓取一系列topics的metadata
*
* @param request specifies the versionId, clientId, sequence of topics.指定versionId,clientId,topics
* @return metadata for each topic in the request.返回此要求中每個topic的元素據
*/
public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);
/**
* Get a list of valid offsets (up to maxSize) before the given time.在給定的時間內返回正確偏移的列表
*
* @param request a [[kafka.javaapi.OffsetRequest]] object.
* @return a [[kafka.javaapi.OffsetResponse]] object.
*/
public kafak.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
/**
* Close the SimpleConsumer.關閉
*/
public void close();
}
對大多數應用來說, high level consumer Api已經足夠了,一些應用要求的一些特徵還沒有出現high level consumer介面(例如,
當重啟consumer時,設定初始offset)。他們可以使用low level SimpleConsumer Api。邏輯可能會有些複雜,你可以根據這個例子學習一下。
3、New Consumer API
新consumer API統一了標準,原來存在於0.8版本的high-level以及low-level consumer APIs之間差異不存在了。你可以通過使用下面maven配置方式,
指明客戶端依賴的jar包,這樣就可以使用新的consumer API。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency>Examples showing how to use the consumer are given in the javadocs.
三、Streams API
在0.10.0 release版本中,我們增加了新的客戶端呼叫庫Kafka Streams,用來支援流式處理應用。Kafka Streams庫認為是
alpha版本質量的,同時它的公共呼叫APIs在將來有可能會修改。你可以像下面maven配置模式一樣,指明Kafka Streams的
依賴關係來呼叫Kafka Streams。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.0</version> </dependency>
在Javadocs中展示瞭如何呼叫這個庫(注意這些類都是不穩定的,表明以後的版本中可能會修改)。
相關推薦
kafka文件(3)----0.8.2-kafka API(java版本)
Apache Kafka包含新的java客戶端,這些新的的客戶端將取代現存的Scala客戶端,但是為了相容性,它們仍將存在一段時間。可以通過一些單獨的jar包呼叫這些客戶端,這些包的依賴性都比較小,同時老的Scala客戶端仍會存在。 一、Producer
Kafka文件(2)----0.8.2- 基本介紹(Getting Start)
來源: 說明: 原文中某些專有名詞不做翻譯: kafka topic partition consumer producer server client high-level 1、開始 1.1 介紹kafka可提供分散式、分割槽的、可備份的日誌提交服務,同時也是設計
kafka文件(4)---- 0.8.2-Configuration-配置選項翻譯
來源:http://kafka.apache.org/documentation.html#configuration 3. Configuration Kafka在配置檔案中使用key-value方式進行屬性配置。這些values可以通過檔案或者程式設計方式提
kafka文件(13)----0.10.1-Document-文件(5)-configures-consumer配置資訊
In 0.9.0.0 we introduced the new Java consumer as a replacement for the older Scala-based simple and high-level consumers. The configs f
kafka----kafka API(java版本)
spring mvc+my batis dubbo+zookeerper kafka restful redis分布式緩存 Apache Kafka包含新的Java客戶端,這些新的的客戶端將取代現存的Scala客戶端,但是為了兼容性,它們仍將存在一段時間。可以通過一些單獨的jar包調用這些客
Linux 文件管理命令語法、參數、實例全匯總(一)
rwx 界面 endif 群組 new 才有 func {} ans 命令:cat cat 命令用於連接文件並打印到標準輸出設備上。 使用權限 所有使用者 語法格式 cat [-AbeEnstTuv] [--help] [--version] fileName 參數
vs2010單文件中新增對話方塊並在對話方塊中新增屬性框(標籤框)
1.建立單文件Demo 在資源檢視Dialog中插入兩個Dialoge,Style設定為child,Border設定為chill。為兩個對話方塊分別新增類,基類為CPropertyPage,類名CP1,CP2。在P1的標頭檔案新增 #include "resource.h" 2.在類檢視中在De
根據介面文件書寫介面,並在前端呼叫介面返回顯示出資料(加下載)
---恢復內容開始--- 1.首先來看介面文件(其中一個介面): 介面的編寫: 1 /** 2 * 7.11 餘額明細查詢介面 3 * 4 * @param token 5 * @param pageNum 6
通過swagger2markup+asciidoctorj生成html和pdf文件並解決asciidoctorj生成的pdf檔案中文顯示不全問題(maven方式及java程式碼方式)
通過swagger2markup+asciidoctorj生成html和pdf文件(maven方式及java程式碼方式) 任務:通過同事的json檔案生成相應的html和pdf文件 前言 開始時swagger2markup和asciidocto
Kafka文件存儲機制那些事
kafka 方便 成對出現 讀者 開源項目 sock 位置 通過 刪除 點評一下先:kafka的存儲主要有幾個特點: 1. 多級索引(名義上是1級索引,但是這級索引依賴了文件列表,相當於文件列表是第一級索引,所以是二級索引),二級索引文件和數據文件一一對應。 相比只有1
python 修改文件內容3種方法
bak ram code param post img span clas 正則表達 一、修改原文件方式 1 def alter(file,old_str,new_str): 2 """ 3 替換文件中的字符串 4 :param f
[py]處理文件的3個方法
效果 字符 python生成器 pen mark str http 方法 lines file處理的3個方法: f和f.readlines效果一樣 # f.read() 所有行 -> 字符串 # f.readline 讀取一行 -> 字符串 # f.
kafka文件
介紹 Kafka是一個分散式的、可分割槽的、可複製的訊息系統。它提供了普通訊息系統的功能,但具有自己獨特的設計。 1、1 術語: Topic:可以理解為一個MQ訊息佇列的名字 Producers:將向Kafka topic釋出訊息的程式 Consume
【pySerial3.4官方文件】3、pySerial API
pySerial API 類 本地埠 類serial.Serial __init__(port = None,baudrate = 9600,bytesize = EIGHTBITS,parity = PARITY_NONE,stopbits = STOPBITS_ONE
UI5-文件-4.3-Controls
現在是時候構建我們的第一個小UI了,將HTML主體中的“Hello World”文字替換為SAPUI5控制元件sap.m.Text。首先,我們將使用JavaScript控制元件介面來設定UI,然後將控制元件例項放入HTML體中。 Preview The "Hello World" text
Debezium文件翻譯02:啟動Docker,Debezium,Zookeeper,Kafka
使用Docker執行Debezium 執行Debezium涉及三個主要服務:Zookeeper、Kafka和Debezium的聯結器服務。 本教程將指導您使用Docker和Debezium的Docker映像啟動這些服務的單個例項。 另一方面,生產環境需要執行每個服務的多個例項,以保證
《Apache Zookeeper 官方文件》-3 快速指南:使用zookeeper來協調分散式應用
原文連結 譯者:softliumin 校對:方騰飛 本節內容讓你快速入門zookeeper。它主要針對想嘗試使用zookeeper的開發者,幷包含一個ZooKeeper單機伺服器的安裝說明,你可以用一些命令來驗證它的執行,以及簡單的程式設計例項。最後,為了考慮到方便性,有一些複雜的安裝部分
[網路開發]RakNet文件翻譯(3)
如何將你的資料編碼到一個數據包中?執行RakNet的系統通過人們所熟知的資料包進行通訊,實際上所有在Internet上執行的系統都如此。更準確的說,在UDP協議下,它用的是資料報。每一個通過RakNet建立的資料報中都包含了一條或者多條資訊。訊息可以是通過你建立的,例如位置資訊,血量資訊,或者其他通過RakN
《Spring 5官方文件》3 IOC容器 3.11-3.16
原文連結 譯者:maxam0128 3. IOC 3.11 使用JSR 330標準註解 從Spring3.0開始,Spring提供了對JSR-330標準註解(依賴注入)的支援。這些註解和Spring的註解以相同的方式進行掃描。你只需要在你的classpath中新增有關的jar包。 如果你使用
《Spring Cloud Netflix官方文件》3.熔斷器:Hystrix Clients
原文連結 Netfilix建立了一個名為Hystrix的庫,實現了熔斷器模式。在微服務架構中,它通常有多個服務呼叫層。 圖3.1 微服務圖 一個底層服務的故障會引發直至使用者互動層的連鎖故障。在一個設定時長為“metrics.rollingStats.timeInMilliseconds”