1. 程式人生 > >flume+spark streaming+redis完整篇

flume+spark streaming+redis完整篇

一.前言

本篇是用flume作為資料來源,spark streaming來實時處理,然後把結果存在redis供查詢.

本篇介紹的是一個實時統計網站訪問的pv的例子.

本篇採用的各種版本如下 scala-2.10.4   spark-1.6.1  flume-1.6.0

本篇採用的spark叢集為sdandalone模式

二.資料來源flume配置

flume的詳細說明請自行百度.這裡的flume source採用http,並且使用json handler來處理.

source 配置:

a1.sources.r2.type = http
a1.sources.r2.bind=10.8.23.58
a1.sources.r2.port = 5140
a1.sources.r2.channels = c3
a1.sources.r2.handler = org.apache.flume.source.http.JSONHandler

channel因為測試,所以選擇的是記憶體方式,實際根據情況,建議使用flie模式,並且配置checkpoint防止資料丟失.

channel配置:

a1.channels.c3.type = memory
a1.channels.c3.capacity = 100
a1.channels.c3.transactionCapacity = 100

sink有兩種,如下

flume=>spark streaming有兩種方式

1.推模式:這種模式比較簡單,直接連線上對應的avro埠即可,但是有個最大的問題,你必須先啟動spark streaming任務,然後觀察這個埠開在哪臺spark節點上,並且每次啟動都會隨機到某一個節點,然後再去改flume的配置,往那臺機器上發資料,這就比較蛋疼,如果你有100臺flume,那會讓人瘋狂的.因此這個模式,只有你的flume數量比較少的情況下適用.

推模式的sink配置:

a1.sinks.k3.type = avro
a1.sinks.k3.channel = c3
a1.sinks.k3.hostname = 10.8.23.32
a1.sinks.k3.port = 4545

2.拉模式:

這裡建議先使用推模式,等推模式跑通了,再切換到拉模式. 拉模式相比推模式稍微複雜點,主要複雜在flume的配置,官網有很詳細的說明,我這裡會把用到的都描述出來.

首先需要把3個jar放到flume/lib下.(分別是:spark-streaming-flume-sink_2.10-1.6.1.jar ,scala-library-2.10.5.jar,  commons-lang3-3.3.2.jar,在最後的網盤地址裡面有) 

拉模式的sink配置

a1.sinks = spark
a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.spark.hostname = 10.2.23.58
a1.sinks.spark.port = 4545
a1.sinks.spark.channel = c3

至此 flume配置完成.可以啟動觀察一下.

三.程式碼開發

object  PvStatistic{def main(args: Array[String]): Unit = {


    val masterUrl = "spark://10.8.23.112:7077"
val conf = new SparkConf().setMaster(masterUrl).setAppName("PvStatistic")

val ssc = new StreamingContext(conf, Seconds(15))

//    推模式
//    val flumeStream = FlumeUtils.createStream(ssc, "10.8.23.58", 4545, StorageLevel.MEMORY_ONLY_SER_2)
//    拉模式
val flumeStream = FlumeUtils.createPollingStream(ssc, "10.8.23.58", 4545, StorageLevel.MEMORY_ONLY_SER_2)

        flumeStream.foreachRDD(rdd => {
          rdd.foreachPartition(it=>{
            val jedis = RedisClient.pool.getResource
            it.foreach(event=>{
              val sensorInfo = new String(event.event.getBody.array()) //單行記錄
//              println(sensorInfo)
val json = JSONObject.fromObject(sensorInfo);
val url=json.getString("url")jedis.hincrBy("Spark:PV", url, 1);
})
            RedisClient.pool.returnResource(jedis)
          })
        })

    ssc.start()
    ssc.awaitTermination()
  }


兩種模式對於資料來源的獲取只是方法不一樣,處理邏輯都一樣.

需要的jar包 我直接貼pom檔案

  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.1</version>
      <scope>provided</scope>
  </dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
<dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.8.1</version>
  </dependency>
<dependency>
  <groupId>net.sf.json-lib</groupId>
  <artifactId>json-lib</artifactId>
  <version>2.4</version>
  <classifier>jdk15</classifier>
</dependency>

完成以後就是打包上傳到spark主節點了,這裡提一下,因為需要很多依賴包,所以建議直接打成一個包含所有依賴的jar.

推薦一個maven外掛來做這個事.

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
        <transformers>
          <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.defonds.RsaEncryptor</mainClass>
          </transformer>
          <transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>META-INF/spring.handlers</resource>
          </transformer>
          <transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>META-INF/spring.schemas</resource>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

有了這個外掛,直接maven clean package打包就行了,打出來的包包含所有依賴檔案.

然後把這個包上傳到spark主節點,在spark/bin下執行

spark-submit --master spark://cdh112:7077 --class com.dome.PvStatistic  /opt/spark/job/analyseSys-1.0-SNAPSHOT.jar

然後可以向之前配置的flume的接收埠傳送http請求測試了

傳送內容:

[
    {
        "body":"{\"url\": \"http://20160926 16:02\"}",
        "headers":{"v1":"log"}
        }
 ]

在redis裡面可以看到 SparkPv開頭的key下面 每個訪問地址的訪問次數.

上面提到的jar包

連結:http://pan.baidu.com/s/1kUEJJJx 密碼:9fxv

最後感謝一下群內的各位兄弟以及群主aDog~ 有興趣交流的可以加入QQ群:459898801