Kafka 2.3 Producer (0.9以後版本適用)
kafka0.9版本以後用java重新編寫了producer,廢除了原來scala編寫的版本。
這裡直接使用最新2.3版本,0.9以後的版本都適用。
注意引用的包為:org.apache.kafka.clients.producer
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class ProducerDemo { public static void main(String[] args) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); kafkaProducer.send(new ProducerRecord<>("topic", "value")); kafkaProducer.close(); } }
0.11.0以後增加了事務,事務producer的示例程式碼如下,需要適用於0.11.0以後的版本:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class TransactionsProducerDemo { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close(); } }
更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算
相關推薦
Kafka 2.3 Producer (0.9以後版本適用)
kafka0.9版本以後用java重新編寫了producer,廢除了原來scala編寫的版本。 這裡直接使用最新2.3版本,0.9
Kafka 0.8 Producer (0.9以前版本適用)
Kafka舊版本producer由scala編寫,0.9以後已經廢除 示例程式碼如下: import kafka.producer
凝思4.2 安裝thrift 0.9.3
bison version >= 2.5 bison 版本需大於等於2.5 刪除當前版本bison rm /usr/bin/bison rm /usr/bin/yacc rm /usr/lib/liby.a 安裝bison 3.2.1 https://ft
Kafka 0.9 新版本consumer客戶端使用介紹
kafka最初時開發時, 所帶的producer和consumer client都是Scala所寫. 我們逐漸發現這些API具有一些限制. high-level的api支援consumer groups和故障轉移, 但是不支援許多複雜的使用場景, 同時還有一
面試題3:在一個長度為n的數組裏的所有數字都在0到n-1的範圍內。 數組中某些數字是重復的,但不知道有幾個數字是重復的。也不知道每個數字重復幾次。請找出數組中任意一個重復的數字。 例如,如果輸入長度為7的數組{2,3,1,0,2,5,3},那麽對應的輸出是第一個重復的數字2。
length value 如果 while 返回 sys public ret || package siweifasan_6_5; /** * @Description:在一個長度為n的數組裏的所有數字都在0到n-1的範圍內。 * 數組中某些數字是重復的,
js中sum(2)(3)(4)返回9和sum(2,3)和sum(2)(3)都返回5並要求擴展性
lang itl ron var 就會 bsp tle 關於 網上 網上有很多關於sum(1)(2)(3),sum(1,2,3)之類的面試題要求輸出相同的結果6並要求可以滿足擴展,即有多個參數時也能符合題設的要求,所以自己寫了部分例子可以大概滿足這些面試題的要求 <!
ThinkPHP 3.2.3~5.0.10 快取函式設計缺陷後臺GetShell實戰
0×00 前言 ThinkPHP是為了簡化企業級應用開發和敏捷WEB應用開發而誕生的,由於其簡單易用,很多cms都基於該框架改寫。然而 Thinkphp在快取使用卻存在缺陷,生成快取時,Thinkphp會將資料序列化存進一個php檔案,這就產生了很大的安全問題。 0×01 環境搭建 工具
webstorm 2018.2.3(64位)版本啟用碼-----有效期至2019.7.31
AWAC5NN6E4-eyJsaWNlbnNlSWQiOiJBV0FDNU5ONkU0IiwibGljZW5zZWVOYW1lIjoibGIgb2QiLCJhc3NpZ25lZU5hbWUiOiIiLCJhc3NpZ25lZUVtYWlsIjoiIiwibGlj
PEACHPIE 0.9.11 版本釋出,可以上生產了
0.9.11是第一個非預覽版本,也就是說可以用於生產了,編譯本身快速且使用者友好(更好的錯誤訊息),有一個重大改進的文件(https://docs.peachpie.io/)和新的.NET Core 的 PeachPie Project ,可以和 Visual Studio(> = 2017更新6)
MySQL資料庫驅動mysql-connector-java的8.0.9-rc版本連線MySQL資料庫
之前我的MySQL資料庫驅動mysql-connector-java版本號為5.1.34,在升級成8.0.9-rc版本後,發現原來的連線方式報錯了。故在這裡記錄一下新版本的MySQL資料庫驅動的連線使用方式。 先貼出來以前舊版本(5.1.34)的連線方
hbase1.2.3+zookeeper3.4.9+hadoop2.7.3完全分散式部署遇到的問題
啟動start-hbase.sh 後hbase沒有啟動 檢視日誌如下: ERROR [main] master.HMasterCommandLine: Master exiting java.io.IOException: Could not start ZK with 3
記一發Hive on tez的配置(Hive 3.1.1, Hadoop 3.0.3, Tez 0.9.1)
sta 麻煩 參考 手動 需要 version test log all 直接下載Tez的binary包部署安裝是有問題的,因為默認支持hadoop版本為2.7,2.7以上的就需要手動編譯了。 下載Tez源碼 CD到源碼文件夾,mvn install -Dhadoop.v
94、tensorflow實現語音識別0,1,2,3,4,5,6,7,8,9
結果 test amp building pre cti fun ner edi ‘‘‘ Created on 2017年7月23日 @author: weizhen ‘‘‘ #導入庫 from __future__ import division,print_func
scala spark-streaming整合kafka (spark 2.3 kafka 0.10)
obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <
java8下spark-streaming結合kafka程式設計(spark 2.3 kafka 0.10)
前面有說道spark-streaming的簡單demo,也有說到kafka成功跑通的例子,這裡就結合二者,也是常用的使用之一。 1.相關元件版本 首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafk
QTrace 0.2.3 版本釋出
下載地址: http://www.pc6.com/softview/SoftView_614309.html 主要修改: 1.對話方塊顯示優化 2.本地GNU搜尋優化 3.遠端搜尋,本地搜尋採用列表顯示,結果可讀性更好; 4.遠端搜尋介面優化,當搜尋錯誤時提
Redis Desktop Manager 0.9.3 版本下載(官方最新版需要訂閱,好像要給錢才行)
下載地址:https://pan.baidu.com/s/1P856NPusJLUSFwQjjPdltA 密碼: 12d3 版本是兩三個月前,我從官網下載的,然後順便存到了我的行動硬碟上。0.9.3.817.exe github 上有 redis destop
python相關軟體安裝流程圖解——Windows下安裝Redis以及視覺化工具——Redis-x64-3.2.100——redis-desktop-manager-0.9.3.817
https://www.2cto.com/database/201708/666191.html https://github.com/MicrosoftArchive/redis/releases
ACMNO.21 C語言-逆序輸出 輸入10個數字,然後逆序輸出。 輸入 十個整數 輸出 逆序輸出,空格分開 樣例輸入 1 2 3 4 5 6 7 8 9 0
題目描述 輸入10個數字,然後逆序輸出。 輸入 十個整數 輸出 逆序輸出,空格分開 樣例輸入 1 2 3 4 5 6 7 8 9 0 樣例輸出 0 9 8 7 6 5 4 3 2 1 提示 陣列?堆疊? 來源/分類 C語言
Kafka 0.11新版本釋出:主要的功能變更介紹:支援 EOS, 事務和冪等producer
Apache Kafka近日推出0.11版本。這是一個里程碑式的大版本,特別是Kafka從這個版本開始支援“exactly-once”語義(下稱EOS, exactly-once semantics