1. 程式人生 > >sparkstreaming效能測試簡單例子--53

Spark Streaming 效能測試簡單例子--53

package llf

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Windowed word count over a socket text stream, used as a simple Spark Streaming
 * performance test.
 *
 * Lines read from the socket are split on commas, each token is counted per sliding
 * window, and every windowed count that is emitted is then added to a per-word running
 * total kept with `updateStateByKey`. The running totals are printed each time the
 * windowed stream produces a batch.
 *
 * Usage: `WindwordCount <hostname> <port> <windowSeconds> <slideSeconds>`
 *
 * Created by root on 15-7-31.
 */
object WindwordCount {

  /** Length of one streaming batch, in seconds. */
  private val BatchIntervalSeconds = 5

  /**
   * Entry point.
   *
   * @param args `args(0)` host to connect to, `args(1)` port, `args(2)` window length in
   *             seconds, `args(3)` slide interval in seconds. Extra arguments are ignored.
   *             Spark requires the window length and slide interval to be multiples of the
   *             batch interval (5 seconds here).
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
    // when the program is started with too few arguments.
    if (args.length < 4) {
      System.err.println(
        "Usage: WindwordCount <hostname> <port> <windowSeconds> <slideSeconds>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val windowLength = Seconds(args(2).toInt)
    val slideInterval = Seconds(args(3).toInt)

    // Adds the counts that arrived for a key in this batch to the total stored so far
    // (0 when the key has not been seen before).
    val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
      (values, state) => Some(values.sum + state.getOrElse(0))

    // local[2]: the socket receiver permanently occupies one thread, so at least one
    // more is needed to process the received data.
    val conf = new SparkConf().setAppName("WindwordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(BatchIntervalSeconds))
    // updateStateByKey is a stateful operation and needs a checkpoint directory.
    ssc.checkpoint(".")

    val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(","))

    val windowedCounts = words
      .map(word => (word, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, windowLength, slideInterval)

    // NOTE(review): when the window is longer than the slide interval the same record
    // contributes to several windowed counts and is therefore added to the running total
    // more than once - presumably acceptable for a load test; confirm before reusing.
    val wordCounts = windowedCounts.updateStateByKey[Int](updateFunc)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

終於這個月湊夠4篇部落格了,這篇是湊數的(得意)。