Flume+Kafka+SparkStreaming整合
目錄
1. Flume介紹
Flume是Cloudera提供的一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的日誌收集系統,支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。
1.1 Flume資料來源以及輸出方式
Flume提供了從console(控制檯)、RPC(Thrift-RPC)、text(檔案)、tail(UNIX tail)、syslog(syslog日誌系統,支援TCP和UDP等2種模式),exec(命令執行)等資料來源上收集資料的能力,在我們的系統中目前使用exec方式進行日誌採集。
Flume的資料接受方,可以是console(控制檯)、text(檔案)、dfs(HDFS檔案)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日誌系統)等。本測試研究中由kafka來接收資料。
1.2 Flume的核心概念
1. Agent:使用JVM執行Flume。每臺機器執行一個agent,但是可以在一個agent中包含多個sources和sinks。
2. Client:生產資料,執行在一個獨立的執行緒。
3. Source:從Client收集資料,傳遞給Channel。
4. Sink:從Channel收集資料,執行在一個獨立執行緒。
5. Channel
6. Events :可以是日誌記錄、 avro物件等。
1.3 結構
Flume以agent為最小的獨立執行單位。一個agent就是一個JVM。單agent由Source、Sink和Channel三大元件構成,如下圖:
Flume提供了大量內建的Source、Channel和Sink型別。不同型別的Source,Channel和Sink可以自由組合。組合方式基於使用者設定的配置檔案,非常靈活。比如:Channel可以把事件暫存在記憶體裡,也可以持久化到本地硬碟上。Sink可以把日誌寫入HDFS, HBase,甚至是另外一個Source等等。Flume支援使用者建立多級流,也就是說,多個agent可以協同工作,並且支援Fan-in、Fan-out、ContextualRouting、Backup Routes。如下圖所示:
1.4 安裝測試
解壓apache-flume-1.6.0-bin.tar.gz:tar –zxvf apache-flume-1.6.0-bin.tar.gz
cp conf/flume-conf.properties.template conf/exec.conf
cp conf/flume-env.sh.template conf/flume-env.sh 配置JAVA_HOME
exec.conf配置如下:
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.channels = c2
a2.sources.r2.command=tail -n +0 -F /usr/local/hadoop/flume/test.log
# Describe the sink
a2.sinks.k2.type = logger
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
驗證安裝:flume-ng version
1.5 啟動flume
flume-ng agent --conf ./flume/conf/ -f ./flume/conf/exec.conf-Dflume.root.logger=DEBUG,console -n a2
傳送資料和flume接收資料:
2.Kafka介紹
2.1 產生背景
Kafka 是分散式釋出-訂閱訊息系統。它最初由 LinkedIn 公司開發,使用 Scala語言編寫,之後成為 Apache 專案的一部分。Kafka是一個分散式的,可劃分的,多訂閱者,冗餘備份的永續性的日誌服務。它主要用於處理活躍的流式資料。
在大資料系統中,常常會碰到一個問題,整個大資料是由各個子系統組成,資料需要在各個子系統中高效能,低延遲的不停流轉。傳統的企業訊息系統並不是非常適合大規模的資料處理。為了已在同時搞定線上應用(訊息)和離線應用(資料檔案,日誌)Kafka 就出現了。Kafka 可以起到兩個作用:
降低系統組網複雜度
降低程式設計複雜度,各個子系統不在是相互協商介面,各個子系統類似插口插在插座上,Kafka 承擔高速資料匯流排的作用。
2.2 部署結構
2.3 叢集架構
2.4 基本概念
Topic:特指 Kafka 處理的訊息源(feeds of messages)的不同分類。
Partition:Topic 物理上的分組,一個topic 可以分為多個 partition,每個 partition 是一個有序的佇列。partition 中的每條訊息都會被分配一個有序的id(offset)。
Message:訊息,是通訊的基本單位,每個 producer 可以向一個 topic(主題)釋出一些訊息。
Producers:訊息和資料生產者,向 Kafka的一個 topic 釋出訊息的過程叫做 producers。
Consumers:訊息和資料消費者,訂閱 topics 並處理其釋出的訊息的過程叫做 consumers。
Broker:快取代理,Kafa 叢集中的一臺或多臺伺服器統稱為broker。
2.5 安裝測試
解壓Kafka: tar -xzf kafka_2.10-0.8.1.1.tgz
啟動ZK bin/zookeeper-server-start.shconfig/zookeeper.properties
啟動服務bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1&
建立主題 bin/kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic test
檢視主題 bin/kafka-topics.sh --list --zookeeperlocalhost:2181
檢視主題詳情 bin/kafka-topics.sh--describe --zookeeper localhost:2181 --topic test
刪除主題 bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --topic test --zookeeper localhost:2181
建立生產者bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
建立消費者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning
3.Flume和Kafka整合
3.1 兩者整合優勢
Flume更傾向於資料傳輸本身,Kakfa是典型的訊息中介軟體用於解耦生產者消費者。
具體架構上,Agent並沒把資料直接傳送到Kafka,在Kafka前面有層由Flume構成的forward。這樣做有兩個原因:
Kafka的API對非JVM系的語言支援很不友好,forward對外提供更加通用的HTTP介面。
forward層可以做路由、Kafka topic和Kafkapartition key等邏輯,進一步減少Agent端的邏輯。
資料有資料來源到flume再到Kafka時,資料一方面可以同步到HDFS做離線計算,另一方面可以做實時計算。本文實時計算採用SparkStreaming做測試。
3.2 Flume和Kafka整合安裝
提取外掛中的flume-conf.properties檔案:修改如下:flume源採用exec
producer.sources.s.type = exec
producer.sources.s.command=tail -f -n+1/usr/local/Hadoop/flume/test.log
producer.sources.s.channels = c
修改producer代理的topic為test
將配置放到flume/cong/producer.conf中
複製外掛包中的jar包到flume/lib中:刪除掉版本不同的相同jar包,這裡需要刪除scala-compiler-z.9.2.jar包,否則flume啟動會出現問題。
複製kafka/libs中的jar包到flume/lib中。
完整producer.conf:
producer.conf:
#agentsection
producer.sources= s
producer.channels= c
producer.sinks= r
#sourcesection
producer.sources.s.type= exec
#producer.sources.s.spoolDir= /usr/local/hadoop/flume/logs
#producer.sources.s.fileHeader= true
producer.sources.s.command= tail -f -n+1 /usr/local/hadoop/flume/aaa.log
producer.sources.s.channels= c
# Eachsink's type must be defined
producer.sinks.r.type= org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=localhost:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test
#Specifythe channel the sink should use
producer.sinks.r.channel= c
# Eachchannel's type is defined.
producer.channels.c.type= memory
producer.channels.c.capacity= 1000
producer.channels.c.transactionCapacity= 100
3.3 啟動kafka flume相關服務
啟動ZKbin/zookeeper-server-start.sh config/zookeeper.properties
啟動Kafka服務 bin/kafka-server-start.sh config/server.properties
建立消費者bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning
啟動flume
flume-ng agent --conf./flume/conf/ -f ./flume/conf/producer.conf -Dflume.root.logger=DEBUG,console-n producer
向flume傳送資料:
Kafka消費者資料:
3.4 Kafka和SparkStreaming整合
核心程式碼:
完整程式碼路徑:
spark-1.4.0\examples\src\main\java\org\apache\spark\examples\streaming
執行引數:
傳送資料:
由於flume採用exec資料來源的方式,因此flume會監聽配置的相應的檔案: tail -f -n+1 /usr/local/Hadoop/flume/aaa.log
當向該檔案追加檔案時,flume就會獲取追加的資料:
writetoflume.py
flume將獲取的增量資料由sink傳送給kafka,以下是kafka comsumer消費的資料
執行結果:
SparkStreaming訂閱kafka的test主題的資料,將訂閱的資料進行單詞計數處理。
相關推薦
Flume+Kafka+SparkStreaming整合
目錄 1. Flume介紹 Flume是Cloudera提供的一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的日誌收集系統,支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受
SparkStreaming(14):log4j日誌-flume-kafka-SparkStreaming的整合
一、功能實現 模擬log4j的日誌生產,將日誌輸出到flume伺服器。然後,通過flume將日誌資訊輸出到kafka,進而Streaming可以從kafka獲得日誌,並且進行簡單的處理。 二、步驟 1.目的: 使用log4j將日誌輸按照一定格式輸出,並且傳遞給flume伺服器特定埠接
Flume+Kafka+SparkStreaming+Hbase+可視化(一)
日誌導入 ash channels style 導入 com system ase spark 一、前置準備: Linux命令基礎 Scala、Python其中一門 Hadoop、Spark、Flume、Kafka、Hbase基礎知識 二、分布式日誌收集框架Flume
使用Flume+Kafka+SparkStreaming進行實時日誌分析
每個公司想要進行資料分析或資料探勘,收集日誌、ETL都是第一步的,今天就講一下如何實時地(準實時,每分鐘分析一次)收集日誌,處理日誌,把處理後的記錄存入Hive中,並附上完整實戰程式碼 1. 整體架構 思考一下,正常情況下我們會如何收集並分析日誌呢?
flume +kafka+SparkStreaming日誌監控平臺
流程圖 採集方案#agentsectionproducer.sources= s1producer.channels= c1producer.sinks= k1#配置資料來源producer.sourc
Flume+Kafka+Sparkstreaming日誌分析
最近要做一個日誌實時分析的應用,採用了flume+kafka+sparkstreaming框架,先搞了一個測試Demo,本文沒有分析其架構原理。 簡介:flume是一個分散式,高可靠,可用的海量日誌聚合系統,kafka是一高吞吐量的分散式釋出訂閱系統,s
flume+kafka+storm整合實現實時計算小案例
我們做資料分析的時候常常會遇到這樣兩個場景,一個是統計歷史資料,這個就是要分析歷史儲存的日誌。我們會使用hadoop,具體框架可以設計為:1.flume收集日誌;2.HDFS輸入路徑儲存日誌;3.MapReduce計算,將結果輸出到HDFS輸出路徑;4.hive+sq
flume+kafka+storm整合00
一、安裝 flume,kafka, storm 的安裝在下面三篇文章: flume:1.6.0 kafka:注意這裡最好下載scala2.10版本的kafka,因為scala2.10版本的相容性比較好和2.11版本差別太大 二、各個部分除錯
Flume 、Kafka 與SparkStreaming 整合程式設計
Flume 、Kafka 與SparkStreaming 整合程式設計 一、 Kafka 與SparkStreaming 整合程式設計 1、程式 pull方式,可靠Recerver ,工作常
SparkStreaming整合Kafka--Direct方式
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.
大資料學習之路96-SparkStreaming整合Kafka
我們前面SparkStreaming獲取資料的來源是TCP,但是平常是不會這麼用的,我們通常用的是Kafka。 SparkStreamingContext是不直接提供對Kafka的訪問的。 這個時候就有KafkaUtils 這裡有兩個方法 1.createDirectStream
Kafka 學習筆記(5)—— flume + kafka 整合(1)
1 需求分析 採集訂單系統應用列印的日誌檔案。 日誌檔案使用 log4j 生成,滾動生成。 將採集的日誌檔案儲存到 kafka中。 (source) 輸入: tail -F xx.log
SparkStreaming整合kafka直連模式direct方式
org.apache.spark spark-streaming_2.10 1.6.2 org.apache.spark spark-streaming-kafka_2.10 1.
flume+kafka+storm的整合使用
Flume-ng Flume是一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。 不過這裡寫寫自己的見解 這個是flume的架構圖 從上圖可以看到幾個名詞: Agent: 一個Agent包含Source、Channel、Sink和其他的元件
SparkStreaming整合Kafka-0.8的官方文件要點翻譯
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher) Note: Kafka 0.8 support is deprecated as of Spark 2.3.0
SparkStreaming整合kafka入門
package kafka import com.typesafe.config.ConfigFactory import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.seriali
如何將Flume與kafka進行整合
自從Flume1.6開始,新增了對Kafka的支援,極大地提升了Flume的採集能力。避免後端因熱點問題導致kafka的channel爆滿而無法採集資料。 本篇介紹使用Flume當前最新版本1.8與Kafka的結合使用。基本環境Kafka (192.168.156.101:9092)Zookeeper(192
kafka&&sparkstreaming整合入門之Wordcount
/** * @author Mr.lu * @Title: KafkaStreamingWordCount * @ProjectName spark-scala * @Description: TODO * @date 2018/11/
sparkStreaming整合Kafka
這幾天看了spark整合Kafka,消費Kafka資料並向Kafka傳送資料,仿照官方樣例寫了兩個小例子。在此分享一下。 1.新增Kafka的repository 2.DirectKafkaWordCountDemo程式碼展示 3.kafkaProduc
SparkStreaming整合kafka的補充
clas metrics clu head zookeepe 大量 topic 自動重啟 備份 (1)SparkStreaming 整合 kafka 兩種方式對比 Direct 方式的優缺點分析 : 優點: 簡化並行(Simplified Parallelism)。不現