1. 程式人生 > >Flume+Spark Streaming

Flume+Spark Streaming

1 概述

Apache Flume是一種分散式,可靠且可用的服務,用於高效地收集,彙總和移動大量日誌資料。 這裡我們學習如何配置Flume和Spark Streaming來接收來自Flume的資料。 提供兩種方法來解決這問題。

注意:從Spark 2.3.0開始,不推薦使用Flume支援。個人也不推薦這種架構,資料量小的情況下可能沒什麼問題,但是再資料量過大的情況下Streaming流式處理是處理不過來的,必定會造成訊息堵塞,所以建議加一個訊息中介軟體例如Kafka。

2 Push-based

方法1:流水線式推送方式
Flume接收的資料直接交給Spark Streaming處理。 所以這種方法中,Spark Streaming需要一個接收器receiver,作為Flume的Avro代理,Flume可以將資料推送到該接收器。 所以這裡要注意local的使用。

啟動要求:先啟動Spark Streaming進行接收資料再啟動Flume.
選型結構:netcat => memory ==> avro sink ==> streaming

  • 編寫程式碼
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePush {
  def main(args: Array[String]): Unit = {

    //傳入的引數hostname,port
    val Array(hostname,port)=args
      val sparkConf=new SparkConf().setAppName
("FlumePush").setMaster("local[2]") val ssc=new StreamingContext(sparkConf,Seconds(10)) // import FlumeUtils import org.apache.spark.streaming.flume._ //create input DStream as follows. val flumeStream=FlumeUtils.createStream(ssc,hostname,port.toInt) //詞頻統計 flumeStream.map(x
=>new String(x.event.getBody.array()).trim) .flatMap(x=>x.split(",")).map((_,1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }

上面我們是通過外面進行傳遞hostname和port,所以要進行下面的配置
這裡寫圖片描述
在Programe arguments傳遞引數。作者使用的時windows本地執行出現各種錯誤,選擇打包上傳就不需要在本地配置引數的,直接在spark-submit的時候傳遞引數。

  • 配置檔案
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

啟動命令

./spark-submit --master local[2] \
--class cn.zhangyu.FlumePush \
--name FlumePush \
/home/hadoop/lib/spark_streaming-1.0-SNAPSHOT.jar \
localhost 41414

這時候提交應用程式的時候肯定會報錯,因為我們本地並沒有FlumeUtils這個jar包,所以要指定--packages g:a:v引數(有網路才可以用,沒有網路可以實現下載好通過–jars引數指定)

./spark-submit --master local[2] \
--class cn.zhangyu.FlumePush \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
--name FlumePush \
/home/hadoop/lib/spark_streaming-1.0-SNAPSHOT.jar \
localhost 41414
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-sparkStreaming-push \
-Dflume.root.logger=INFO,console

輸入資料:

[[email protected] conf]$ telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
a,a,a,b,b,b,c,c,c,d,d,s
OK
a,a,a,b,b,b,c,c,c,d,d,s
OK

結果:

(d,2)
(b,3)
(s,1)
(a,3)
(c,3)

(d,2)
(b,3)
(s,1)
(a,3)
(c,3)

3 Pull-based

使用該種方式Flume不是直接將資料推送到Spark Streaming,而是執行一個自定義的Flume接收器,它允許執行以下操作。

  1. Flume將資料推入接收器,並且資料保持緩衝。
  2. Spark Streaming使用可靠的Flume接收器和並從接收器中提取資料。該事務只有在Spark Streaming接收和複製資料後才會成功。

注意:
1.這時候並不需要Spark Streaming端的receiver了。
2.啟動順序:先啟動flume在啟動spark streaming。

這確保了比以前的方法更強大的可靠性和容錯保證。 但是,這需要配置Flume執行自定義接收器。 以下是配置步驟。

  • flume配置
    1.jar包依賴
groupId = org.apache.spark
artifactId = spark-streaming-flume-sink_2.11
version = 2.3.0

 groupId = org.scala-lang
 artifactId = scala-library
 version = 2.11.8

 groupId = org.apache.commons
 artifactId = commons-lang3
 version = 3.5

2.配置檔案

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 啟動
./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-sparkStreaming-poll \
-Dflume.root.logger=INFO,console

./spark-submit --master local[2] \
--class cn.zhangyu.FlumePull \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
--name FlumePull \
/home/hadoop/lib/spark_streaming-1.0-SNAPSHOT.jar \
localhost 41414
  • 輸入
telnet localhost 44444

a,a,a,b,b,b,c,c,c,d,d,s
  • 結果:
(d,2)
(b,3)
(s,1)
(a,3)
(c,3)

4 maven中scope屬性的

擴充套件:上面我們說了使用spark-submit提交任務的時候可以使用--packagers,--jars但是這並不是一個很好的方法,我們可以使用maven編譯的時候加上需要的類,做法如下:
1.在pom.xml新增

<plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>

Dependency Scope

在POM 4中,dependency中還引入了scope,它主要管理依賴的部署。目前可以使用5個值:

  • compile,預設值,適用於所有階段,會隨著專案一起釋出。
  • provided,類似compile,期望JDK、容器或使用者會提供這個依賴。如servlet.jar。
  • runtime,只在執行時使用,如JDBC驅動,適用執行和測試階段。
  • test,只在測試時使用,用於編譯和執行測試程式碼。不會隨專案釋出。
  • system,類似provided,需要顯式提供包含依賴的jar,Maven不會在Repository中查詢它。

依賴範圍控制哪些依賴在哪些classpath 中可用,哪些依賴包含在一個應用中。讓我們詳細看一下每一種範圍:

屬性 含義
compile (編譯範圍) compile是預設的範圍;如果沒有提供一個範圍,那該依賴的範圍就是編譯範圍。編譯範圍依賴在所有的classpath 中可用,同時它們也會被打包。
provided (已提供範圍) provided 依賴只有在當JDK 或者一個容器已提供該依賴之後才使用。例如, 如果你開發了一個web 應用,你可能在編譯 classpath 中需要可用的Servlet API 來編譯一個servlet,但是你不會想要在打包好的WAR 中包含這個Servlet API;這個Servlet API JAR 由你的應用伺服器或者servlet 容器提供。已提供範圍的依賴在編譯classpath (不是執行時)可用。它們不是傳遞性的,也不會被打包。
runtime (執行時範圍) runtime 依賴在執行和測試系統的時候需要,但在編譯的時候不需要。比如,你可能在編譯的時候只需要JDBC API JAR,而只有在執行的時候才需要JDBC驅動實現。
test (測試範圍) test範圍依賴 在一般的編譯和執行時都不需要,它們只有在測試編譯和測試執行階段可用。
system (系統範圍) system範圍依賴與provided 類似,但是你必須顯式的提供一個對於本地系統中JAR 檔案的路徑。這麼做是為了允許基於本地物件編譯,而這些物件是系統類庫的一部分。這樣的構件應該是一直可用的,Maven 也不會在倉庫中去尋找它。如果你將一個依賴範圍設定成系統範圍,你必須同時提供一個 systemPath 元素。注意該範圍是不推薦使用的(你應該一直儘量去從公共或定製的 Maven 倉庫中引用依賴)。

所以我們可以在編譯的時候把FlumeUtils這個類加進去:
這裡寫圖片描述
使用mvn assembly:assembly命令進行編譯

相關推薦

flume spark streaming配置詳解

      Apache Flume是一個用來有效地收集,聚集和移動大量日誌資料的分散式的,有效的服務。這裡我們解釋一下怎樣配置Flume和Spark Streaming來從Flume獲取資料,我們讓

Flume+Spark Streaming

1 概述 Apache Flume是一種分散式,可靠且可用的服務,用於高效地收集,彙總和移動大量日誌資料。 這裡我們學習如何配置Flume和Spark Streaming來接收來自Flume的資料。 提供兩種方法來解決這問題。 注意:從Spark 2.3.0

flume+spark streaming+redis完整篇

一.前言 本篇是用flume作為資料來源,spark streaming來實時處理,然後把結果存在redis供查詢. 本篇介紹的是一個實時統計網站訪問的pv的例子. 本篇採用的各種版本如下 scala-2.10.4   spark-1.6.1  flume-1.6.0 本篇

<Spark Streaming><Flume><Integration>

uri min 取數 nts general ora span int from Overview Flume:一個分布式的,可靠的,可用的服務,用於有效地收集、聚合、移動大規模日誌數據 我們搭建一個flume + Spark Streaming的平臺來從Flume獲取數

下載基於大數據技術推薦系統實戰教程(Spark ML Spark Streaming Kafka Hadoop Mahout Flume Sqoop Redis)

大數據技術推薦系統 推薦系統實戰 地址:http://pan.baidu.com/s/1c2tOtwc 密碼:yn2r82課高清完整版,轉一播放碼。互聯網行業是大數據應用最前沿的陣地,目前主流的大數據技術,包括 hadoop,spark等,全部來自於一線互聯網公司。從應用角度講,大數據在互聯網領域主

基於Flume+Kafka+Spark Streaming打造實時流處理項目實戰課程

大數據本課程從實時數據產生和流向的各個環節出發,通過集成主流的分布式日誌收集框架Flume、分布式消息隊列Kafka、分布式列式數據庫HBase、及當前最火爆的Spark Streaming打造實時流處理項目實戰,讓你掌握實時處理的整套處理流程,達到大數據中級研發工程師的水平!下載地址:百度網盤下載

Spark學習筆記(15)——Spark Streaming 整合 Flume

1 flume 配置檔案 在 flume-env.sh 裡配置 JAVA_HOME 1.1 flume-pull.conf # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.chan

Spark Streaming整合flume(Poll方式和Push方式)

flume作為日誌實時採集的框架,可以與SparkStreaming實時處理框架進行對接,flume實時產生資料,sparkStreaming做實時處理。 Spark Streaming對接FlumeNG有兩種方式,一種是FlumeNG將訊息Push推給Spark Streaming,還

Spark Streaming實時流處理筆記(6)—— Kafka 和 Flume的整合

1 整體架構 2 Flume 配置 https://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 啟動kafka kafka-server-start.sh $KAFKA_HOME/config/se

Spark Streaming實時流處理筆記(3)——日誌採集Flume

1 Flume介紹 1.1 設計目標 可靠性 擴充套件性 管理性 1.2 同類產品 Flume: Cloudera/Apache,Java Scribe: Facebook ,C/C++(不維護了) Chukwa: Yahoo

Spark學習(拾叄)- Spark Streaming整合Flume&Kafka

文章目錄 處理流程畫圖剖析 日誌產生器開發並結合log4j完成日誌的輸出 使用Flume採集Log4j產生的日誌 使用KafkaSInk將Flume收集到的資料輸出到Kafka Spark Streaming消費Kafka的

Spark學習(拾壹)- Spark Streaming整合Flume

文章目錄 Push方式整合之概述 Push方式整合之Flume Agent配置開發 Push方式整合之Spark Streaming應用開發 Push方式整合之本地IDEA環境聯調 Push方式整合之伺服器環境聯調

Spark Streaming整合flume實戰

Spark Streaming對接Flume有兩種方式 Poll:Spark Streaming從flume 中拉取資料 Push:Flume將訊息Push推給Spark Streaming 1、安裝flume1.6以上 2、下載依賴包 spark-streaming

spark筆記之Spark Streaming整合flume實戰

a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.

spark streaming 學習(和flume結合+和kafka 的結合)

spark 2.1 設定日誌級別很簡單 下面幾行程式碼就可以搞定 主要是下面畫橫線的程式碼val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]") val sc = ne

大資料求索(9): log4j + flume + kafka + spark streaming實時日誌流處理實戰

大資料求索(9): log4j + flume + kafka + spark streaming實時日誌流處理實戰 一、實時流處理 1.1 實時計算 跟實時系統類似(能在嚴格的時間限制內響應請求的系統),例如在股票交易中,市場資料瞬息萬變,決策通常需要秒級甚至毫秒級。通俗來

Spark Streaming連線Flume的兩種方式

Spark提供了兩種不同的接收器來接受Flume端傳送的資料。    推式接收器該接收器以 Avro 資料池的方式工作,由 Flume 向其中推資料。設定起來非常簡單,我們只需要將Fluem簡單配置下,將資料傳送到Avro資料池中,然後scala提供的Flume

flume+zookeeper+kafka+spark streaming

1.flume安裝部署 1.1、下載安裝介質,並解壓: cd /usr/local/wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0.tar.gztar -zxvf flume-ng-1.6.0-cdh

基於 Flume+Kafka+Spark Streaming 實現實時監控輸出日誌的報警系統

運用場景:我們機器上每天或者定期都要跑很多工,很多時候任務出現錯誤不能及時發現,導致發現的時候任務已經掛了很久了。  解決方法:基於 Flume+Kafka+Spark Streaming 的框架對這些任務的輸出日誌進行實時監控,當檢測到日誌出現Error的資訊就傳送郵件給

flume+kafka+spark streaming(持續更新)

kafka Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 kafka的設計初衷是希望作為一個統一的資訊收集平臺,能夠實時的收集反饋資訊,並需要能夠支撐較大的資料量,且具備良好的容錯能力. Apache