1. 程式人生 > >Apache Kafka系列(三) Java API使用

Apache Kafka系列(三) Java API使用

摘要:

  Apache Kafka Java Client API

一、基本概念

  Kafka集成了Producer/Consumer連線Broker的客戶端工具,但是在訊息處理方面,這兩者主要用於服務端(Broker)的簡單操作,如:

    1.建立Topic

    2.羅列出已存在的Topic

    3.對已有Topic的Produce/Consume測試

  跟其他的訊息系統一樣,Kafka提供了多種不用語言實現的客戶端API,如:Java,Python,Ruby,Go等。這些API極大的方便使用者使用Kafka叢集,本文將展示這些API的使用

二、前提

  • 在本地虛擬機器中安裝了Kafka 0.11.0版本,可以參照前一篇文章:  
    Apache Kafka系列(一) 起步
  • 本地安裝有JDK1.8
  • IDEA編譯器
  • Maven3

三、專案結構

  Maven pom.xml如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion
> <groupId>com.randy</groupId> <artifactId>kafka_api_demo</artifactId> <version>1.0-SNAPSHOT</version> <name>Maven</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source
>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> </dependencies> </project>

四、原始碼

  4.1 Producer的原始碼    

package com.randy;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
 * Author  : RandySun
 * Date    : 2017-08-13  16:23
 * Comment :
 */
public class ProducerDemo {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.110:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            producer.close();
        }

    }
}

  可以使用KafkaProducer類的例項來建立一個Producer,KafkaProducer類的引數是一系列屬性值,下面分析一下所使用到的重要的屬性:

  • bootstrap.servers
properties.put("bootstrap.servers", "192.168.1.110:9092");

   bootstrap.servers是Kafka叢集的IP地址,如果Broker數量超過1個,則使用逗號分隔,如"192.168.1.110:9092,192.168.1.110:9092"。其中,192.168.1.110是我的其中一臺虛擬機器的

           IP地址,9092是所監聽的埠

  • key.serializer   &  value.serializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

     序列化型別。 Kafka訊息是以鍵值對的形式傳送到Kafka叢集的,其中Key是可選的,Value可以是任意型別。但是在Message被髮送到Kafka叢集之前,Producer需要把不同型別的消

   息序列化為二進位制型別。本例是傳送文字訊息到Kafka叢集,所以使用的是StringSerializer。

  • 傳送Message到Kafka叢集
   for (int i = 0; i < 100; i++) {
      String msg = "Message " + i;
      producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
      System.out.println("Sent:" + msg);
   }

   上述程式碼會發送100個訊息到HelloWorld這個Topic

  4.2 Consumer的原始碼

package com.randy;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Author  : RandySun
 * Date    : 2017-08-13  17:06
 * Comment :
 */
public class ConsumerDemo {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.110:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }

    }
}

  可以使用KafkaConsumer類的例項來建立一個Consumer,KafkaConsumer類的引數是一系列屬性值,下面分析一下所使用到的重要的屬性:

  • bootstrap.servers

    和Producer一樣,是指向Kafka叢集的IP地址,以逗號分隔。

  • group.id

     Consumer分組ID

  • key.deserializer and value.deserializer

 發序列化。Consumer把來自Kafka叢集的二進位制訊息反序列化為指定的型別。因本例中的Producer使用的是String型別,所以呼叫StringDeserializer來反序列化

  Consumer訂閱了Topic為HelloWorld的訊息,Consumer呼叫poll方法來輪循Kafka叢集的訊息,其中的引數100是超時時間(Consumer等待直到Kafka叢集中沒有訊息為止): 

        kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                System.out.println();
            }
        }

五、總結

  本文展示瞭如何建立一個Producer並生成String型別的訊息,Consumer消費這些訊息。這些都是基於Apache Kafka 0.11.0 Java API。

相關推薦

Apache Kafka系列() Java API使用

摘要:   Apache Kafka Java Client API 一、基本概念   Kafka集成了Producer/Consumer連線Broker的客戶端工具,但是在訊息處理方面,這兩者主要用於服務端(Broker)的簡單操作,如:     1.建立Topic     2.羅列出已存在的Topic

jvm系列():java GC算法 垃圾收集器

應對 sca 互聯 都是 生命 改進 壓縮 速度 垃圾收集器 原文鏈接:http://www.cnblogs.com/ityouknow/p/5614961.html 概述 垃圾收集 Garbage Collection 通常被稱為“GC”,它誕生於1960年 MIT 的

kafka系列Kafka款監控工具比較

轉載原文:http://top.jobbole.com/31084/ 通過研究,發現主流的三種kafka監控程式分別為: Kafka Web Conslole  Kafka Manager KafkaOffsetMonitor 現在依次介紹以上三種工具: 一、Kafka W

kafka 和storm Java api程式設計中 pom檔案範例

要注意的是執行的時候可能會遇到日誌檔案jar包重複的情況,這裡要用到<exclusions>排除如下 <exclusion> <groupId>org.slf4j</groupId>

apache kafka系列之效能測試報告(虛擬機器版)

