Spark Streaming 效能測試簡單例子 --53
阿新 • 發佈：2019-02-18
package llf

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Simple Spark Streaming performance test.
 *
 * Reads comma-separated words from a TCP socket and counts them over a sliding
 * window with `reduceByKeyAndWindow`. It then folds each window's counts into a
 * per-word running total with `updateStateByKey`, printing the totals every slide.
 *
 * Usage: `WindwordCount <hostname> <port> <windowSeconds> <slideSeconds> [checkpointDir]`
 *
 * Created by root on 15-7-31.
 */
object WindwordCount {

  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 4) {
      System.err.println(
        "Usage: WindwordCount <hostname> <port> <windowSeconds> <slideSeconds> [checkpointDir]")
      System.exit(1)
    }
    val host = args(0)
    val port = args(1).toInt
    val windowSecs = args(2).toInt
    val slideSecs = args(3).toInt
    // Optional 5th argument; defaults to the working directory as before.
    val checkpointDir = if (args.length > 4) args(4) else "."

    // Adds this window's counts for a word to the running total carried in the state.
    val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
      (values, state) => Some(values.sum + state.getOrElse(0))

    // Default to a local master with 2 threads (the socket receiver occupies one), but do
    // not hard-code it: an explicit setMaster would override `spark-submit --master`.
    val conf = new SparkConf()
      .setAppName("WindwordCount")
      .setIfMissing("spark.master", "local[2]")
    val sc = new SparkContext(conf)

    // Window and slide durations must be multiples of this batch interval.
    val batchInterval = Seconds(5)
    val ssc = new StreamingContext(sc, batchInterval)
    // updateStateByKey requires a checkpoint directory.
    ssc.checkpoint(checkpointDir)

    val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(","))

    // NOTE(review): windowed counts are folded into a running total. When the window is
    // longer than the slide, consecutive windows overlap, so each input record is added to
    // the state about windowSeconds / slideSeconds times. Confirm this is intended (e.g. as
    // extra load for the performance test) rather than a true cumulative word count.
    val wordCounts = words
      .map(word => (word, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(windowSecs), Seconds(slideSecs))
      .updateStateByKey[Int](updateFunc)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
終於這個月湊夠 4 篇部落格了——這篇是湊數的。