Spark Streaming從Flume Poll資料案例實戰和內幕原始碼解密
本博文內容主要包括以下幾點內容:
1、Spark Streaming on Polling from Flume實戰
2、Spark Streaming on Polling from Flume原始碼
一、推模式(Flume push SparkStreaming)與拉模式(SparkStreaming poll Flume)比較 :
採用推模式:推模式的理解就是Flume作為快取,存有資料。監聽對應埠,如果服務可以連結,就將資料push過去。(簡單,耦合要低),缺點是SparkStreaming 程式沒有啟動的話,Flume端會報錯,同時可能會導致Spark Streaming 程式來不及消費的情況。
採用拉模式:拉模式就是自己定義一個sink,SparkStreaming自己去channel裡面取資料,根據自身條件去獲取資料,穩定性好。
二、Flume poll 實戰:
1.Flume poll 配置
2、將下載後的三個jar包放入Flume安裝lib目錄:
3、配置Flume conf環境引數:
首先進入此入境
接下來在此檔案中的sink1中新增此內容:
agent1.sinks.sink1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sink1.hostname = Master
agent1.sinks .sink1.port = 9999
agent1.sinks.sink1.channel = channel1
三、編寫程式碼:
public class SkarkStreamingPollDataFromFlume {
public static void main(String[] args) {
/*
* 第一步:配置SparkConf:
* 1,至少2條執行緒:因為Spark Streaming應用程式在執行的時候,至少有一條
* 執行緒用於不斷的迴圈接收資料,並且至少有一條執行緒用於處理接受的資料(否則的話無法
* 有執行緒用於處理資料,隨著時間的推移,記憶體和磁碟都會不堪重負);
* 2,對於叢集而言,每個Executor一般肯定不止一個Thread,那對於處理Spark Streaming的
* 應用程式而言,每個Executor一般分配多少Core比較合適?根據我們過去的經驗,5個左右的
* Core是最佳的(一個段子分配為奇數個Core表現最佳,例如3個、5個、7個Core等);
*/
SparkConf conf = new SparkConf().setAppName("SparkStreamingPollDataFromFlume").setMaster("local[2]");
/*
* 第二步:建立SparkStreamingContext:
* 1,這個是SparkStreaming應用程式所有功能的起始點和程式排程的核心
* SparkStreamingContext的構建可以基於SparkConf引數,也可基於持久化的SparkStreamingContext的內容
* 來恢復過來(典型的場景是Driver崩潰後重新啟動,由於Spark Streaming具有連續7*24小時不間斷執行的特徵,
* 所有需要在Driver重新啟動後繼續上衣系的狀態,此時的狀態恢復需要基於曾經的Checkpoint);
* 2,在一個Spark Streaming應用程式中可以建立若干個SparkStreamingContext物件,使用下一個SparkStreamingContext
* 之前需要把前面正在執行的SparkStreamingContext物件關閉掉,由此,我們獲得一個重大的啟發SparkStreaming框架也只是
* Spark Core上的一個應用程式而已,只不過Spark Streaming框架箱執行的話需要Spark工程師寫業務邏輯處理程式碼;
*/
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(30));
/*
* 第三步:建立Spark Streaming輸入資料來源input Stream:
* 1,資料輸入來源可以基於File、HDFS、Flume、Kafka、Socket等
* 2, 在這裡我們指定資料來源於網路Socket埠,Spark Streaming連線上該埠並在執行的時候一直監聽該埠
* 的資料(當然該埠服務首先必須存在),並且在後續會根據業務需要不斷的有資料產生(當然對於Spark Streaming
* 應用程式的執行而言,有無資料其處理流程都是一樣的);
* 3,如果經常在每間隔5秒鐘沒有資料的話不斷的啟動空的Job其實是會造成排程資源的浪費,因為並沒有資料需要發生計算,所以
* 例項的企業級生成環境的程式碼在具體提交Job前會判斷是否有資料,如果沒有的話就不再提交Job;
*/
JavaReceiverInputDStream lines = FlumeUtils.createPollingStream(jsc, "Master", 9999);
/*
* 第四步:接下來就像對於RDD程式設計一樣基於DStream進行程式設計!!!原因是DStream是RDD產生的模板(或者說類),在Spark Streaming具體
* 發生計算前,其實質是把每個Batch的DStream的操作翻譯成為對RDD的操作!!!
*對初始的DStream進行Transformation級別的處理,例如map、filter等高階函式等的程式設計,來進行具體的資料計算
* 第4.1步:講每一行的字串拆分成單個的單詞
*/
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
@Override
public Iterable<String> call(SparkFlumeEvent event) throws Exception {
String line = new String(event.event().getBody().array());
return Arrays.asList(line.split(" "));
}
});
/*
* 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函式等的程式設計,來進行具體的資料計算
* 第4.2步:在單詞拆分的基礎上對每個單詞例項計數為1,也就是word => (word, 1)
*/
JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String,Integer>(word,1);
}
});
/*
* 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函式等的程式設計,來進行具體的資料計算
* 第4.3步:在每個單詞例項計數為1基礎之上統計每個單詞在檔案中出現的總次數
*/
JavaPairDStream<String,Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
// TODO Auto-generated method stub
return v1 + v2;
}
});
/*
* 此處的print並不會直接出發Job的執行,因為現在的一切都是在Spark Streaming框架的控制之下的,對於Spark Streaming
* 而言具體是否觸發真正的Job執行是基於設定的Duration時間間隔的
* 諸位一定要注意的是Spark Streaming應用程式要想執行具體的Job,對Dtream就必須有output Stream操作,
* output Stream有很多型別的函式觸發,類print、saveAsTextFile、saveAsHadoopFiles等,最為重要的一個
* 方法是foraeachRDD,因為Spark Streaming處理的結果一般都會放在Redis、DB、DashBoard等上面,foreachRDD
* 主要就是用用來完成這些功能的,而且可以隨意的自定義具體資料到底放在哪裡!!!
*
*/
wordsCount.print();
/*
* Spark Streaming執行引擎也就是Driver開始執行,Driver啟動的時候是位於一條新的執行緒中的,當然其內部有訊息迴圈體,用於
* 接受應用程式本身或者Executor中的訊息;
*/
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
啟動HDFS叢集:
啟動執行Flume:
啟動eclipse下的應用程式:
copy測試檔案hellospark.txt到Flume flume-conf.properties配置檔案中指定的/usr/local/flume/tmp/TestDir目錄下:
隔24秒後可以在eclipse程式控制臺中看到上傳的檔案單詞統計結果。
四:原始碼分析:
1、建立createPollingStream (FlumeUtils.scala ):
2、引數配置:預設的全域性引數,private 級別配置無法修改:
3、建立FlumePollingInputDstream物件
4、繼承自ReceiverInputDstream並覆寫getReciver方法,呼叫FlumePollingReciver介面:
5、ReceiverInputDstream 構建了一個執行緒池,設定為後臺執行緒;並使用lazy和工廠方法建立執行緒和NioClientSocket(NioClientSocket底層使用NettyServer的方式)
6、receiverExecutor 內部也是執行緒池;connections是指連結分散式Flume叢集的FlumeConnection實體控制代碼的個數,執行緒拿到實體控制代碼訪問資料。
7、啟動時建立NettyTransceiver,根據並行度(預設5個)迴圈提交FlumeBatchFetcher
8、FlumeBatchFetcher run方法中從Receiver中獲取connection連結控制代碼ack跟訊息確認有關
9、獲取一批一批資料方法
補充說明:
使用Spark Streaming可以處理各種資料來源型別,如:資料庫、HDFS,伺服器log日誌、網路流,其強大超越了你想象不到的場景,只是很多時候大家不會用,其真正原因是對Spark、spark streaming本身不瞭解。
博文內容源自DT大資料夢工廠Spark課程。相關課程內容視訊可以參考:
百度網盤連結:http://pan.baidu.com/s/1slvODe1(如果連結失效或需要後續的更多資源,請聯絡QQ460507491或者微訊號:DT1219477246 獲取上述資料)。
相關推薦
Spark Streaming從Flume Poll資料案例實戰和內幕原始碼解密
本博文內容主要包括以下幾點內容: 1、Spark Streaming on Polling from Flume實戰 2、Spark Streaming on Polling from Flume原始碼 一、推模式(Flume push SparkStre
大資料IMF傳奇行動絕密課程第87課:Flume推送資料到Spark Streaming案例實戰和內幕原始碼解密
Flume推送資料到Spark Streaming案例實戰和內幕原始碼解密 1、Flume on HDFS案例回顧 2、Flume推送資料到Spark Streaming實戰 3、原理繪圖剖析 一、配置.bashrc vi ~/.bashrc
第87課:Flume推送資料到SparkStreaming案例實戰和內幕原始碼解密--flume安裝篇
1、 下載flume 老師提供的包 2、 安裝 vi/etc/profile exportFLUME_HOME=/usr/local/apache-flume-1.6.0-bin exportPATH=.:$PATH:$JAVA_HOME/bin:$HADOOP
第91課:SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密 java.lang.ClassNotFoundException 踩坑解決問題詳細內幕版本
第91課:SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密 /* * *王家林老師授課http://weibo.com/ilovepains */ 每天晚上20:00YY頻道現場授課頻道68917580 1、作業內容:SparkS
Spark Streaming整合flume(Poll方式和Push方式)
flume作為日誌實時採集的框架,可以與SparkStreaming實時處理框架進行對接,flume實時產生資料,sparkStreaming做實時處理。 Spark Streaming對接FlumeNG有兩種方式,一種是FlumeNG將訊息Push推給Spark Streaming,還
0073 spark streaming從埠接受資料進行實時處理的方法
一,環境 Windows_x64 系統 Java1.8 Scala2.10.6 spark1.6.0 hadoop2.7.5 IDEA IntelliJ 2017.2 nmap工具(用到其中的nc
SparkStreaming 從Flume Poll資料
1.官網資料 2.需要下載相關依賴到flume的lib中 3.配置flume的配置檔案 #agent1表示代理名稱 agent1.sources=source1 agent1.sinks=sink1 agent1.channels=channel1 #配置source
Spark Streaming整合flume實戰
Spark Streaming對接Flume有兩種方式 Poll:Spark Streaming從flume 中拉取資料 Push:Flume將訊息Push推給Spark Streaming 1、安裝flume1.6以上 2、下載依賴包 spark-streaming
spark筆記之Spark Streaming整合flume實戰
a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.
spark Streaming 直接消費Kafka資料,儲存到 HDFS 實戰程式設計實踐
最近在學習spark streaming 相關知識,現在總結一下 主要程式碼如下 def createStreamingContext():StreamingContext ={ val sparkConf = new SparkConf().setAppName("
Spark Streaming從Kafka中獲取資料,並進行實時單詞統計,統計URL出現的次數
1、建立Maven專案 2、啟動Kafka 3、編寫Pom檔案 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.or
Spark Streaming從Kafka中獲取數據,並進行實時單詞統計,統計URL出現的次數
scrip 發送消息 rip mark 3.2 umt 過程 bject ttr 1、創建Maven項目 創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2、啟動Kafka A:安裝ka
spark streaming從指定offset處消費Kafka數據
tpc asi 4.2 nes 配置 sof 我們 erl examples spark streaming從指定offset處消費Kafka數據 2017-06-13 15:19 770人閱讀 評論(2) 收藏 舉報 分類: spark(5) 原文地址:htt
PK2227-Spark Streaming實時流處理項目實戰
con ans filesize strip for 新年 感覺 post pre PK2227-Spark Streaming實時流處理項目實戰 新年伊始,學習要趁早,點滴記錄,學習就是進步! 隨筆背景:在很多時候,很多入門不久的朋友都會問我:我是從其他語言轉到程序
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記三之銘文升級版
聚集 配置文件 ssi path fig rect 擴展 str 控制臺 銘文一級: Flume概述Flume is a distributed, reliable, and available service for efficiently collecting(收集),
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記五之銘文升級版
環境變量 local server 節點數 replicas conn 配置環境 park 所有 銘文一級: 單節點單broker的部署及使用 $KAFKA_HOME/config/server.propertiesbroker.id=0listenershost.name
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記九之銘文升級版
file sin ssi 右上角 result map tap 核心 內容 銘文一級: 核心概念:StreamingContext def this(sparkContext: SparkContext, batchDuration: Duration) = { th
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十之銘文升級版
state 分鐘 mooc 系統數據 使用 連接 var style stream 銘文一級: 第八章:Spark Streaming進階與案例實戰 updateStateByKey算子需求:統計到目前為止累積出現的單詞的個數(需要保持住以前的狀態) java.lang.I
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十五之銘文升級版
spa for 序列 html art mat div pre paths 銘文一級:[木有筆記] 銘文二級: 第12章 Spark Streaming項目實戰 行為日誌分析: 1.訪問量的統計 2.網站黏性 3.推薦 Python實時產生數據 訪問URL->IP
【慕課網實戰】Spark Streaming實時流處理項目實戰筆記十六之銘文升級版
.so zook orm 3.1 date nta highlight org 結果 銘文一級: linux crontab 網站:http://tool.lu/crontab 每一分鐘執行一次的crontab表達式: */1 * * * * crontab -e */1