1. 程式人生 > >kafka環境搭建+spring-kafka-demo測試

kafka環境搭建+spring-kafka-demo測試

kafka環境搭建1-單機

kafka是基於scala語言開發,所以需要java執行環境,下載前請先確認是否已經安裝並配置java環境

安裝

  • 壓縮

tar -xzf kafka_2.11-0.9.0.0.tgz

配置訪問地址:

config/server.properties

  • #內網地址:192.168.85.171
  • listeners=PLAINTEXT://192.168.85.171:9092
  • #外網地址:192.168.85.171
  • advertised.listeners=PLAINTEXT://192.168.85.171:9092

開放默

/sbin/iptables 
-I INPUT -p tcp --dport 9092 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 2181 -j ACCEPT

首先要進入kafka目錄

  • zookeeper serverkafkazookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

注意 & 號的使用:命令結尾新增 & 號,可以在執行完命令後退出當前命令環境卻不會結束程序

  • kafka server

bin/kafka-server-start.sh config/server.properties &

建立主題

kafka生產和消費資料,必須基於主題topic。主題其實就是對訊息的分類。

  • 建立主題:名稱為“test”、複製數目1partitions1topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

replication-factor  複製數目,提供failover機制;1代表只在一個broker上有資料記錄,一般值都大於1,代表一份資料會自動同步到其他的多個broker,防止某個broker宕機後資料丟失。


partitions  一個topic可以被切分成多個partitions,一個消費者可以消費多個partitions,但一個partitions只能被一個消費者消費,所以增加partitions可以增加消費者的吞吐量。kafka只保證一個partitions內的訊息是有序的,多個一個partitions之間的資料是無序的。

  • 檢視經創建的主

bin/kafka-topics.sh --list --zookeeper localhost:2181

者和消

生產者產生(輸入)資料,消費者消費(輸出)資料

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

啟動後,在命令列下每輸入一些字串按下回車時,就作為一個訊息併發送的kafka

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

啟動消費者時,建議另開一個ssh視窗,方便一遍通過生產者命令列輸入訊息,一遍觀察消費者消費的資料

當在生產者下輸入訊息並回車後,在消費者視窗下就能立即看到對應的訊息,這就說明環境搭建成功。

Springboot +kafka

pom

<dependency>

   <groupId>org.springframework.kafka</groupId>

   <artifactId>spring-kafka</artifactId>

   <version>1.2.2.RELEASE</version>

</dependency>

application.properties

# kafka

spring.kafka.bootstrap-servers=192.168.85.171:9092

spring.kafka.consumer.group-id=group1

spring.kafka.consumer.auto-offset-reset=earliest

controller

package com.kaicom.nettytest.kafka;



import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import org.springframework.web.bind.annotation.RestController;



/**

 * @Author: BillYu

 * @Description:kafka連線測試
* @Date: Created in 下午3:52 2018/6/4.

 */

@RestController

public class SampleController {

    private final Logger log = LoggerFactory.getLogger(SampleController.class);







    @Autowired

    private KafkaTemplate<String, String> template;



    @RequestMapping("/send")

    @ResponseBody

    String send(String topic, String key, String data) {

        template.send(topic, key, data);

        return "success";

    }







    @KafkaListener(id = "test", topics = "test")

    public void listenTest(ConsumerRecord<?, ?> cr) throws Exception {

        System.out.println("===>listen");

        log.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());

    }









}

視覺化監控安裝

 一、KafkaOffsetMonitor簡述

KafkaOffsetMonitor是Kafka的一款客戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,我們可以瀏覽當前的消費者組,並且每個Topic的所有Partition的消費情況都可以一目瞭然。

二、KafkaOffsetMonitor下載

KafkaOffsetMonitor託管在Github上,可以通過Github下載。
下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases

或者下載百度網盤:連結:https://pan.baidu.com/s/1geEBEvT 密碼:jaeu

