kafka環境搭建+spring-kafka-demo測試
kafka環境搭建1-單機
kafka是基於scala語言開發,所以需要java執行環境,下載前請先確認是否已經安裝並配置java環境
下載安裝
- 解壓縮
tar -xzf kafka_2.11-0.9.0.0.tgz
配置訪問地址:
config/server.properties
- #內網地址:192.168.85.171
- listeners=PLAINTEXT://192.168.85.171:9092
- #外網地址:192.168.85.171
- advertised.listeners=PLAINTEXT://192.168.85.171:9092
開放默認埠:
/sbin/iptables
-I INPUT -p tcp --dport 9092 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 2181 -j ACCEPT
啟動
首先要進入kafka目錄
- 啟動zookeeper server(kafka自帶的zookeeper)
bin/zookeeper-server-start.sh config/zookeeper.properties &
注意 & 號的使用:命令結尾新增 & 號,可以在執行完命令後退出當前命令環境卻不會結束程序
- 啟動kafka server
bin/kafka-server-start.sh config/server.properties &
建立主題
kafka生產和消費資料,必須基於主題topic。主題其實就是對訊息的分類。
- 建立主題:名稱為“test”、複製數目為1、partitions為1的topic主題
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
replication-factor : 複製數目,提供failover機制;1代表只在一個broker上有資料記錄,一般值都大於1,代表一份資料會自動同步到其他的多個broker,防止某個broker宕機後資料丟失。
partitions : 一個topic可以被切分成多個partitions,一個消費者可以消費多個partitions,但一個partitions只能被一個消費者消費,所以增加partitions可以增加消費者的吞吐量。kafka只保證一個partitions內的訊息是有序的,多個一個partitions之間的資料是無序的。
- 檢視已經創建的主題
bin/kafka-topics.sh --list --zookeeper localhost:2181
啟動生產者和消費者
生產者產生(輸入)資料,消費者消費(輸出)資料
- 啟動生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
啟動後,在命令列下每輸入一些字串按下回車時,就作為一個訊息併發送的kafka
- 啟動消費者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
啟動消費者時,建議另開一個ssh視窗,方便一遍通過生產者命令列輸入訊息,一遍觀察消費者消費的資料
當在生產者下輸入訊息並回車後,在消費者視窗下就能立即看到對應的訊息,這就說明環境搭建成功。
Springboot +kafka
pom
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.2.2.RELEASE</version> </dependency>
application.properties
# kafka spring.kafka.bootstrap-servers=192.168.85.171:9092 spring.kafka.consumer.group-id=group1 spring.kafka.consumer.auto-offset-reset=earliest
controller
package com.kaicom.nettytest.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; /** * @Author: BillYu * @Description:kafka連線測試 * @Date: Created in 下午3:52 2018/6/4. */ @RestController public class SampleController { private final Logger log = LoggerFactory.getLogger(SampleController.class); @Autowired private KafkaTemplate<String, String> template; @RequestMapping("/send") @ResponseBody String send(String topic, String key, String data) { template.send(topic, key, data); return "success"; } @KafkaListener(id = "test", topics = "test") public void listenTest(ConsumerRecord<?, ?> cr) throws Exception { System.out.println("===>listen"); log.info("{} - {} : {}", cr.topic(), cr.key(), cr.value()); } }
視覺化監控安裝
一、KafkaOffsetMonitor簡述
KafkaOffsetMonitor是Kafka的一款客戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,我們可以瀏覽當前的消費者組,並且每個Topic的所有Partition的消費情況都可以一目瞭然。
二、KafkaOffsetMonitor下載
KafkaOffsetMonitor託管在Github上,可以通過Github下載。
下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases
或者下載百度網盤:連結:https://pan.baidu.com/s/1geEBEvT 密碼:jaeu
github提供下載的jar包應用了google網站提供的js檔案,由於被牆的原因導致web介面無法訪問。可以直接使用別人修改後的jar包,也可以從github上拉取原始碼然後修改打包。
三、KafkaOffsetMonitor啟動
將下載下來的KafkaOffsetMonitor jar包上傳到linux上,可以新建一個目錄KafkaMonitor,用於存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,通過java編譯命令來執行這個jar包:
[[email protected] KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days
按回車後,可以看到控制檯輸出:
serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started [email protected]0.0.0.0:8088
如果沒有指定埠,則預設會開啟一個隨機埠。
引數說明:
zk :zookeeper主機地址,如果有多個,用逗號隔開
port :應用程式埠
refresh :應用程式在資料庫中重新整理和儲存點的頻率
retain :在db中保留多長時間
dbName :儲存的資料庫檔名,預設為offsetapp
為了更方便的啟動KafkaOffsetMonitor,可以寫一個啟動指令碼來直接執行,我這裡新建一個名為:kafka-monitor-start.sh的指令碼,然後編輯這個指令碼:
[[email protected] KafkaMonitor]# vim kafka-monitor-start.sh
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8088 \
--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \
--refresh 5.minutes \
--retain 1.day >/dev/null 2>&1;
然後退出儲存即可,接下來修改一下kafka-monitor-start.sh的許可權
[[email protected] KafkaMonitor]# chmod +x kafka-monitor-start.sh
啟動KafkaOffsetMonitor:
[[email protected] KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh &
[1] 6551
[[email protected] KafkaMonitor]# lsof -i:8088
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 6552 root 16u IPv6 26047 0t0 TCP *:radan-http (LISTEN)
四、KafkaOffsetMonitor Web UI
在遊覽器中輸入:http://ip:port即可以檢視KafkaOffsetMonitor Web UI,如下圖:
相關推薦
kafka環境搭建+spring-kafka-demo測試
kafka環境搭建1-單機 kafka是基於scala語言開發,所以需要java執行環境,下載前請先確認是否已經安裝並配置java環境 下載安裝 解壓縮 tar -xzf kafka_2.11-0
mac下kafka環境搭建 測試
kafka介紹:https://blog.csdn.net/see_you_see_me/article/details/78468108 1、安裝工具brew install kafka 會自動安裝依賴zookeeper 2、安裝配置檔案位置 /usr/local/etc/kafka|
windows10下Kafka環境搭建
內容小白,包含JDK+Zookeeper+Kafka三部分。 JDK: 1) 安裝包:Java SE Development Kit 9.0.1 下載地址:http://www.oracle.c
kafka環境搭建
一、使用技術版本 kafka_2.10-0.10.2.1.tar zookeeper-3.4.5.tar 二、環境搭建 因為kafka環境依賴於zookeeper,所以先搭建zookeeper 1、zookeeper搭建 先建立一個資料夾
Kafka學習筆記:Kafka環境搭建
Kafka環境搭建 Kafka單機環境搭建 安裝必需 jdk,這裡使用的是jdk1.8 scala,需要獨立安裝scala,這裡使用的是scala 2.11.8 zookeeper,Kafka會自帶zk,但是最好使用獨立的 安裝步驟 1.將Kafka的tar包上傳
【Kafka】kafka環境搭建及使用
Kafka是一個分散式的、可分割槽的、可複製的訊息系統。它提供了普通訊息系統的功能,但具有自己獨特的設計 Kafka將訊息以topic為單位進行歸納。將向Kafka topic釋出訊息的程式成為producers.將預訂topics並消費訊息的程式成為consumer.K
大資料環境搭建之Kafka完全分散式環境搭建步驟詳解
文章目錄 環境準備 解壓安裝 配置檔案 服務啟動 1、啟動分散式叢集的zookeeper 2、啟動Kafka服務 偽分散式搭建完畢之後,只要稍作修改就
大資料環境搭建之Kafka偽分散式環境搭建步驟詳解
文章目錄 Kafka簡介 環境準備 解壓安裝 配置檔案 服務啟動 Topic相關操作 控制檯生產者 控制檯消費者 Kafka簡介
Kafka環境搭建(分散式)
KafKa環境搭建(分散式) 1.上傳kafka_2.11-0.10.0.0.tgz到software下面 2.解壓kafka_2.11-0.10.0.0.tgz(並將kafka安裝包放到別的資料夾中,統一管理。可以不理) &nb
Kafka環境搭建(單機)
KafKa環境搭建(單機) 1.上傳kafka_2.11-0.10.0.0.tgz到software下面 2.解壓kafka_2.11-0.10.0.0.tgz(並將kafka安裝包放到別的資料夾中,統一管理。可以不理) &
kafka環境搭建(Windows/Linux)
(一)安裝zookeeper(windows) kafka需要用到zookeeper,所以需要先安裝zookeeper 1.到官網下載最新版zookeeper,http://www.apache.org/dyn/closer.cgi/zookeeper/ 2.解壓到你喜歡的
RN 環境搭建 運行demo App
jdk gis 運行時 .org org 正常 and 創建項目 install 1.環境搭建 1.1 JDK 1.2Android JDK 1.3Node npm config set registry h
Python3與OpenCV3.3 圖像處理(一)--環境搭建與簡單DEMO
http opencv3 opencv col lan pytho href tar .net https://blog.csdn.net/qq_32811489/article/details/78636049 https://blog.csdn.net/gangzhu
eclipse Spring環境搭建 spring tool suite
1、期初用intellij社群版,發現收費版才能開發Java EE。 2、使用eclipse按照網上的教程,在help->eclipse marketplace中搜索sts安裝spring工具套件,由於網速太慢(也有可能需FQ),沒成功。 3、下載sts後( http://spring.io/too
Ubuntu 14.04 Caffe和TensorFlow的ARM NN SDK編譯環境搭建及MNIST程式測試
Ubuntu 14.04下Caffe和TensorFlow的ARM NN SDK的aarch64編譯環境搭建及MNIST程式測試 ARM官方測試環境 1. SCons安裝 2.安裝CMake 3.下載安裝boost 4.使用 S
linux下gsoap環境搭建及C++ demo
第一次在Linux下裸寫程式碼,沒有大神可以抱大腿,makefile寫到吐血。 做一個從webservice服務端獲取資料的小東西。以下記錄環境搭建和基礎demo,資料整理彙總,備忘。 基礎概念 WebService:一種跨程式語言和跨作業系統平臺的遠端呼叫技術。
基礎環境搭建好後的測試
如果前一章基礎環境搭建成功,那麼恭喜成功入坑!但彆著急高興因為還有最後一個守門怪等著你去攻克!! 環境搭建好後的測試 一、官方grep案例 二、官方wordcount案例 一、官方grep案例 在hadoop-2.7.2檔案下面建立一
App自動化測試探索(二)MAC環境搭建iOS+Python+Appium測試環境
code -s image ios 使用 usr developer contents gis 環境搭建要求,MAC 機器一臺,要求 Xcode 8.0以上 1. 安裝 Homebrew /usr/bin/ruby -e "$(curl -fsSL https://raw
SFTP伺服器端-freeSSHd環境搭建及java連線測試
因工作中需要使用SFTP上傳下載的功能,所以打算自己搭建一個SFTP伺服器用來測試(幾年前搭建過一次,忘了,現在回顧一下)。 下載下來之後直接選擇目錄安裝即可。 安裝完成開啟設定: 設定完成後啟動server 這樣我們的SFTP伺服器就