Flume+Spark Streaming
1 概述
Apache Flume是一種分散式,可靠且可用的服務,用於高效地收集,彙總和移動大量日誌資料。 這裡我們學習如何配置Flume和Spark Streaming來接收來自Flume的資料。 提供兩種方法來解決這問題。
注意:從Spark 2.3.0開始,不推薦使用Flume支援。個人也不推薦這種架構,資料量小的情況下可能沒什麼問題,但是再資料量過大的情況下Streaming流式處理是處理不過來的,必定會造成訊息堵塞,所以建議加一個訊息中介軟體例如Kafka。
2 Push-based
方法1:流水線式推送方式
Flume接收的資料直接交給Spark Streaming處理。 所以這種方法中,Spark Streaming需要一個接收器receiver,作為Flume的Avro代理,Flume可以將資料推送到該接收器。 所以這裡要注意local的使用。
啟動要求:先啟動Spark Streaming進行接收資料再啟動Flume.
選型結構:netcat => memory ==> avro sink ==> streaming
- 編寫程式碼
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FlumePush {
def main(args: Array[String]): Unit = {
//傳入的引數hostname,port
val Array(hostname,port)=args
val sparkConf=new SparkConf().setAppName ("FlumePush").setMaster("local[2]")
val ssc=new StreamingContext(sparkConf,Seconds(10))
// import FlumeUtils
import org.apache.spark.streaming.flume._
//create input DStream as follows.
val flumeStream=FlumeUtils.createStream(ssc,hostname,port.toInt)
//詞頻統計
flumeStream.map(x =>new String(x.event.getBody.array()).trim)
.flatMap(x=>x.split(",")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
上面我們是通過外面進行傳遞hostname和port,所以要進行下面的配置
在Programe arguments傳遞引數。作者使用的時windows本地執行出現各種錯誤,選擇打包上傳就不需要在本地配置引數的,直接在spark-submit的時候傳遞引數。
- 配置檔案
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
啟動命令
./spark-submit --master local[2] \
--class cn.zhangyu.FlumePush \
--name FlumePush \
/home/hadoop/lib/spark_streaming-1.0-SNAPSHOT.jar \
localhost 41414
這時候提交應用程式的時候肯定會報錯,因為我們本地並沒有FlumeUtils這個jar包,所以要指定--packages g:a:v
引數(有網路才可以用,沒有網路可以實現下載好通過–jars引數指定)
./spark-submit --master local[2] \
--class cn.zhangyu.FlumePush \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
--name FlumePush \
/home/hadoop/lib/spark_streaming-1.0-SNAPSHOT.jar \
localhost 41414
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-sparkStreaming-push \
-Dflume.root.logger=INFO,console
輸入資料:
[[email protected] conf]$ telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
a,a,a,b,b,b,c,c,c,d,d,s
OK
a,a,a,b,b,b,c,c,c,d,d,s
OK
結果:
(d,2)
(b,3)
(s,1)
(a,3)
(c,3)
(d,2)
(b,3)
(s,1)
(a,3)
(c,3)
3 Pull-based
使用該種方式Flume不是直接將資料推送到Spark Streaming,而是執行一個自定義的Flume接收器,它允許執行以下操作。
- Flume將資料推入接收器,並且資料保持緩衝。
- Spark Streaming使用可靠的Flume接收器和並從接收器中提取資料。該事務只有在Spark Streaming接收和複製資料後才會成功。
注意:
1.這時候並不需要Spark Streaming端的receiver了。
2.啟動順序:先啟動flume在啟動spark streaming。
這確保了比以前的方法更強大的可靠性和容錯保證。 但是,這需要配置Flume執行自定義接收器。 以下是配置步驟。
- flume配置
1.jar包依賴
groupId = org.apache.spark
artifactId = spark-streaming-flume-sink_2.11
version = 2.3.0
groupId = org.scala-lang
artifactId = scala-library
version = 2.11.8
groupId = org.apache.commons
artifactId = commons-lang3
version = 3.5
2.配置檔案
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 啟動
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-sparkStreaming-poll \
-Dflume.root.logger=INFO,console
./spark-submit --master local[2] \
--class cn.zhangyu.FlumePull \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
--name FlumePull \
/home/hadoop/lib/spark_streaming-1.0-SNAPSHOT.jar \
localhost 41414
- 輸入
telnet localhost 44444
a,a,a,b,b,b,c,c,c,d,d,s
- 結果:
(d,2)
(b,3)
(s,1)
(a,3)
(c,3)
4 maven中scope屬性的
擴充套件:上面我們說了使用spark-submit提交任務的時候可以使用--packagers
,--jars
但是這並不是一個很好的方法,我們可以使用maven編譯的時候加上需要的類,做法如下:
1.在pom.xml新增
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
Dependency Scope
在POM 4中,dependency中還引入了scope,它主要管理依賴的部署。目前可以使用5個值:
- compile,預設值,適用於所有階段,會隨著專案一起釋出。
- provided,類似compile,期望JDK、容器或使用者會提供這個依賴。如servlet.jar。
- runtime,只在執行時使用,如JDBC驅動,適用執行和測試階段。
- test,只在測試時使用,用於編譯和執行測試程式碼。不會隨專案釋出。
- system,類似provided,需要顯式提供包含依賴的jar,Maven不會在Repository中查詢它。
依賴範圍控制哪些依賴在哪些classpath 中可用,哪些依賴包含在一個應用中。讓我們詳細看一下每一種範圍:
屬性 | 含義 |
---|---|
compile (編譯範圍) | compile是預設的範圍;如果沒有提供一個範圍,那該依賴的範圍就是編譯範圍。編譯範圍依賴在所有的classpath 中可用,同時它們也會被打包。 |
provided (已提供範圍) | provided 依賴只有在當JDK 或者一個容器已提供該依賴之後才使用。例如, 如果你開發了一個web 應用,你可能在編譯 classpath 中需要可用的Servlet API 來編譯一個servlet,但是你不會想要在打包好的WAR 中包含這個Servlet API;這個Servlet API JAR 由你的應用伺服器或者servlet 容器提供。已提供範圍的依賴在編譯classpath (不是執行時)可用。它們不是傳遞性的,也不會被打包。 |
runtime (執行時範圍) | runtime 依賴在執行和測試系統的時候需要,但在編譯的時候不需要。比如,你可能在編譯的時候只需要JDBC API JAR,而只有在執行的時候才需要JDBC驅動實現。 |
test (測試範圍) | test範圍依賴 在一般的編譯和執行時都不需要,它們只有在測試編譯和測試執行階段可用。 |
system (系統範圍) | system範圍依賴與provided 類似,但是你必須顯式的提供一個對於本地系統中JAR 檔案的路徑。這麼做是為了允許基於本地物件編譯,而這些物件是系統類庫的一部分。這樣的構件應該是一直可用的,Maven 也不會在倉庫中去尋找它。如果你將一個依賴範圍設定成系統範圍,你必須同時提供一個 systemPath 元素。注意該範圍是不推薦使用的(你應該一直儘量去從公共或定製的 Maven 倉庫中引用依賴)。 |
所以我們可以在編譯的時候把FlumeUtils這個類加進去:
使用mvn assembly:assembly
命令進行編譯
相關推薦
flume spark streaming配置詳解
Apache Flume是一個用來有效地收集,聚集和移動大量日誌資料的分散式的,有效的服務。這裡我們解釋一下怎樣配置Flume和Spark Streaming來從Flume獲取資料,我們讓
Flume+Spark Streaming
1 概述 Apache Flume是一種分散式,可靠且可用的服務,用於高效地收集,彙總和移動大量日誌資料。 這裡我們學習如何配置Flume和Spark Streaming來接收來自Flume的資料。 提供兩種方法來解決這問題。 注意:從Spark 2.3.0
flume+spark streaming+redis完整篇
一.前言 本篇是用flume作為資料來源,spark streaming來實時處理,然後把結果存在redis供查詢. 本篇介紹的是一個實時統計網站訪問的pv的例子. 本篇採用的各種版本如下 scala-2.10.4 spark-1.6.1 flume-1.6.0 本篇
<Spark Streaming><Flume><Integration>
uri min 取數 nts general ora span int from Overview Flume:一個分布式的,可靠的,可用的服務,用於有效地收集、聚合、移動大規模日誌數據 我們搭建一個flume + Spark Streaming的平臺來從Flume獲取數
下載基於大數據技術推薦系統實戰教程(Spark ML Spark Streaming Kafka Hadoop Mahout Flume Sqoop Redis)
大數據技術推薦系統 推薦系統實戰 地址:http://pan.baidu.com/s/1c2tOtwc 密碼:yn2r82課高清完整版,轉一播放碼。互聯網行業是大數據應用最前沿的陣地,目前主流的大數據技術,包括 hadoop,spark等,全部來自於一線互聯網公司。從應用角度講,大數據在互聯網領域主
基於Flume+Kafka+Spark Streaming打造實時流處理項目實戰課程
大數據本課程從實時數據產生和流向的各個環節出發,通過集成主流的分布式日誌收集框架Flume、分布式消息隊列Kafka、分布式列式數據庫HBase、及當前最火爆的Spark Streaming打造實時流處理項目實戰,讓你掌握實時處理的整套處理流程,達到大數據中級研發工程師的水平!下載地址:百度網盤下載
Spark學習筆記(15)——Spark Streaming 整合 Flume
1 flume 配置檔案 在 flume-env.sh 裡配置 JAVA_HOME 1.1 flume-pull.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.chan
Spark Streaming整合flume(Poll方式和Push方式)
flume作為日誌實時採集的框架,可以與SparkStreaming實時處理框架進行對接,flume實時產生資料,sparkStreaming做實時處理。 Spark Streaming對接FlumeNG有兩種方式,一種是FlumeNG將訊息Push推給Spark Streaming,還
Spark Streaming實時流處理筆記(6)—— Kafka 和 Flume的整合
1 整體架構 2 Flume 配置 https://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 啟動kafka kafka-server-start.sh $KAFKA_HOME/config/se
Spark Streaming實時流處理筆記(3)——日誌採集Flume
1 Flume介紹 1.1 設計目標 可靠性 擴充套件性 管理性 1.2 同類產品 Flume: Cloudera/Apache,Java Scribe: Facebook ,C/C++(不維護了) Chukwa: Yahoo
Spark學習(拾叄)- Spark Streaming整合Flume&Kafka
文章目錄 處理流程畫圖剖析 日誌產生器開發並結合log4j完成日誌的輸出 使用Flume採集Log4j產生的日誌 使用KafkaSInk將Flume收集到的資料輸出到Kafka Spark Streaming消費Kafka的
Spark學習(拾壹)- Spark Streaming整合Flume
文章目錄 Push方式整合之概述 Push方式整合之Flume Agent配置開發 Push方式整合之Spark Streaming應用開發 Push方式整合之本地IDEA環境聯調 Push方式整合之伺服器環境聯調
Spark Streaming整合flume實戰
Spark Streaming對接Flume有兩種方式 Poll:Spark Streaming從flume 中拉取資料 Push:Flume將訊息Push推給Spark Streaming 1、安裝flume1.6以上 2、下載依賴包 spark-streaming
spark筆記之Spark Streaming整合flume實戰
a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.
spark streaming 學習(和flume結合+和kafka 的結合)
spark 2.1 設定日誌級別很簡單 下面幾行程式碼就可以搞定 主要是下面畫橫線的程式碼val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]") val sc = ne
大資料求索(9): log4j + flume + kafka + spark streaming實時日誌流處理實戰
大資料求索(9): log4j + flume + kafka + spark streaming實時日誌流處理實戰 一、實時流處理 1.1 實時計算 跟實時系統類似(能在嚴格的時間限制內響應請求的系統),例如在股票交易中,市場資料瞬息萬變,決策通常需要秒級甚至毫秒級。通俗來
Spark Streaming連線Flume的兩種方式
Spark提供了兩種不同的接收器來接受Flume端傳送的資料。 推式接收器該接收器以 Avro 資料池的方式工作,由 Flume 向其中推資料。設定起來非常簡單,我們只需要將Fluem簡單配置下,將資料傳送到Avro資料池中,然後scala提供的Flume
flume+zookeeper+kafka+spark streaming
1.flume安裝部署 1.1、下載安裝介質,並解壓: cd /usr/local/wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0.tar.gztar -zxvf flume-ng-1.6.0-cdh
基於 Flume+Kafka+Spark Streaming 實現實時監控輸出日誌的報警系統
運用場景:我們機器上每天或者定期都要跑很多工,很多時候任務出現錯誤不能及時發現,導致發現的時候任務已經掛了很久了。 解決方法:基於 Flume+Kafka+Spark Streaming 的框架對這些任務的輸出日誌進行實時監控,當檢測到日誌出現Error的資訊就傳送郵件給
flume+kafka+spark streaming(持續更新)
kafka Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 kafka的設計初衷是希望作為一個統一的資訊收集平臺,能夠實時的收集反饋資訊,並需要能夠支撐較大的資料量,且具備良好的容錯能力. Apache