1. 程式人生 > >SparkSQL結合SparkStreaming,使用SQL完成實時計算中的資料統計

SparkSQL結合SparkStreaming,使用SQL完成實時計算中的資料統計

關鍵字:SparkSQL、Spark Streaming、SQL、實時計算

Flume+Kafka+SparkStreaming已經發展為一個比較成熟的實時日誌收集與計算架構,利用Kafka,即可以支援將用於離線分析的資料流到HDFS,又可以同時支撐多個消費者實時消費資料,包括SparkStreaming。然而,在SparkStreaming程式中如果有複雜業務邏輯的統計,使用scala程式碼實現起來比較困難,也不易於別人理解。但如果在SparkSteaming中也使用SQL來做統計分析,是不是就簡單的多呢?

本文介紹將SparkSQL與SparkStreaming結合起來,使用SQL完成實時的日誌資料統計。SparkStreaming程式以yarn-cluster模式執行在YARN上,不單獨部署Spark叢集。

環境部署

Hadoop-2.3.0-cdh5.0.0(YARN)

spark-1.5.0-bin-hadoop2.3

kafka_2.10-0.8.2.1

另外,還編譯了SparkStreaming用於讀取Kafka資料的外掛:

spark-streaming-kafka_2.10-1.5.0.jar

相關環境的部署本文不做介紹,請參考文章最後的相關閱讀。

實時統計需求

以60秒為間隔,統計60秒內的pv,ip數,uv

最終結果包括:

時間點:pv:ips:uv

原始日誌格式

2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA0003DB546E7
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|B1611D0E00003857808 2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|1555BD0100016F2E76F 2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E 2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:01|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|4E3512790001039FDB9

每條日誌包含7個欄位,分隔符為|~|,其中,第3列為ip,第7列為cookieid。假設原始日誌已經由Flume流到Kafka中。

SparkStreaming程式程式碼

程式中使用下面的SQL語句完成對一個批次的資料統計:

SELECT date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') AS time,
COUNT(1) AS pv,
COUNT(DISTINCT ip) AS ips,
COUNT(DISTINCT cookieid) as uv 
FROM daplog

SparkStreaming程式程式碼:

package com.lxw.test

import scala.reflect.runtime.universe

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * auth:lxw1234
 * http://lxw1234.com
 */
object DapLogStreaming {
  
  def main (args : Array[String]) {
    val sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("DapLogStreaming")
    //每60秒一個批次
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    //從Kafka中讀取資料,topic為daplog,該topic包含兩個分割槽
    val kafkaStream = KafkaUtils.createStream(
          ssc, 
          "bj11-65:2181", //Kafka叢集使用的zookeeper
          "group_spark_streaming", //該消費者使用的group.id
          Map[String, Int]("daplog" -> 0,"daplog" -> 1), //日誌在Kafka中的topic及其分割槽
          StorageLevel.MEMORY_AND_DISK_SER)
      .map(x => x._2.split("\\|~\\|", -1))  //日誌以|~|為分隔符
    
    kafkaStream.foreachRDD((rdd: RDD[Array[String]], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      //構造case class: DapLog,提取日誌中相應的欄位
      val logDataFrame = rdd.map(w => DapLog(w(0).substring(0, 10),w(2),w(6))).toDF()
      //註冊為tempTable
      logDataFrame.registerTempTable("daplog")
      //查詢該批次的pv,ip數,uv
      val logCountsDataFrame =
        sqlContext.sql("select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as time,count(1) as pv,count(distinct ip) as ips,count(distinct cookieid) as uv from daplog")
      //列印查詢結果
      logCountsDataFrame.show()
    })
    
    
    ssc.start()
    ssc.awaitTermination()
    
  }
  
  
}

case class DapLog(day:String, ip:String, cookieid:String)

object SQLContextSingleton {
  @transient  private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

示例中只是將實時統計的結果列印到標準輸出,真實場景一般是將結果持久化到資料庫中。

將該程式打包成DapLogStreaming.jar,上傳至閘道器機。

執行SparkStreaming程式

進入$SPARK_HOME/bin執行下面的命令,將SparkStreaming程式提交到YARN:

./spark-submit \
--class com.lxw.test.DapLogStreaming \
--master yarn-cluster \
--executor-memory 2G \
--num-executors 6 \
--jars /home/liuxiaowen/kafka-clients-0.8.2.1.jar,/home/liuxiaowen/metrics-core-2.2.0.jar,/home/liuxiaowen/zkclient-0.3.jar,/home/liuxiaowen/spark-streaming-kafka_2.10-1.5.0.jar,/home/liuxiaowen/kafka_2.10-0.8.2.1.jar \
/home/liuxiaowen/DapLogStreaming.jar

注意:SparkStreaming及Kafka外掛執行時候需要依賴相應的jar包。

檢視執行結果

進入YARN ResourceManager的WEB介面,找到該程式對應的Application,點選ApplicationMaster的連結,進入SparkMaster介面:

每個批次(60秒),會生成一個Job。

點選TAB頁”Streaming”,進入Streaming的監控頁面:

在最下方,顯示正在處理的批次和已經完成的批次,包括每個批次的events數量。

最後,最主要的,點選ApplicationMaster的logs連結,檢視stdout標準輸出:

已經按照SQL中統計的欄位,打印出統計結果,每60秒一個批次列印一次。

注意事項

由於kafka_2.10-0.8.2.1是基於Scala2.10的,因此Spark、Spark的Kafka外掛、SparkStreaming應用程式都需要使用Scala2.10,如果使用Scala2.11,執行時候會報出因Scala版本不一致而造成的錯誤,比如:

15/11/11 15:36:26 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
	at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:59)
	at com.lxw.test.DapLogStreaming$.main(DapLogStreaming.scala:23)
	at com.lxw.test.DapLogStreaming.main(DapLogStreaming.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.sc