SparkStreaming Setup, Part 2: Submitting a SparkStreaming Job to a Cluster
This tutorial summarizes how to package a SparkStreaming application and submit it to a cluster.
First, start a netcat server that the streaming job will read text from:
$ nc -lk 9999
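Once the streaming job is running, type lines of text into this nc session; each 1-second batch prints word counts to the driver's stdout. An illustrative exchange (the timestamp and counts below are made up for the example):

hello world hello

-------------------------------------------
Time: 1546243201000 ms
-------------------------------------------
(hello,2)
(world,1)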
Code:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingDemo {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // SparkStreamingDemo.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on the target ip:port and count the
    // words in the input stream of \n-delimited text (e.g. generated by 'nc').
    // Note that a storage level without replication is only OK when running
    // locally; replication is necessary in a distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
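Before submitting, the application must be packaged into a jar. The jar name in the submit command below (SparkStreamingDemo-1.0-SNAPSHOT.jar) suggests a Maven build; a minimal sketch of the relevant dependency, assuming Spark 2.2 on Scala 2.11 (the versions are assumptions, match them to your cluster):

<!-- pom.xml fragment; versions are assumptions, align with your cluster -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

Running mvn package then produces target/SparkStreamingDemo-1.0-SNAPSHOT.jar. The provided scope keeps Spark's own classes out of the jar, since the cluster supplies them at runtime.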
The trailing localhost 9999 are the input arguments (<hostname> <port>). Submit the packaged jar to the cluster with:

spark-submit --queue media \
  --class test.SparkStreamingDemo \
  --master yarn-cluster \
  --executor-memory 2g \
  --executor-cores 2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  ~/home/SparkStreamingDemo-1.0-SNAPSHOT.jar localhost 9999
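Two notes on this command. First, --master yarn-cluster is the old Spark 1.x syntax; on Spark 2.x the equivalent is --master yarn --deploy-mode cluster. Second, in cluster mode the driver runs on an arbitrary YARN node, so the localhost argument is resolved on that node, not on the submitting machine; pass the hostname of the machine actually running nc -lk 9999. Also, because spark.yarn.submit.waitAppCompletion=false makes spark-submit return immediately, the wordCounts.print() output goes to the driver container's log rather than your console. It can be inspected with the standard YARN CLI once log aggregation makes it available (the application id below is a placeholder):

# list applications to find the application id
yarn application -list

# dump the logs, including the driver's stdout with the printed counts
yarn logs -applicationId <application_id>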