Spark Example Notes
Spark Streaming is a near-real-time (micro-batch) stream processing framework: it processes data in small time-based batches, so the latency for handling real-time data is typically at the second level. Storm, by contrast, is a true real-time framework whose processing latency is at the millisecond level, so choosing between the two depends on the concrete business scenario. One point worth clarifying: many people now claim that Spark Streaming is unstable, loses data, or has poor transactional support, but that is usually because they have not mastered Spark Streaming and Spark itself. On the latency front, customized Spark builds can push Spark Streaming's delay from the second level down to within 100 milliseconds or even less.
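To make the micro-batch idea concrete, here is a minimal sketch (my own illustration, not from the original post) showing that the batch interval passed to StreamingContext is what sets the floor on Spark Streaming's latency; the 1-second and 100-millisecond values, and the localhost:9999 text source, are purely illustrative.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

object BatchIntervalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("batch-interval-sketch").setMaster("local[2]")

    // The second constructor argument is the micro-batch interval: every DStream
    // is cut into batches of this length, so it is the lower bound on latency.
    val ssc = new StreamingContext(conf, Seconds(1))          // second-level latency
    // val ssc = new StreamingContext(conf, Milliseconds(100)) // pushing toward sub-second

    // A placeholder source and sink so the context has an output operation;
    // localhost:9999 is a hypothetical netcat-style text server.
    ssc.socketTextStream("localhost", 9999).print()

    ssc.start()
    ssc.awaitTermination()
  }
}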
Advantages of Spark Streaming:
1. It provides a rich API, so complex business logic can be implemented quickly in enterprise settings.
2. The data streams flowing into Spark Streaming can be combined with machine learning algorithms to perform model computation and graph processing.
3. Spark Streaming builds on Spark's excellent lineage and the rest of the Spark stack.
Can Spark Streaming process data record by record, like Storm?
Storm processes data one record at a time, while Spark Streaming processes data per unit of time (micro-batch). So can Spark Streaming work record by record like Storm? The answer is: yes, as the sketch below shows.
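A minimal sketch of what "record by record" looks like in Spark Streaming (my own illustration, not part of the original demo): inside foreachRDD each micro-batch still arrives as an RDD, but you can walk it one record at a time, and shrinking the batch interval narrows the gap to Storm-style latency. handleOneRecord and the localhost:9999 source are hypothetical stand-ins.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PerRecordSketch {
  // Hypothetical per-record handler, standing in for whatever a Storm bolt would do.
  def handleOneRecord(record: String): Unit = println(s"processing: $record")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("per-record-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1)) // smaller interval => closer to per-record latency

    // Hypothetical text source (e.g. netcat on localhost:9999).
    val lines = ssc.socketTextStream("localhost", 9999)

    // Each micro-batch arrives as an RDD; foreach then visits its records one by one.
    lines.foreachRDD { rdd =>
      rdd.foreach(record => handleOneRecord(record))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}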
Below is a demo that reads data from Kafka, uses foreachRDD to iterate over the RDDs in the DStream, and then uses Spark SQL to register each RDD as a table for analysis.
package com.sprakStream.demo

import java.util.Properties
import java.util.regex.Matcher

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import com.sprakStream.bean.IpMapper
import com.sprakStream.util.{AppConstant, CommUtil}

object KafkaExcamle3 {

  def main(args: Array[String]): Unit = {
    // System.setProperty("spark.sql.warehouse.dir", "D:\\tools\\spark-2.0.0-bin-hadoop2.6")
    // System.setProperty("hadoop.home.dir", "D:\\tools\\hadoop-2.6.0")
    println("success to Init...")

    // PostgreSQL connection settings (declared here but not used further in this demo)
    val url = "jdbc:postgresql://172.16.12.190:5432/dataex_tmp"
    val prop = new Properties()
    prop.put("user", "postgres")
    prop.put("password", "issing")

    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val ssc = new StreamingContext(conf, Seconds(1))
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()

    // Project helper: sets up logging and builds the Apache access-log regex
    val util = Utilities
    util.setupLogging()
    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = util.apacheLogPattern()

    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> AppConstant.KAFKA_HOST,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // List of topics you want to listen for from Kafka
    val topics = List(AppConstant.KAFKA_TOPIC).toSet
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)).map(_.value())

    // Keep only the lines that match the Apache log pattern
    val spiltWorks = lines.map { x =>
      val matcher: Matcher = pattern.matcher(x)
      if (matcher.matches()) matcher.group(0) else ""
    }.filter(_.nonEmpty)

    // A 30-second window over the matched lines, sliding every 2 seconds
    val spiltDesc = spiltWorks.map(x => x.toString()).window(Seconds(30), Seconds(2))

    // Call foreachRDD to iterate over every RDD of the windowed DStream
    spiltDesc.foreachRDD({ rdd =>
      // Get the singleton instance of SQLContext
      println()
      println("============================= let the show begin 111111111 =============================")
      println()
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      val wordsDataFrame = rdd.map(x => x.toString().split(" ")).map(x =>
        IpMapper(CommUtil.uuid(), x(0).toString(), x(1).toString(), x(2).toString(),
          x(3).toString(), x(4).toString(), x(5).toString(), x(6).toString(),
          x(7).toString(), x(8).toString())).toDF()
      wordsDataFrame.registerTempTable("wordsDataFrame")
      val wordCountsDataFrame = sqlContext.sql("select * from wordsDataFrame")
      wordCountsDataFrame.show()
    })

    // Call foreachRDD to iterate over every RDD of the un-windowed DStream
    spiltWorks.foreachRDD({ rdd =>
      // Get the singleton instance of SQLContext
      println()
      println("============================= let the show begin 22222222222 =============================")
      println()
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      val wordsDataFrame = rdd.map(x => x.toString().split(" ")).map(x =>
        IpMapper(CommUtil.uuid(), x(0).toString(), x(1).toString(), x(2).toString(),
          x(3).toString(), x(4).toString(), x(5).toString(), x(6).toString(),
          x(7).toString(), x(8).toString())).toDF()
      wordsDataFrame.registerTempTable("wordsDataFrame")
      val wordCountsDataFrame = sqlContext.sql("select * from wordsDataFrame")
      wordCountsDataFrame.show()
    })

    // Kick it off
    ssc.checkpoint("/user/root/spark/checkpoint")
    ssc.start()
    ssc.awaitTermination()
    println("KafkaExample - finished.................................")
  }
}

object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
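Note that SQLContext and registerTempTable are legacy APIs that Spark 2.x keeps only for compatibility. As a minimal sketch (my own adaptation, not part of the original demo), the same foreachRDD-plus-SQL pattern can be written against a lazily obtained SparkSession with createOrReplaceTempView; IpMapper and CommUtil are the same project classes assumed by the demo above, and process is a hypothetical helper name.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import com.sprakStream.bean.IpMapper
import com.sprakStream.util.CommUtil

object SparkSessionVariant {
  // Same idea as SQLContextSingleton: getOrCreate() returns the session bound to the RDD's SparkContext.
  def process(rdd: RDD[String]): Unit = {
    val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    val wordsDataFrame = rdd.map(_.split(" ")).map(x =>
      IpMapper(CommUtil.uuid(), x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8))).toDF()

    // createOrReplaceTempView replaces the deprecated registerTempTable call
    wordsDataFrame.createOrReplaceTempView("wordsDataFrame")
    spark.sql("select * from wordsDataFrame").show()
  }
}

It would be called from the stream in the same place as before, for example spiltWorks.foreachRDD(rdd => SparkSessionVariant.process(rdd)).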