Apache Kafka系列(三) Java API使用
摘要:
Apache Kafka Java Client API
一、基本概念
Kafka集成了Producer/Consumer連線Broker的客戶端工具,但是在訊息處理方面,這兩者主要用於服務端(Broker)的簡單操作,如:
1.建立Topic
2.羅列出已存在的Topic
3.對已有Topic的Produce/Consume測試
跟其他的訊息系統一樣,Kafka提供了多種不用語言實現的客戶端API,如:Java,Python,Ruby,Go等。這些API極大的方便使用者使用Kafka叢集,本文將展示這些API的使用
二、前提
- 在本地虛擬機器中安裝了Kafka 0.11.0版本,可以參照前一篇文章:
- 本地安裝有JDK1.8
- IDEA編譯器
- Maven3
三、專案結構
Maven pom.xml如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.randy</groupId> <artifactId>kafka_api_demo</artifactId> <version>1.0-SNAPSHOT</version> <name>Maven</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> </dependencies> </project>
四、原始碼
4.1 Producer的原始碼
package com.randy; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; /** * Author : RandySun * Date : 2017-08-13 16:23 * Comment : */ public class ProducerDemo { public static void main(String[] args){ Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.1.110:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = null; try { producer = new KafkaProducer<String, String>(properties); for (int i = 0; i < 100; i++) { String msg = "Message " + i; producer.send(new ProducerRecord<String, String>("HelloWorld", msg)); System.out.println("Sent:" + msg); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } }
可以使用KafkaProducer類的例項來建立一個Producer,KafkaProducer類的引數是一系列屬性值,下面分析一下所使用到的重要的屬性:
- bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.110:9092");
bootstrap.servers是Kafka叢集的IP地址,如果Broker數量超過1個,則使用逗號分隔,如"192.168.1.110:9092,192.168.1.110:9092"。其中,192.168.1.110是我的其中一臺虛擬機器的
IP地址,9092是所監聽的埠
- key.serializer & value.serializer
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
序列化型別。 Kafka訊息是以鍵值對的形式傳送到Kafka叢集的,其中Key是可選的,Value可以是任意型別。但是在Message被髮送到Kafka叢集之前,Producer需要把不同型別的消
息序列化為二進位制型別。本例是傳送文字訊息到Kafka叢集,所以使用的是StringSerializer。
- 傳送Message到Kafka叢集
for (int i = 0; i < 100; i++) { String msg = "Message " + i; producer.send(new ProducerRecord<String, String>("HelloWorld", msg)); System.out.println("Sent:" + msg); }
上述程式碼會發送100個訊息到HelloWorld這個Topic
4.2 Consumer的原始碼
package com.randy; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; /** * Author : RandySun * Date : 2017-08-13 17:06 * Comment : */ public class ConsumerDemo { public static void main(String[] args){ Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.1.110:9092"); properties.put("group.id", "group-1"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "earliest"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("HelloWorld")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } } } }
可以使用KafkaConsumer類的例項來建立一個Consumer,KafkaConsumer類的引數是一系列屬性值,下面分析一下所使用到的重要的屬性:
- bootstrap.servers
和Producer一樣,是指向Kafka叢集的IP地址,以逗號分隔。
- group.id
Consumer分組ID
- key.deserializer and value.deserializer
發序列化。Consumer把來自Kafka叢集的二進位制訊息反序列化為指定的型別。因本例中的Producer使用的是String型別,所以呼叫StringDeserializer來反序列化
Consumer訂閱了Topic為HelloWorld的訊息,Consumer呼叫poll方法來輪循Kafka叢集的訊息,其中的引數100是超時時間(Consumer等待直到Kafka叢集中沒有訊息為止):
kafkaConsumer.subscribe(Arrays.asList("HelloWorld")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } }
五、總結
本文展示瞭如何建立一個Producer並生成String型別的訊息,Consumer消費這些訊息。這些都是基於Apache Kafka 0.11.0 Java API。
相關推薦
Apache Kafka系列(三) Java API使用
摘要: Apache Kafka Java Client API 一、基本概念 Kafka集成了Producer/Consumer連線Broker的客戶端工具,但是在訊息處理方面,這兩者主要用於服務端(Broker)的簡單操作,如: 1.建立Topic 2.羅列出已存在的Topic
jvm系列(三):java GC算法 垃圾收集器
應對 sca 互聯 都是 生命 改進 壓縮 速度 垃圾收集器 原文鏈接:http://www.cnblogs.com/ityouknow/p/5614961.html 概述 垃圾收集 Garbage Collection 通常被稱為“GC”,它誕生於1960年 MIT 的
kafka系列三、Kafka三款監控工具比較
轉載原文:http://top.jobbole.com/31084/ 通過研究,發現主流的三種kafka監控程式分別為: Kafka Web Conslole Kafka Manager KafkaOffsetMonitor 現在依次介紹以上三種工具: 一、Kafka W
kafka 和storm Java api程式設計中 pom檔案範例
要注意的是執行的時候可能會遇到日誌檔案jar包重複的情況,這裡要用到<exclusions>排除如下 <exclusion> <groupId>org.slf4j</groupId>
apache kafka系列之效能測試報告(虛擬機器版)
測試方法 在其他虛擬機器上使用 Kafka 自帶 kafka-producer-perf-test.sh 指令碼進行測試 Kafka 寫入效能 嘗試使用 kafka-simple-consumer-p
Apache Kafka系列(二) 命令列工具(CLI)
Apache Kafka命令列工具(Command Line Interface,CLI),下文簡稱CLI。 1. 啟動Kafka 啟動Kafka需要兩步: 1.1. 啟動ZooKeeper [[email protected] kafka_2.12-0.11.0.0]# bin/zo
Apache Kafka系列(一) 起步
摘要: 1.Apache Kafka基本概念 2.Kafka的安裝 3.基本工具建立Topic 本文基於centos7, Apache Kafka 0.11.0 一、基本概念 Apache Kafka是一個釋出/訂閱的訊息系統,於2009年源自Linkedin,並與2011年開源。在架構方
Apache Kafka系列(五) Kafka Connect及FileConnector示例
一. Kafka Connect簡介 Kafka是一個使用越來越廣的訊息系統,尤其是在大資料開發中(實時資料處理和分析)。為何整合其他系統和解耦應用,經常使用Producer來發送訊息到Broker,並使用Consumer來消費Broker中的訊息。Kafka Connect是到0.9版本才提供的並極大
Apache Kafka系列(四) 多執行緒Consumer方案
本文的圖片是通過PPT截圖出的,讀者如果修改意見請聯絡我 一、Consumer為何需要實現多執行緒 假設我們正在開發一個訊息通知模組,該模組允許使用者訂閱其他使用者傳送的通知/訊息。該訊息通知模組採用Apache Kafka,那麼整個架構應該是訊息的釋出者通過Producer呼叫API寫入訊息到Kafk
jvm系列(三):java GC演算法 垃圾收集器
GC演算法 垃圾收集器 概述 垃圾收集 Garbage Collection 通常被稱為“GC”,它誕生於1960年 MIT 的 Lisp 語言,經過半個多世紀,目前已經十分成熟了。 jvm 中,程式計數器、虛擬機器棧、本地方法棧都是隨執行緒而生隨執行緒而滅,棧幀隨著方法的進入和退出做入棧和出棧
Kafka 生產者消費者 Java API 程式設計
我們先建立一個topic,然後啟動生產者和消費者,進行訊息通訊,然後在使用Kafka API程式設計的方式實現,筆者使用的ZK和Kafka都是單節點,你也可以使用叢集方式。 啟動Zookeeper zkServer.sh start 啟動Kafka ka
apache kafka系列之kafka.common.ConsumerRebalanceFailedException異常解決辦法
kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf
apache kafka系列之原始碼分析走讀-kafka內部模組分析
apache kafka中國社群QQ群:162272557 kafka整體結構分析: kafka原始碼工程目錄結構如下圖: 下面只對core目錄結構作說明,其他都是測試類或java客戶端程式碼 admin --管理員模組,操作和管理topic,parit
spark2.x由淺入深深到底系列六之RDD java api詳解三
老湯 spark 大數據 javaapi rdd 學習任何spark知識點之前請先正確理解spark,可以參考:正確理解spark本文詳細介紹了spark key-value類型的rdd java api一、key-value類型的RDD的創建方式1、sparkContext.parall
大數據學習系列之三 ----- HBase Java Api 圖文詳解
工具 itl 進行 圖片 置配 動態數據 sync ase tac 引言 在上一篇中大數據學習系列之二 ----- HBase環境搭建(單機) 中,成功搭建了Hadoop+HBase的環境,本文則主要講述使用Java 對HBase的一些操作。 一、事前準備 1.確認hado
kafka系列五、kafka常用java API
引入maven包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <
【Apache Solr系列之三】Solr客戶端SolrJ API使用文件-增刪改
通過之前兩篇文章的學習之後,使用solr對mysql進行資料匯入以及增量索引應該都會了! 接下來我們學習下如果從Solr中讀取我們想要的資料。同時你也可以結合Solr的web介面進行驗證,看看你的查詢結果是否正確。 環境準備: 從之前下載的solr安裝包中解壓獲取以下ja
ElasticSearch實戰系列三: ElasticSearch的JAVA API使用教程
前言 在上一篇中介紹了ElasticSearch實戰系列二: ElasticSearch的DSL語句使用教程---圖文詳解,本篇文章就來講解下 ElasticSearch 6.x官方Java API的使用。 ElasticSearch JAVA API 目前市面上有幾種常見的ElasticSearch Jav
深入理解JAVA集合系列三:HashMap的死循環解讀
現在 最新 star and 場景 所有 image cap 時也 由於在公司項目中偶爾會遇到HashMap死循環造成CPU100%,重啟後問題消失,隔一段時間又會反復出現。今天在這裏來仔細剖析下多線程情況下HashMap所帶來的問題: 1、多線程put操作後,get操作導
深入理解JAVA I/O系列三:字符流詳解
buffer 情況 二進制文件 感到 復制代碼 使用範圍 轉換 fileread 方式 字符流為何存在 既然字節流提供了能夠處理任何類型的輸入/輸出操作的功能,那為什麽還要存在字符流呢?容我慢慢道來,字節流不能直接操作Unicode字符,因為一個字符有兩個字節,字節流一次只