Notes on "Machine Learning with Spark": Applying Spark Streaming to Real-Time Machine Learning
阿新 • Published 2019-02-02
So far we have only discussed batch data processing; that is, all of our analysis, feature extraction, and model training has been applied to a fixed set of data. This fits naturally with Spark's core abstraction, the RDD: an immutable, distributed dataset. Although Spark's transformations and actions can build new RDDs from existing ones, once an RDD is created the data it contains never changes.

Our attention has likewise been focused on batch machine learning models, where the fixed training set is typically represented as an RDD of feature vectors (together with labels, in the case of supervised learning).
In this chapter we will:
Introduce the concept of online learning, where a model is trained and updated as new data arrives
Learn how to do stream processing with Spark Streaming
See how Spark Streaming can be applied to online learning
10.1 Online Learning
In contrast to offline learning, online learning is based on making a single, fully incremental, sequential pass over the training data (that is, training on one example at a time). After processing each example, the model makes a prediction for it and is then given the correct output (for example, the true class label in classification or the true target value in regression). The idea behind online learning is that the model keeps updating itself as new information arrives, instead of being retrained from scratch as in offline training.

In settings where the data volume is very large, or where the process generating the data changes quickly, online learning methods can adapt in near real time, without the expensive retraining required in the offline case.
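To make the idea concrete, here is a minimal, illustrative sketch (not from the book) of a single online update for linear regression with stochastic gradient descent; the function name onlineUpdate and the parameter stepSize are purely illustrative. Each incoming example nudges the weights once and can then be discarded, which is why online methods cope well with large or fast-changing data.

// Illustrative only: one online SGD update for linear regression.
// `weights` is the current model; (x, y) is the newly arrived example.
def onlineUpdate(weights: Array[Double], x: Array[Double], y: Double, stepSize: Double): Array[Double] = {
  val prediction = weights.zip(x).map { case (w, xi) => w * xi }.sum  // predict with the current weights
  val error = prediction - y
  weights.zip(x).map { case (w, xi) => w - stepSize * error * xi }    // gradient step: w := w - stepSize * error * x
}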
Spark Streaming Example 1
The message producer
package cn.edu.shu.ces.chenjie.streamingapp

import java.io.PrintWriter
import java.net.ServerSocket

import scala.util.Random

/***
 * A message producer that randomly generates "product activity" events,
 * up to 5 per second, and sends them over a network connection.
 */
object StreamingProducer {
  def main(args: Array[String]): Unit = {
    val random = new Random()
    val MaxEvents = 6  // maximum number of events per second
    val nameResource = this.getClass.getResourceAsStream("names.csv")
    val names = scala.io.Source.fromInputStream(nameResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq
    println(names)
    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    // generate n random (user, product, price) purchase events
    def generateProductEvents(n: Int) = {
      (1 to n).map { i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    val listener = new ServerSocket(9999)
    println("Listening on port : 9999")
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run() = {
          println("Got client connected from : " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach { event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events")
          }
          socket.close()
        }
      }.start()
    }
  }
}
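Start the producer first and leave it running; events are only generated inside the per-connection thread, so nothing is emitted until a client connects. Note that names.csv must be available as a classpath resource next to the class, otherwise getResourceAsStream returns null. Before pointing a streaming application at the producer, it can be sanity-checked with a plain TCP client such as netcat (nc localhost 9999), which should print a few comma-separated events per second.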
A simple streaming application

package cn.edu.shu.ces.chenjie.streamingapp

import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
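Run the producer above first, then start this application: with a 10-second batch interval, stream.print() prints the first few raw comma-separated records received in each batch, so new output should appear roughly every ten seconds.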
Streaming analytics

package cn.edu.shu.ces.chenjie.streamingapp

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingAnalyticsApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // parse each comma-separated record into (user, product, price)
    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2))
    }

    events.foreachRDD { (rdd, time) =>
      val numPurchases = rdd.count()
      val uniqueUsers = rdd.map { case (user, _, _) => user }.distinct().count()
      val totalRevenue = rdd.map { case (_, _, price) => price.toDouble }.sum()
      val productsByPopularity = rdd
        .map { case (user, product, price) => (product, 1) }
        .reduceByKey(_ + _)
        .collect()
        .sortBy(-_._2)  // sort in descending order of purchase count
      val mostPopular = productsByPopularity(0)

      val formatter = new SimpleDateFormat()
      val dateStr = formatter.format(new Date(time.milliseconds))
      println(s"== Batch start time: $dateStr ==")
      println("Total purchases: " + numPurchases)
      println("Unique users: " + uniqueUsers)
      println("Total revenue: " + totalRevenue)
      println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Stateful streaming

package cn.edu.shu.ces.chenjie.streamingapp

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingStateApp {
  import org.apache.spark.streaming.StreamingContext._

  // combine the new (product, price) values for a key with the previous
  // (number of purchases, revenue) state to produce the updated running totals
  def updateState(prices: Seq[(String, Double)], currentTotal: Option[(Int, Double)]) = {
    val currentRevenue = prices.map(_._2).sum
    val currentNumberPurchases = prices.size
    val state = currentTotal.getOrElse((0, 0.0))
    Some((currentNumberPurchases + state._1, currentRevenue + state._2))
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    ssc.checkpoint("/tmp/sparkstreaming/")  // stateful operations require a checkpoint directory
    val stream = ssc.socketTextStream("localhost", 9999)

    val events = stream.map { record =>
      val event = record.split(",")
      (event(0), event(1), event(2).toDouble)
    }
    val users = events.map { case (user, product, price) => (user, (product, price)) }
    val revenuePerUser = users.updateStateByKey(updateState)
    revenuePerUser.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
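For each batch, updateStateByKey calls updateState once per key, passing the new (product, price) values seen for that key in the batch together with the previous (count, revenue) state, so the printed totals keep growing across batches; the checkpoint directory is what allows this accumulated state to be recovered after a failure.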
Online learning with Spark Streaming

Streaming regression
Creating a streaming data producer
package cn.edu.shu.ces.chenjie.streamingapp

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.Random

import breeze.linalg.DenseVector

/***
 * A producer that generates random linear regression data.
 */
object StreamingModelProducer {
  def main(args: Array[String]): Unit = {
    val MaxEvents = 10       // maximum number of events per second
    val NumFeatures = 100
    val random = new Random()

    // generate an array of values drawn from a standard normal distribution,
    // used to build dense vectors
    def generateRandomArray(n: Int) = Array.tabulate(n)(n => random.nextGaussian())

    // fixed weight vector and intercept defining the "true" model
    val w = new DenseVector(generateRandomArray(NumFeatures))
    val intercept = random.nextGaussian() * 10

    /**
     * Generate a number of random data events
     */
    def generateNoisyData(n: Int) = {
      (1 to n).map { i =>
        val x = new DenseVector(generateRandomArray(NumFeatures))
        val y: Double = w.dot(x)
        val noisy = y + intercept
        (noisy, x)
      }
    }

    val listener = new ServerSocket(9999)
    println("Listening on port : 9999")
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run() = {
          println("Got client connected from :" + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          while (true) {
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val data = generateNoisyData(num)
            data.foreach { case (y, x) =>
              val xStr = x.data.mkString(",")
              val eventStr = s"$y\t$xStr"
              out.write(eventStr)
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events")
          }
          socket.close()
        }
      }.start()
    }
  }
}
Creating a streaming regression model

package cn.edu.shu.ces.chenjie.streamingapp

import breeze.linalg._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingModel {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)

    // must match the number of features generated by StreamingModelProducer
    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)

    // parse each "label <tab> feature1,feature2,..." record into a LabeledPoint
    val labeledStream = stream.map { event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features = Vectors.dense(features))
    }

    model.trainOn(labeledStream)
    model.predictOn(labeledStream).print()  // without an output operation the predictions would never be computed

    ssc.start()
    ssc.awaitTermination()
  }
}
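model.trainOn updates the weights on every batch, while model.predictOn only transforms the stream, so the printed predictions come from a model that is continuously changing. To get a feel for whether the model is actually improving, the true labels can be compared with the predictions batch by batch. The following is a minimal sketch (not part of the original post) that uses predictOnValues, which keeps the label alongside the prediction, and prints the mean squared error per batch; these lines would be added before ssc.start() in the application above.

// Illustrative only: per-batch mean squared error for the streaming model.
// predictOnValues keeps the true label as the key, so label and prediction arrive together.
val labelsAndPreds = model.predictOnValues(labeledStream.map(lp => (lp.label, lp.features)))
labelsAndPreds.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    val mse = rdd.map { case (label, pred) => math.pow(label - pred, 2) }.mean()
    println(s"Batch $time, MSE: $mse")
  }
}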