1. 程式人生 > >Spark Streaming使用Flume作為資料來源

Spark Streaming使用Flume作為資料來源

1、Spark Streaming 2.3.1適配 Flume 1.6.0,在Spark Streaming 2.3.0之後對flume的支援已被標記為過時。主要由於flume直接對接Spark Streaming 會造成Spark Streaming壓力過大,特別是高峰期的時候(在之間加一層訊息佇列會好得多)。但由於很多公司可能仍然在用,故簡單做一下介紹。

2、有兩種方式可以讓Spark Streaming整合Flume並從Flume接受資料。以下分別介紹這兩種方式。

一、基於Flume的Push模式(Flume-style Push-based Approach)

    這種方式Spark Streaming會建立一個Receiver,這個Receiver起到一個相當於Flume的Avro Agent的作用,Flume可以將資料推送到這個Receiver。以下是詳細的配置步驟。

1、一般要求

    在你的叢集中選擇一臺機器,這臺機器需要滿足一下條件:

          A.當Spark Streaming+Flume的應用程式執行時,有一個Spark的Worker節點執行在這臺機器上。

          B.Flume通過配置後可以將資料推送到這臺機器的埠上。

2、配置Flume

    通過以下的配置檔案可以將資料傳送到Avro Sink。

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <所選機器的IP>
agent.sinks.avroSink.port = <所選機器的埠>

3、配置Spark Streaming應用程式

A.新增依賴

<dependency>

    <groupId>org.apache.spark </groupId>

    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>${spark.version}</version>

</dependency>

B.在Streaming應用程式的程式碼中,匯入一個FlumeUtils類並建立input DStream。

import org.apache.spark.streaming.flume._

 val flumeStream = FlumeUtils.createStream(streamingContext, [所選機器ip], [所選機器埠])

4、測試

A.直接執行程式碼

package com.ruozedata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushApp {

  def main(args: Array[String]) {

    val Array(hostname, port) = args

    val sparkConf = new SparkConf()
      .setAppName("FlumePushApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val lines = FlumeUtils.createStream(ssc,hostname,port.toInt) 
/*由於createStream返回的DStream型別為SparkFlumeEvent,而不是String,故此時split方法無法使用
*為了能夠使用split,我們執行了以下的map操作
*/
    lines.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }

}

//以上程式碼執行會出錯,因為args沒有傳引數。

傳參流程:右上角FlumePushApp-->Edit Configurations-->program arguments-->填寫滿足條件的引數


接著:


B.打成jar包上傳執行

先啟動spark-submit

./spark-submit --master local[2] \
--class com.ruozedata.streaming.FlumePullApp \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \      //相當於新增依賴,需要叢集能夠訪問外網
--name FlumePushApp \
/home/hadoop/lib/train-scala-1.0.jar \

localhost 41414

再啟動flume

./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/nc-memory-flume.conf \

-Dflume.root.logger=INFO,console

在另一臺客戶端執行 telnet localhost 44444,此時在該客戶端輸入資料,另一客戶端會實時打印出處理結果

//mvn clean packages 打包只包含原始碼不包含依賴包

//以上解決方式需要叢集能夠訪問外網,不能訪問外網時可參照以下步驟

a。把不所有需要打包進來的依賴全部新增<scope>provided</scope>


b。在pom.xml檔案中新增plugin

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>

c。編譯時使用 mvn assembly:assembly來打包

二、基於自定義sink的pull模式Pull-based Approach using a Custom Sink

1、一般要求

 不同於Flume直接將資料推送到Spark Streaming中,這種方法自定義一個滿足以下條件的Flume Sink:

  • Flume將資料推送到sink中,並且資料保持buffered狀態
  • Spark Streaming使用一個可靠的Flume接收器(reliable Flume receiver)和轉換器(transaction)從sink拉取資料.只要當資料被接收並且被Spark Streaming備份後,轉換器才執行成功.
和第一種方式相比,這種方式更可靠,有更好的容錯能力選擇一臺執行在一個Flume agent中的普通sink節點的機器.Flume其他的pipeline配置成向該agent傳送資料.Spark叢集中的機器應該可以訪問到選為sink節點的那臺機器。

2、新增依賴

groupId = org.apache.spark
 artifactId = spark-streaming-flume-sink_2.11
 version = 2.3.1
groupId = org.scala-lang
 artifactId = scala-library
 version = 2.11.8
groupId = org.apache.commons
 artifactId = commons-lang3
 version = 3.5

3、配置flume

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost

a1.sinks.k1.port = 41414

4、測試

package com.ruozedata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePullApp {

  def main(args: Array[String]) {

    val Array(hostname, port) = args

    val sparkConf = new SparkConf()
      .setAppName("FlumePullApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val lines = FlumeUtils.createPollingStream(ssc, hostname,port.toInt)
    lines.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }

}

5、啟動

先啟動Flume

./flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/nc-memory-flume2.conf \

-Dflume.root.logger=INFO,console

然後再另一客戶端啟動 telnet localhost 44444

最後啟動Spark Streaming

./spark-submit --master local[2] \
--class com.ruozedata.streaming.FlumePullApp \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \    
--name FlumePushApp \
/home/hadoop/lib/train-scala-1.0.jar \

localhost 41414

此時在telnet端輸入資料,立馬出處理結果。

第二種方式更好

最後來兩個與題目無關的Spark Streaming小例子

1、黑名單過濾

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
  * Created by ruozedata on 2018/4/15.
  */
object FilterApp {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf()
      .setAppName("FilterApp")
      .setMaster("local[2]")
    val sc = new SparkContext(sparkConf)

    // (騷馬,true)
    val blackTuple = new ListBuffer[(String,Boolean)]
    blackTuple.append(("sm",true))  //true為了方便join
    val blacksRDD = sc.parallelize(blackTuple)  //將陣列轉成RDD

    // 準備測試資料: log
    val input = new ListBuffer[(String,String)]
    input.append(("su","20180808,su,M,20"))
    input.append(("kk","20180808,kk,M,20"))
    input.append(("sm","20180808,sm,M,20"))
    val inputRDD = sc.parallelize(input)

    // TODO... 過濾掉黑名單使用者
    val joinRDD = inputRDD.leftOuterJoin(blacksRDD) 
    joinRDD.filter(x => {
      x._2._2.getOrElse(false) != true
    }).map(_._2._1).foreach(println)
 //x._2._2為true或none,x._2._1為日誌本身
    sc.stop()
  }

}

2、改造例子1,將其使用Spark Streaming來完成

package com.ruozedata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFilterApp {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf()
      .setAppName("SocketWordCountApp")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val blacks = List("sm","su")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))

    val lines = ssc.socketTextStream("hadoop000",9997)
    // (su, log of su)
    val clickLogDstream = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => {
          // _2 : (string,option)  _2:option
          x._2._2.getOrElse(false) != true
      }).map(_._2._1)
 
//x._2._2為true或none,x._2._1為日誌本身
    })

    clickLogDstream.print()

    ssc.start()
    ssc.awaitTermination()
  }

}