github提供下載的jar包應用了google網站提供的js檔案,由於被牆的原因導致web介面無法訪問。可以直接使用別人修改後的jar包,也可以從github上拉取原始碼然後修改打包。

https://images2017.cnblogs.com/blog/1209537/201801/1209537-20180108132411629-753572802.png

三、KafkaOffsetMonitor啟動

將下載下來的KafkaOffsetMonitor jar包上傳到linux上,可以新建一個目錄KafkaMonitor,用於存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,通過java編譯命令來執行這個jar包:

[[email protected] KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088  --refresh 5.seconds --retain 1.days
按回車後,可以看到控制檯輸出:
serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started [email protected]0.0.0.0:8088

如果沒有指定埠,則預設會開啟一個隨機埠。

引數說明:
zk zookeeper主機地址,如果有多個,用逗號隔開
port :應用程式埠
refresh :應用程式在資料庫中重新整理和儲存點的頻率
retain :在db中保留多長時間
dbName :儲存的資料庫檔名,預設為offsetapp

為了更方便的啟動KafkaOffsetMonitor,可以寫一個啟動指令碼來直接執行,我這裡新建一個名為:kafka-monitor-start.sh的指令碼,然後編輯這個指令碼:

[[email protected] KafkaMonitor]# vim kafka-monitor-start.sh 
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m  -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8088 \
--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \
--refresh 5.minutes \
--retain 1.day >/dev/null 2>&1;

然後退出儲存即可,接下來修改一下kafka-monitor-start.sh的許可權

[[email protected] KafkaMonitor]# chmod +x kafka-monitor-start.sh 

啟動KafkaOffsetMonitor:

[[email protected] KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh &
[1] 6551
[[email protected] KafkaMonitor]# lsof -i:8088
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    6552 root   16u  IPv6  26047      0t0  TCP *:radan-http (LISTEN)

四、KafkaOffsetMonitor Web UI

在遊覽器中輸入:http://ip:port即可以檢視KafkaOffsetMonitor Web UI,如下圖:

相關推薦

kafka環境搭建+spring-kafka-demo測試

kafka環境搭建1-單機 kafka是基於scala語言開發,所以需要java執行環境,下載前請先確認是否已經安裝並配置java環境 下載安裝 解壓縮 tar -xzf kafka_2.11-0

mac下kafka環境搭建 測試

kafka介紹:https://blog.csdn.net/see_you_see_me/article/details/78468108 1、安裝工具brew install kafka 會自動安裝依賴zookeeper 2、安裝配置檔案位置 /usr/local/etc/kafka|

windows10下Kafka環境搭建

內容小白,包含JDK+Zookeeper+Kafka三部分。 JDK: 1)   安裝包:Java SE Development Kit 9.0.1       下載地址:http://www.oracle.c

kafka環境搭建

  一、使用技術版本 kafka_2.10-0.10.2.1.tar  zookeeper-3.4.5.tar 二、環境搭建 因為kafka環境依賴於zookeeper,所以先搭建zookeeper 1、zookeeper搭建 先建立一個資料夾

Kafka學習筆記:Kafka環境搭建

Kafka環境搭建 Kafka單機環境搭建 安裝必需 jdk,這裡使用的是jdk1.8 scala,需要獨立安裝scala,這裡使用的是scala 2.11.8 zookeeper,Kafka會自帶zk,但是最好使用獨立的 安裝步驟 1.將Kafka的tar包上傳

Kafkakafka環境搭建及使用

Kafka是一個分散式的、可分割槽的、可複製的訊息系統。它提供了普通訊息系統的功能,但具有自己獨特的設計 Kafka將訊息以topic為單位進行歸納。將向Kafka topic釋出訊息的程式成為producers.將預訂topics並消費訊息的程式成為consumer.K

大資料環境搭建Kafka完全分散式環境搭建步驟詳解

文章目錄 環境準備 解壓安裝 配置檔案 服務啟動 1、啟動分散式叢集的zookeeper 2、啟動Kafka服務 偽分散式搭建完畢之後,只要稍作修改就

大資料環境搭建Kafka偽分散式環境搭建步驟詳解

文章目錄 Kafka簡介 環境準備 解壓安裝 配置檔案 服務啟動 Topic相關操作 控制檯生產者 控制檯消費者 Kafka簡介

Kafka環境搭建(分散式)

  KafKa環境搭建(分散式)   1.上傳kafka_2.11-0.10.0.0.tgz到software下面   2.解壓kafka_2.11-0.10.0.0.tgz(並將kafka安裝包放到別的資料夾中,統一管理。可以不理) &nb

Kafka環境搭建(單機)

  KafKa環境搭建(單機)   1.上傳kafka_2.11-0.10.0.0.tgz到software下面 2.解壓kafka_2.11-0.10.0.0.tgz(並將kafka安裝包放到別的資料夾中,統一管理。可以不理)   &

kafka環境搭建(Windows/Linux)

(一)安裝zookeeper(windows) kafka需要用到zookeeper,所以需要先安裝zookeeper 1.到官網下載最新版zookeeper,http://www.apache.org/dyn/closer.cgi/zookeeper/ 2.解壓到你喜歡的

RN 環境搭建 運行demo App

jdk gis 運行時 .org org 正常 and 創建項目 install 1.環境搭建 1.1 JDK 1.2Android JDK 1.3Node npm config set registry h

Python3與OpenCV3.3 圖像處理(一)--環境搭建與簡單DEMO

http opencv3 opencv col lan pytho href tar .net https://blog.csdn.net/qq_32811489/article/details/78636049 https://blog.csdn.net/gangzhu

eclipse Spring環境搭建 spring tool suite

1、期初用intellij社群版,發現收費版才能開發Java EE。 2、使用eclipse按照網上的教程,在help->eclipse marketplace中搜索sts安裝spring工具套件,由於網速太慢(也有可能需FQ),沒成功。 3、下載sts後( http://spring.io/too

Ubuntu 14.04 Caffe和TensorFlow的ARM NN SDK編譯環境搭建及MNIST程式測試

Ubuntu 14.04下Caffe和TensorFlow的ARM NN SDK的aarch64編譯環境搭建及MNIST程式測試 ARM官方測試環境 1. SCons安裝 2.安裝CMake 3.下載安裝boost 4.使用 S

linux下gsoap環境搭建及C++ demo

第一次在Linux下裸寫程式碼,沒有大神可以抱大腿,makefile寫到吐血。 做一個從webservice服務端獲取資料的小東西。以下記錄環境搭建和基礎demo,資料整理彙總,備忘。 基礎概念 WebService:一種跨程式語言和跨作業系統平臺的遠端呼叫技術。

基礎環境搭建好後的測試

如果前一章基礎環境搭建成功,那麼恭喜成功入坑!但彆著急高興因為還有最後一個守門怪等著你去攻克!! 環境搭建好後的測試 一、官方grep案例 二、官方wordcount案例 一、官方grep案例 在hadoop-2.7.2檔案下面建立一

App自動化測試探索(二)MAC環境搭建iOS+Python+Appium測試環境

code -s image ios 使用 usr developer contents gis 環境搭建要求,MAC 機器一臺,要求 Xcode 8.0以上 1. 安裝 Homebrew /usr/bin/ruby -e "$(curl -fsSL https://raw

SFTP伺服器端-freeSSHd環境搭建及java連線測試

  因工作中需要使用SFTP上傳下載的功能,所以打算自己搭建一個SFTP伺服器用來測試(幾年前搭建過一次,忘了,現在回顧一下)。 下載下來之後直接選擇目錄安裝即可。 安裝完成開啟設定: 設定完成後啟動server 這樣我們的SFTP伺服器就