
Spark examples, collected

Spark Streaming is a near-real-time stream processing framework: it works on micro-batches, so the latency for processing live data is at the second level. Storm, by contrast, is a true real-time framework with millisecond-level response times, so choosing between the two depends on the concrete business scenario. One thing worth clarifying: many people claim that Spark Streaming runs unstably, loses data, or has poor transactional support, but in most cases that is because they have not learned to drive Spark Streaming and Spark itself properly. As for latency, customized Spark builds can push Spark Streaming's delay from the second level down to within 100 milliseconds or even less.
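As a quick illustration, that latency is driven by the batch interval handed to StreamingContext. Below is a minimal sketch, with the application name and interval made up for the example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchIntervalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("batch-interval-sketch").setMaster("local[2]")
    // Records are grouped into 1-second micro-batches, so end-to-end latency is on the
    // order of seconds; passing e.g. Milliseconds(200) instead would push the batches
    // (and the latency) toward the sub-second range, at the cost of more scheduling overhead.
    val ssc = new StreamingContext(conf, Seconds(1))
  }
}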
Advantages of Spark Streaming:
1. It provides a rich set of APIs, so enterprises can quickly implement all kinds of complex business logic.
2. The data flowing into Spark Streaming can be combined with machine learning algorithms to carry out model computation and graph computation.
3. Spark Streaming builds on Spark's excellent lineage.
Can Spark Streaming process data one record at a time, like Storm?
Storm processes data record by record, whereas Spark Streaming processes data in fixed time intervals (micro-batches). So can Spark Streaming behave like Storm? The answer is: yes, it can — for example by walking each micro-batch record by record inside foreachRDD, as in the sketch below.
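A minimal sketch of record-at-a-time handling (the socket source on localhost:9999 and the printed message are only placeholders for real input and business logic):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PerRecordSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("per-record-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // socketTextStream is just a convenient source for the sketch.
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // Any per-partition setup (DB connections, etc.) would go here; the inner
        // foreach then handles the micro-batch one record at a time.
        partition.foreach(record => println(s"got record: $record"))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}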
The demo below reads data from Kafka, iterates over each RDD with foreachRDD, and then uses Spark SQL to register each batch as a table for analysis.
package com.sprakStream.demo

import java.util.Properties
import java.util.regex.Matcher

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import com.sprakStream.bean.IpMapper
import com.sprakStream.util.{AppConstant, CommUtil}

object KafkaExcamle3 {

  def main(args: Array[String]): Unit = {

    //val conf = new SparkConf()
    //val sc = new SparkContext()
    //    System.setProperty("spark.sql.warehouse.dir", "D:\\tools\\spark-2.0.0-bin-hadoop2.6");
    //    System.setProperty("hadoop.home.dir", "D:\\tools\\hadoop-2.6.0");
    println("success to Init...")
    val url = "jdbc:postgresql://172.16.12.190:5432/dataex_tmp"
    val prop = new Properties()
    prop.put("user", "postgres")
    prop.put("password", "issing")

    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val ssc = new StreamingContext(conf, Seconds(1))
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    Utilities.setupLogging()
    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = Utilities.apacheLogPattern()
    // hostname:port for Kafka brokers, not Zookeeper  
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> AppConstant.KAFKA_HOST,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    // List of topics you want to listen for from Kafka  
    val topics = List(AppConstant.KAFKA_TOPIC).toSet
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)).map(_.value());

    // Keep only the lines that match the Apache access-log pattern; non-matching lines are dropped
    val spiltWorks = lines.map { x =>
      val matcher: Matcher = pattern.matcher(x)
      if (matcher.matches()) matcher.group(0) else ""
    }.filter(_.nonEmpty)
    // A 30-second sliding window of matched lines, re-evaluated every 2 seconds
    val spiltDesc = spiltWorks.window(Seconds(30), Seconds(2))

    // Call foreachRDD to iterate over each RDD of the windowed DStream
    spiltDesc.foreachRDD({
      rdd =>
        // Get the singleton instance of SQLContext
        println()
        println("=================================================start of show 111111111=================================================")
        println()
        val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
        import sqlContext.implicits._
        // Split each matched log line on spaces and map its fields into an IpMapper row
        val wordsDataFrame = rdd.map(x => x.toString().split(" ")).map(x =>
          IpMapper(CommUtil.uuid(), x(0).toString(), x(1).toString(),
            x(2).toString(), x(3).toString(), x(4).toString(), x(5).toString(),
            x(6).toString(), x(7).toString(), x(8).toString())).toDF()
        // createOrReplaceTempView replaces the registerTempTable API deprecated in Spark 2.x
        wordsDataFrame.createOrReplaceTempView("wordsDataFrame")
        val wordCountsDataFrame =
          sqlContext.sql("select * from wordsDataFrame")
        wordCountsDataFrame.show()
    })

    // Call foreachRDD to iterate over each RDD of the raw (un-windowed) DStream
    spiltWorks.foreachRDD({
      rdd =>
        // Get the singleton instance of SQLContext
        println()
        println("=================================================start of show 22222222222=================================================")
        println()
        val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
        import sqlContext.implicits._
        // Same field extraction as above, but applied batch by batch without a window
        val wordsDataFrame = rdd.map(x => x.toString().split(" ")).map(x =>
          IpMapper(CommUtil.uuid(), x(0).toString(), x(1).toString(),
            x(2).toString(), x(3).toString(), x(4).toString(), x(5).toString(),
            x(6).toString(), x(7).toString(), x(8).toString())).toDF()
        wordsDataFrame.createOrReplaceTempView("wordsDataFrame")
        val wordCountsDataFrame =
          sqlContext.sql("select * from wordsDataFrame")
        wordCountsDataFrame.show()
    })

    // Kick it off  
    ssc.checkpoint("/user/root/spark/checkpoint")
    ssc.start()
    ssc.awaitTermination()
    println("KafkaExample-結束.................................")
  }

}
// Lazily instantiated singleton SQLContext, so every micro-batch reuses the same instance
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
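In Spark 2.x code such as the demo above, the same lazy-singleton idea is often written around SparkSession rather than SQLContext. A minimal sketch of that variant (the object name is illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  // getOrCreate already returns an existing active session if there is one,
  // so the singleton mainly saves the builder call inside foreachRDD.
  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder()
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

Inside foreachRDD you would then call SparkSessionSingleton.getInstance(rdd.sparkContext.getConf), import spark.implicits._, and build the DataFrame exactly as in the demo.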