測試方法 在其他虛擬機器上使用 Kafka 自帶 kafka-producer-perf-test.sh 指令碼進行測試 Kafka 寫入效能 嘗試使用 kafka-simple-consumer-p

Apache Kafka系列(二) 命令列工具(CLI)

Apache Kafka命令列工具(Command Line Interface,CLI),下文簡稱CLI。 1. 啟動Kafka   啟動Kafka需要兩步:   1.1. 啟動ZooKeeper  [[email protected] kafka_2.12-0.11.0.0]# bin/zo

Apache Kafka系列(一) 起步

摘要:   1.Apache Kafka基本概念   2.Kafka的安裝   3.基本工具建立Topic  本文基於centos7, Apache Kafka 0.11.0 一、基本概念   Apache Kafka是一個釋出/訂閱的訊息系統,於2009年源自Linkedin,並與2011年開源。在架構方

Apache Kafka系列(五) Kafka Connect及FileConnector示例

一. Kafka Connect簡介   Kafka是一個使用越來越廣的訊息系統,尤其是在大資料開發中(實時資料處理和分析)。為何整合其他系統和解耦應用,經常使用Producer來發送訊息到Broker,並使用Consumer來消費Broker中的訊息。Kafka Connect是到0.9版本才提供的並極大

Apache Kafka系列(四) 多執行緒Consumer方案

本文的圖片是通過PPT截圖出的,讀者如果修改意見請聯絡我 一、Consumer為何需要實現多執行緒   假設我們正在開發一個訊息通知模組,該模組允許使用者訂閱其他使用者傳送的通知/訊息。該訊息通知模組採用Apache Kafka,那麼整個架構應該是訊息的釋出者通過Producer呼叫API寫入訊息到Kafk

jvm系列():java GC演算法 垃圾收集器

GC演算法 垃圾收集器 概述 垃圾收集 Garbage Collection 通常被稱為“GC”,它誕生於1960年 MIT 的 Lisp 語言,經過半個多世紀,目前已經十分成熟了。 jvm 中,程式計數器、虛擬機器棧、本地方法棧都是隨執行緒而生隨執行緒而滅,棧幀隨著方法的進入和退出做入棧和出棧

Kafka 生產者消費者 Java API 程式設計

我們先建立一個topic,然後啟動生產者和消費者,進行訊息通訊,然後在使用Kafka API程式設計的方式實現,筆者使用的ZK和Kafka都是單節點,你也可以使用叢集方式。 啟動Zookeeper zkServer.sh start 啟動Kafka ka

apache kafka系列kafka.common.ConsumerRebalanceFailedException異常解決辦法

kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf

apache kafka系列之原始碼分析走讀-kafka內部模組分析

apache kafka中國社群QQ群:162272557 kafka整體結構分析: kafka原始碼工程目錄結構如下圖: 下面只對core目錄結構作說明,其他都是測試類或java客戶端程式碼 admin   --管理員模組,操作和管理topic,parit

spark2.x由淺入深深到底系列六之RDD java api詳解

老湯 spark 大數據 javaapi rdd 學習任何spark知識點之前請先正確理解spark,可以參考:正確理解spark本文詳細介紹了spark key-value類型的rdd java api一、key-value類型的RDD的創建方式1、sparkContext.parall

大數據學習系列 ----- HBase Java Api 圖文詳解

工具 itl 進行 圖片 置配 動態數據 sync ase tac 引言 在上一篇中大數據學習系列之二 ----- HBase環境搭建(單機) 中,成功搭建了Hadoop+HBase的環境,本文則主要講述使用Java 對HBase的一些操作。 一、事前準備 1.確認hado

kafka系列五、kafka常用java API

引入maven包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <

Apache Solr系列】Solr客戶端SolrJ API使用文件-增刪改

通過之前兩篇文章的學習之後,使用solr對mysql進行資料匯入以及增量索引應該都會了! 接下來我們學習下如果從Solr中讀取我們想要的資料。同時你也可以結合Solr的web介面進行驗證,看看你的查詢結果是否正確。 環境準備: 從之前下載的solr安裝包中解壓獲取以下ja

ElasticSearch實戰系列: ElasticSearch的JAVA API使用教程

前言 在上一篇中介紹了ElasticSearch實戰系列二: ElasticSearch的DSL語句使用教程---圖文詳解,本篇文章就來講解下 ElasticSearch 6.x官方Java API的使用。 ElasticSearch JAVA API 目前市面上有幾種常見的ElasticSearch Jav

深入理解JAVA集合系列:HashMap的死循環解讀

現在 最新 star and 場景 所有 image cap 時也 由於在公司項目中偶爾會遇到HashMap死循環造成CPU100%,重啟後問題消失,隔一段時間又會反復出現。今天在這裏來仔細剖析下多線程情況下HashMap所帶來的問題: 1、多線程put操作後,get操作導

深入理解JAVA I/O系列:字符流詳解

buffer 情況 二進制文件 感到 復制代碼 使用範圍 轉換 fileread 方式 字符流為何存在 既然字節流提供了能夠處理任何類型的輸入/輸出操作的功能,那為什麽還要存在字符流呢?容我慢慢道來,字節流不能直接操作Unicode字符,因為一個字符有兩個字節,字節流一次只