kafka與flume 的應用(實戰)
版本號:
RedHat6.5 JDK1.8 flume-1.6.0 kafka_2.11-0.8.2.11.flume安裝
2.kafka安裝
3.Flume和Kafka整合
在conf目錄新建flume-kafka.conf檔案:- touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
- sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
- # 指定Agent的元件名稱
- agent1.sources = source1
- agent1.sinks
- agent1.channels = channel1
- # 指定Flume source(要監聽的路徑)
- agent1.sources.source1.type = spooldir
- agent1.sources.source1.spoolDir =/usr/local/flume/logtest
- # 指定Flume sink
- #agent1.sinks.sink1.type = logger
- agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
- agent1.sinks.sink1.topic = test
- agent1.sinks.sink1.brokerList =192.168.168.200:9092
- agent1.sinks.sink1.requiredAcks =1
- agent1.sinks.sink1.batchSize =100
- # 指定Flume channel
- agent1.channels.channel1.type = memory
- agent1.channels.channel1.capacity =1000
- agent1.channels.channel1.transactionCapacity =100
- # 繫結source和sink到channel上
- agent1.sources.source1
- agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.topic = test 代表flume監聽路徑下發生變化時,會把訊息傳送到localhost機器上的test主題。
啟動flume-kafka.conf:
- cd /usr/local/flume/apache-flume-1.6.0-bin
- bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name agent1 -Dflume.root.logger=INFO,console
執行成功日誌如下:
- 2017-07-0722:22:02,270(lifecycleSupervisor-1-2)[INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)]Monitored counter groupfor type: SINK, name: sink1:Successfully registered newMBean.
- 2017-07-0722:22:02,270(lifecycleSupervisor-1-2)[INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)]Component type: SINK, name: sink1 started
啟動kafka的消費者,監聽topic主題:
- kafka-console-consumer.sh --zookeeper localhost:2181--topic test
testKafka.log :
在/usr/local/flume目錄下面新建一個testKafka.log日誌檔案,寫入Flume connect Kafka success! 作為測試內容:- touch /usr/local/flume/testKafka.log
- sudo gedit /usr/local/flume/testKafka.log
- cp /usr/local/flume/testKafka.log /usr/local/flume/logtest
- [[email protected] kafka_2.11-0.9.0.0]# kafka-console-consumer.sh --zookeeper localhost:2181--topic test
- [2017-07-0722:36:38,687] INFO [GroupMetadataManager on Broker200]:Removed0 expired offsets in1 milliseconds.(kafka.coordinator.GroupMetadataManager)
- Flume connect Kafka success!
---------------------------------flume------------------------------
- 2017-07-0722:41:32,602(pool-3-thread-1)[INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)]Preparing to move file /usr/local/flume/logtest/testKafka.log to /usr/local/flume/logtest/testKafka.log.COMPLETED
- 2017-07-0722:41:35,669(SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - kafka.utils.Logging$class.info(Logging.scala:68)]Fetching metadata from broker id:0,host:localhost,port:9092with correlation id 0for1 topic(s)Set(test)
- 2017-07-0722:41:35,728(SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - kafka.utils.Logging$class.info(Logging.scala:68)]Connected to localhost:9092for producing
- 2017-07-0722:41:35,757(SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - kafka.utils.Logging$class.info(Logging.scala:68)]Disconnectingfrom localhost:9092
- 2017-07-0722:41:35,791(SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - kafka.utils.Logging$class.info(Logging.scala
相關推薦
kafka與flume 的應用(實戰)
版本號:RedHat6.5 JDK1.8 flume-1.6.0 kafka_2.11-0.8.2.11.flume安裝2.kafka安裝3.Flume和Kafka整合在conf目錄新建flume-kafka.conf檔案:touch /usr/local/fl
項目實戰-大數據Kafka原理剖析及(實戰)演練
實戰 kafka 大數據 com nbsp attach forum ignore spa Kafka原理剖析及實戰演練 Kafka理論+實戰視頻教程 Kafka完美入門視頻教程 煉數成金<ignore_js_op> <ignore_js_op> &
Spring Boot Actuator詳解與深入應用(一):Actuator 1.x
《Spring Boot Actuator詳解與深入應用》預計包括三篇,第一篇重點講Spring Boot Actuator 1.x的應用與定製端點;第二篇將會對比Spring Boot Actuator 2.x 與1.x的區別,以及應用和定製2.x的端點;第三篇將會介紹Actuator metric指
Spring Boot Actuator詳解與深入應用(二):Actuator 2.x
《Spring Boot Actuator詳解與深入應用》預計包括三篇,第一篇重點講Spring Boot Actuator 1.x的應用與定製端點;第二篇將會對比Spring Boot Actuator 2.x 與1.x的區別,以及應用和定製2.x的端點;第三篇將會介紹Actuator metric指
Spring Boot Actuator詳解與深入應用(三):Prometheus+Grafana應用監控
《Spring Boot Actuator詳解與深入應用》預計包括三篇,第一篇重點講Spring Boot Actuator 1.x的應用與定製端點;第二篇將會對比Spring Boot Actuator 2.x 與1.x的區別,以及應用和定製2.x的端點;第三篇將會介紹Actuator metric指
Kafka與.net core(一)安裝
1.安裝JDK 目前官網不能直接下載,在網上找到1.8.0版本安裝包下載到本地。 1.1.下載jdk並解壓 [[email protected] java]# ls jdk1.8.0_191 jdk-8u191-linux-x64.tar.gz 1.2.配置java環境變數
Kafka與.net core(三)kafka操作
1.Kafka相關知識 Broker:即Kafka的伺服器,使用者儲存訊息,Kafa叢集中的一臺或多臺伺服器統稱為broker。 Message訊息:是通訊的基本單位,每個 producer 可以向一個 topic(主題)釋出一些訊息。
對於 RxJava2 的 認知與直接應用(一)
最近有時間學習些許內容,加上重新寫部落格來記錄自己的學習過程與心得 1.rxjava2 認知 rxjava作為知名的響應式程式設計庫,這半年內極大的火爆開發者中 介紹 Observable 被觀察者 | | subscribe(
mongodb專案實戰與高階應用(MongoDB 高可用方案-MongoDB 副本集搭建)
MongoDB 副本集 中文翻譯叫做副本集,不過我並不喜歡把英文翻譯成中文,總是感覺怪怪的。其實簡單來說就是集 群當中包含了多份資料,保證主節點掛掉了,備節點能繼續提供資料服務,提供的前提就是資料需要和 主節點一致。 Mongodb(M)表示主節點,Mongodb(S)表示備節點,Mon
mongodb專案實戰與高階應用(MongoDB 高可用方案-主從搭建)
1、命令列啟動 $ ./mongod --fork --dbpath=/opt/mongodb/data 2、配置檔案啟動 $ ./mongod -f mongodb.cfg mongoDB 基本配置/opt/mongodb/mongodb.cfg dbpa
mongodb專案實戰與高階應用(使用者管理)
1.1、新增使用者 為testdb 新增ma 使用者 use testdb db.createUser({user:"ma",pwd:"123",roles:[{ role:"dbAdmin",db:"testdb"}]}) 具體角色有 read:允許使用者讀取指定資料庫 readWri
大資料系列之分散式釋出訂閱訊息系統Kafka(四)Kafka與Flume的3種整合
前面我們已經介紹了Flume,現在我們將Kafka與Flume整合 先看一下Flume的結構組成: 我們可以發現,將Flume與Kafka進行整合無非3種情況,Flume作為生產者——Sink輸出到Kafka,Flume作為消費者——Source接
Linux同步與相互排斥應用(零):基礎概念
使用 line 關系 並發執行 來看 文章 必須 生產者 而且 【版權聲明:尊重原創,轉載請保留出處:blog.csdn.net/shallnet 或 .../gentleliu,文章僅供學習交流,請勿用於商業用途】 當操作系統進入多道批處理
TF-IDF與余弦相似性的應用(三):自動摘要
下一步 dip target 似的 abs tps .net ebo ace 轉:http://www.ruanyifeng.com/blog/2013/03/automatic_summarization.html 有時候,很簡單的數學方法,就可以完成很復雜的任務。 這個
第44課 遞歸的思想與應用(中)
!= 遞歸法 ati 恢復 直接 clu spa tex height 1. 單向鏈表的轉置 【編程實驗】單向鏈表的轉置(Node* reverse(Node* list)) 2. 單向排序鏈表的合並 【編程實驗】單向排序鏈表的合並(Node* merge(Node
web應用與web框架(Day65)
pos ack ++ 環境 lex roo http請求 main conn Web應用 對於所有的web應用,本質上其實就是一個socket服務端,用戶的瀏覽器其實就是一個socket客戶端 import socket def handle_request(clien
kafka源碼分析(二)Metadata的數據結構與讀取、更新策略
思路 sync 源碼分析 png ada ret code 入隊 後臺線程 一、基本思路 異步發送的基本思路就是:send的時候,KafkaProducer把消息放到本地的消息隊列RecordAccumulator,然後一個後臺線程Sender不斷循環,把消息發給K
由散列表到BitMap的概念與應用(三):面試中的海量資料處理
一道面試題 在面試軟體開發工程師時,經常會遇到海量資料排序和去重的面試題,特別是大資料崗位。 例1:給定a、b兩個檔案,各存放50億個url,每個url各佔64位元組,記憶體限制是4G,找出a、b檔案共同的url? 首先我們最常想到的方法是讀取檔案a,建立雜湊表,然後再讀取檔案b,遍歷檔
IO的應用(二)--序列化與反序列化
package com.bjpowernode.demo02; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import ja
VB6基本資料庫應用(三):連線資料庫與SQL語句的Select語句初步
資料庫我們已經建好了,重提一下上一章的結果,我們最後建立了一張Student的表,其中有StudentID(數字的雙精度型別)和StudentName(文字型別。補充一下,2013中有【長文字】和【短文字】,人名不會很長,根據上一章選擇儘量小的資料型別的規則,這裡就選【短文字】就可以了)。儘