1. 程式人生 > >Kafka實戰-簡單示例

Kafka實戰-簡單示例

1.概述

  上一篇部落格《Kafka實戰-Kafka Cluster》中,為大家介紹了Kafka叢集的安裝部署,以及對Kafka叢集Producer/Consumer、HA等做了相關測試,今天我們來開發一個Kafka示例,練習如何在Kafka中進行程式設計,下面是今天的分享的目錄結構:

  • 開發環境
  • ConfigureAPI
  • Consumer
  • Producer
  • 截圖預覽

  下面開始今天的內容分享。

2.開發環境

  在開發Kafka相關應用之前,我們得將Kafka得開發環境搭建完成,這裡我所使用得開發環境如下所示:

基礎軟體 工具名稱
IDE JBoss Studio 8
JDK 1.7

  關於基礎軟體的下載及相關配置,大家可參考我寫的《高可用Hadoop平臺-啟航》一文的相關贅述,這裡就不多做介紹了。在安裝好相關基礎軟體後,我們開始專案工程的建立,這裡我們所使用的工程結構是Maven,關於Maven環境的相關配置資訊,可參考我在《Hadoop2原始碼分析-準備篇》一文對Maven環境配置的贅述。

  在準備完成相關基礎軟體以及Maven環境後,我們大家建立的工程,在pom.xml檔案中,新增Kafka的依賴包,新增程式碼如下所示:

        <dependency>
            <groupId
>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.8.2.1</version> </dependency>

  下面開始編寫今天的程式碼示例。

3.ConfigureAPI

  首先是一個配置結構類檔案,配置Kafka的相關引數,程式碼如下所示:

package cn.hadoop.hdfs.conf;

/**
 * @Date Apr 28, 2015
 *
 * @Author dengjie
 *
 * @Note Set param path
 
*/ public class ConfigureAPI { public interface KafkaProperties { public final static String ZK = "10.211.55.15:2181,10.211.55.17:2181,10.211.55.18:2181"; public final static String GROUP_ID = "test_group1"; public final static String TOPIC = "test2"; public final static String BROKER_LIST = "10.211.55.15:9092,10.211.55.17:9092,10.211.55.18:9092"; public final static int BUFFER_SIZE = 64 * 1024; public final static int TIMEOUT = 20000; public final static int INTERVAL = 10000; } }

4.Consumer

  然後是一個消費程式,用於消費Kafka的訊息,程式碼如下所示:

  • JConsumer

package cn.hadoop.hdfs.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note Kafka Consumer
 */
public class JConsumer extends Thread {

    private ConsumerConnector consumer;
    private String topic;
    private final int SLEEP = 1000 * 3;

    public JConsumer(String topic) {
        consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());
        this.topic = topic;
    }

    private ConsumerConfig consumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZK);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Receive->[" + new String(it.next().message()) + "]");
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

}

5.Producer

  接著是Kafka的生產訊息程式,用於產生Kafka的訊息供Consumer去消費,具體程式碼如下所示:

  • JProducer

package cn.hadoop.hdfs.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note Kafka JProducer
 */
public class JProducer extends Thread {

    private Producer<Integer, String> producer;
    private String topic;
    private Properties props = new Properties();
    private final int SLEEP = 1000 * 3;

    public JProducer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.211.55.18:9092");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int offsetNo = 1;
        while (true) {
            String msg = new String("Message_" + offsetNo);
            System.out.println("Send->[" + msg + "]");
            producer.send(new KeyedMessage<Integer, String>(topic, msg));
            offsetNo++;
            try {
                sleep(SLEEP);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

}

6.截圖預覽

  在開發完Consumer和Producer的程式碼後,我們來測試相關應用,下面給大家編寫了一個Client去測試Consumer和Producer,具體程式碼如下所示:

  • KafkaClient

package cn.hadoop.hdfs.kafka.client;

import cn.hadoop.hdfs.conf.ConfigureAPI.KafkaProperties;
import cn.hadoop.hdfs.kafka.JConsumer;
import cn.hadoop.hdfs.kafka.JProducer;

/**
 * @Date May 22, 2015
 *
 * @Author dengjie
 *
 * @Note To run Kafka Code
 */
public class KafkaClient {

    public static void main(String[] args) {
        JProducer pro = new JProducer(KafkaProperties.TOPIC);
        pro.start();

        JConsumer con = new JConsumer(KafkaProperties.TOPIC);
        con.start();
    }

}

  執行截圖如下所示:

7.總結

  大家在開發Kafka的應用時,需要注意相關事項。若是使用Maven專案工程,在新增相關Kafka依賴JAR包時,有可能依賴JAR會下載失敗,若出現這種情況,可手動將Kafka的依賴JAR包新增到Maven倉庫即可,在編寫Consumer和Producer程式,這裡只是給出一個示例讓大家先熟悉Kafka的程式碼如何去編寫,後面會給大家更加詳細複雜的程式碼模組案例。

8.結束語

  這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或傳送郵件給我,我會盡我所能為您解答,與君共勉!

相關推薦

Kafka實戰簡單示例

1.概述   上一篇部落格《Kafka實戰-Kafka Cluster》中,為大家介紹了Kafka叢集的安裝部署,以及對Kafka叢集Producer/Consumer、HA等做了相關測試,今天我們來開發一個Kafka示例,練習如何在Kafka中進行程式設計,下面是今天的分享的目錄結構: 開發環境

Go Kafka客戶端簡單示例

一. 準備 安裝依賴庫saramago get github.com/Shopify/sarama該庫要求kafka版本在0.8及以上,支援kafka定義的high-level API和low-le

KafkaOffsetMonitor Kafka實戰KafkaOffsetMonitor

1.概述   前面給大家介紹了Kafka的背景以及一些應用場景,並附帶上演示了Kafka的簡單示例。然後,在開發的過程當中,我們會發現一些問題,那就是訊息的監控情況。雖然,在啟動Kafka的相關服務後,我們生產訊息和消費訊息會在終端控制檯顯示這些記錄資訊,但是,這樣始終

Kafka實戰Kafka Cluster

1.概述   在《Kafka實戰-入門》一篇中,為大家介紹了Kafka的相關背景、原理架構以及一些關鍵知識點,本篇部落格為大家來贅述一下Kafka Cluster的相關內容,下面是今天為大家分享的目錄: 基礎軟體的準備 Kafka Cluster的部署 Send Messages HA特性

Kafka實戰入門

1.概述   經過一個多月的時間觀察,業務上在整合Kafka後,各方面還算穩定,這裡打算抽時間給大家分享一下Kafka在實際場景中的一些使用心得。本篇部落格打算先給大家入個門,讓大家對Kafka有個初步的瞭解,知道Kafka是做什麼的,下面是本篇部落格的目錄內容: Kafka背景 Kafka應用場景

Kafka實戰KafkaOffsetMonitor

1.概述   前面給大家介紹了Kafka的背景以及一些應用場景,並附帶上演示了Kafka的簡單示例。然後,在開發的過程當中,我們會發現一些問題,那就是訊息的監控情況。雖然,在啟動Kafka的相關服務後,我們生產訊息和消費訊息會在終端控制檯顯示這些記錄資訊,但是,這樣始終不夠友好,而且,在實際開發中,我們不

Kafka專案實戰使用者日誌上報實時統計之編碼實踐

1.概述 該課程我以使用者實時上報日誌案例為基礎,帶著大家去完成各個KPI的編碼工作,實現生產模組、消費模組,資料持久化,以及應用排程等工作, 通過對這一系列流程的演示,讓大家能夠去掌握Kafka專案的相關編碼以及排程流程。下面,我們首先來預覽本課程所包含的課時,他們分別

史上最簡單kafka實戰教程

你在寫java 版的 kafka程式可能會遇到如下問題問題一:程式丟擲了org.apache.kafka.common.errors.TimeoutException:在application.yml 中加入下面這句話 :logging.level.root:debug然後再

ASP.NET MVC3實戰系列(一):簡單示例

ASP.NET MVC已經推出時間不短了,已經有很多專案在使用這個優秀的WEB開發框架。因為我們專案每次加人的時候,對MVC都不是特別熟悉,有一些人認為這個非常簡單,導致寫出來的程式完全不是MVC的,所以我就想寫個系列總結一下實戰中的經驗和一些學習的筆記。我們先不談論MVC的好處,等我們寫過一些程式和示例後

Kafka應用實戰——Kafka安裝及簡單使用

Ingredient: 1 Kafka簡介 Kafka是什麼呢?Kafka官網說自己是“A distributed streaming platform”,也就是一個“分散式流媒體平臺”,其實就是一個訊息佇列平臺。訊息之間是一個“public & s

網站首頁布局實戰簡單

成了 meta screen head false active amp 關於 span 跟著教程完成了一個簡單的首頁制作,沒有用js,畢竟是第一個實戰,紀念一下 HTML: <!DOCTYPE html><html lang="en"><he

[shell]system和execlp簡單示例

div print logs $1 script col echo null ram shell腳本:hello.sh #!/bin/bash echo "i am in shell script" echo "param 1 is $1" echo "param 2 i

Asp.Net Core WebAPI入門整理(二)簡單示例

序列 open exc tor pda template ssa net found 一、Core WebAPI中的序列化 使用的是Newtonsoft.Json,自定義全局配置處理: // This method gets called by the runtime.

JAVA入門[20]-Hibernate簡單示例

roo mysql play ger 4.3 path arc result 建數據庫 一、Hibernate簡介 在很多場景下,我們不需要使用JdbcTemplate直接操作SQL語句,這時候可以用ORM工具來節省數大量的的代碼和開發時間。ORM工具能夠把註意力從容易出

死鎖的簡單示例

clas rgs system stack 更改 示例 鎖定 相等 mage 什麽是死鎖? 死鎖是指兩個或兩個以上的進程在執行過程中,由於競爭資源或者由於彼此通信而造成的一種阻塞的現象,若無外力作用,它們都將無法推進下去。 產生死鎖的四個必要條件: (1) 互斥條件:一個資

vue-router單頁應用簡單示例(一)

問題 clas 做了 設置 new scope 文件的 log target 請先完成了項目初始化,具體請看我另一篇博文。vue項目初始化 看一下完成的效果圖,很典型的單頁應用。 .vue後綴名的單文件組件 這裏先說一下我對組件的理解。組件,顧名思義就是一組元素組成的

vue-router單頁應用簡單示例(二)

數據 prop tps div -1 可重用性 example 定位 .com 我們先來理一下思路。 圖1:main.js 引入vue,App.vue,router/index.js文件 聲明要渲染的Id為app,將App.vue中的模版渲染到入口界面(就

[pthread]Linux C 多線程簡單示例

簡單 _exit bsp clas flags thread read arm color #include <stdio.h> #include <pthread.h> pthread_mutex_t mutex; pthread_con

Echart 使用圖表簡單示例

har echarts 網格 ntb rip common 圖表 技術分享 nes 簡單應用方式: <div id="main"></div> 引用Echart <script src="js/echarts.common.min.js"&

簡單示例用例(Simple Example Use Cases)--hive GettingStarted用例翻譯

翻譯 nload insert fields 清洗 group eas lease wid 1、MovieLens User Ratings First, create a table with tab-delimited text file format: 首先,創建