
Notes on Machine Learning with Spark: Applying Spark Streaming to Real-Time Machine Learning

So far we have only discussed batch data processing: all of our analysis, feature extraction, and model training has been applied to a fixed, unchanging set of data. This fits naturally with Spark's core abstraction, the RDD, an immutable distributed dataset. Although Spark's transformations and actions let us create new RDDs from existing ones, once an RDD has been created the data it contains never changes.
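
As a tiny illustration of this immutability (a sketch, assuming a SparkContext named sc is available, for example in spark-shell), transformations such as map produce a new RDD and leave the original untouched:

val original = sc.parallelize(Seq(1, 2, 3))
val doubled = original.map(_ * 2) // a new RDD derived from the original
original.collect() // still Array(1, 2, 3)
doubled.collect()  // Array(2, 4, 6)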

Our attention has likewise been focused on batch machine learning models, where the fixed training set is usually represented as an RDD of feature vectors (and, in the case of supervised learning, labels).

In this chapter, we will:

Introduce the concept of online learning, in which a model is trained and updated as new data arrives.

Learn how to do stream processing with Spark Streaming.

See how Spark Streaming can be applied to online learning.

10.1 Online Learning

In contrast to offline (batch) computation, online learning is based on processing the training data fully incrementally, in a single sequential pass (that is, one training example at a time). After processing each example, the model makes a prediction for it and is then given the correct output (for example, the class label in classification or the true target in regression). The idea behind online learning is that the model continually updates itself as new information arrives, instead of being retrained from scratch as in offline training.

In certain settings, when the data volume is very large or the process generating the data changes quickly, online learning methods can respond in near real time without the expensive retraining required in the offline setting.
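
As a minimal sketch of this idea (not from the book; the object name, step size, and data are made up for illustration), the following standalone program performs one stochastic gradient descent update per incoming example for a linear model with squared error:

object OnlineSgdSketch {
  def main(args: Array[String]): Unit = {
    val stepSize = 0.01
    var weights = Array.fill(3)(0.0) // the model state, updated incrementally
    // Pretend these (features, label) pairs arrive one at a time from a stream
    val incoming = Seq(
      (Array(1.0, 0.5, -0.2), 1.3),
      (Array(0.3, -1.0, 0.8), -0.4)
    )
    for ((features, label) <- incoming) {
      // Predict, observe the true target, then take a single gradient step
      val prediction = weights.zip(features).map { case (w, x) => w * x }.sum
      val error = prediction - label
      weights = weights.zip(features).map { case (w, x) => w - stepSize * error * x }
      println(s"prediction = $prediction, updated weights = ${weights.mkString(", ")}")
    }
  }
}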

Spark Streaming example 1

The message producer

package cn.edu.shu.ces.chenjie.streamingapp

import java.io.PrintWriter
import java.net.ServerSocket

import scala.util.Random

/***
  * A producer that randomly generates "product activity" events,
  * at most 5 per second, and sends them over a network connection.
  */
object StreamingProducer {
  def main(args: Array[String]): Unit = {
    val random  = new Random()

    val MaxEvents = 6 // upper bound on events per second (nextInt(6) yields 0 to 5)

    val nameResource = this.getClass.getResourceAsStream("names.csv")
    val names = scala.io.Source.fromInputStream(nameResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq
    println(names)

    val products = Seq(
      "iPhone Cover" -> 9.99,
      "Headphones" -> 5.49,
      "Samsung Galaxy Cover" -> 8.95,
      "iPad Cover" -> 7.49
    )

    def generateProductEvents(n: Int) = {
      (1 to n).map{ i =>
        val (product, price) = products(random.nextInt(products.size))
        val user = random.shuffle(names).head
        (user, product, price)
      }
    }

    val listener = new ServerSocket(9999)
    println("Listening on port : 9999")
    while (true){
      val socket = listener.accept()
      new Thread(){
        override def run() = {
          println("Got client connected from : " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)
          while (true){
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val productEvents = generateProductEvents(num)
            productEvents.foreach{  event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events")
          }
          socket.close()
        }
      }.start()
    }
  }


}
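
Before wiring the producer up to Spark Streaming, it can help to confirm that events are actually flowing. One quick check (a hypothetical helper, not part of the book's code) is a small client that connects to port 9999 and prints the first few raw lines:

package cn.edu.shu.ces.chenjie.streamingapp

import java.net.Socket

import scala.io.Source

object ProducerCheck {
  def main(args: Array[String]): Unit = {
    // Connect to the running StreamingProducer and echo a handful of events
    val socket = new Socket("localhost", 9999)
    Source.fromInputStream(socket.getInputStream).getLines().take(10).foreach(println)
    socket.close()
  }
}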
A simple stream processing application
package cn.edu.shu.ces.chenjie.streamingapp

import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]","First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Streaming analytics
package cn.edu.shu.ces.chenjie.streamingapp

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingAnalyticsApp {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]","First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)
    val events = stream.map{  record =>
      val event = record.split(",")
      (event(0),event(1),event(2))
    }
    events.foreachRDD{(rdd, time) =>
      val numPurchases = rdd.count()
      val uniqueUsers = rdd.map{  case (user, _, _) => user}.distinct().count()
      val totalRevenue = rdd.map{ case (_, _, price) => price.toDouble}.sum()
      val productsByPopularity = rdd.map{ case (user, product, price) =>
        (product, 1)
      }.reduceByKey(_ + _).collect().sortBy(-_._2) // sort descending so index 0 is the most popular product
      val mostPopular = productsByPopularity(0)

      val formatter = new SimpleDateFormat()
      val dateStr = formatter.format(new Date(time.milliseconds))
      println(s"== Batch start time: $dateStr ==")
      println("Total purchases: " + numPurchases)
      println("Unique users: " + uniqueUsers)
      println("Total revenue: " + totalRevenue)
      println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
Stateful stream computation
package cn.edu.shu.ces.chenjie.streamingapp

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingStateApp {
  import org.apache.spark.streaming.StreamingContext._
  def updateState(prices: Seq[(String, Double)], currentTotal: Option[(Int, Double)]) = {
    val currentRevenue = prices.map(_._2).sum
    val currentNumberPurchases = prices.size
    val state = currentTotal.getOrElse((0, 0.0))
    Some((currentNumberPurchases + state._1, currentRevenue + state._2))
  }

  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local[2]","First Streaming App", Seconds(10))

    // Stateful operations require a checkpoint directory to be set
    ssc.checkpoint("/tmp/sparkstreaming/")
    val stream = ssc.socketTextStream("localhost", 9999)
    val events = stream.map{  record =>
      val event = record.split(",")
      (event(0),event(1),event(2).toDouble)
    }
    val users = events.map{ case (user, product, price) => (user, (product, price))}
    val revenuePerUser = users.updateStateByKey(updateState)
    revenuePerUser.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
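
To see what the update function does, here is a small worked example (hypothetical values, run for instance in a Scala REPL with the object above on the classpath); for a single user, the (purchase count, revenue) state accumulates across batches:

// Batch 1: one purchase of 9.99, no previous state
val afterBatch1 = StreamingStateApp.updateState(Seq(("iPhone Cover", 9.99)), None)
// afterBatch1 == Some((1, 9.99))

// Batch 2: one purchase of 5.49, starting from the previous state
val afterBatch2 = StreamingStateApp.updateState(Seq(("Headphones", 5.49)), afterBatch1)
// afterBatch2 == Some((2, 15.48)), up to floating-point rounding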
Online learning with Spark Streaming

Streaming regression

Creating a streaming data producer

package cn.edu.shu.ces.chenjie.streamingapp

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.Random

import breeze.linalg.DenseVector

/***
  * A producer that generates random linear regression data
  */
object StreamingModelProducer {
  def main(args: Array[String]): Unit = {
    val MaxEvents = 10 // maximum number of events per second
    val NumFeatures = 100
    val random = new Random()

    // Function that generates an array of values drawn from a normal distribution
    def generateRandomArray(n: Int) = Array.tabulate(n)(n => random.nextGaussian())

    // Fixed (randomly generated) model weight vector and intercept used to generate the data
    val w = new DenseVector(generateRandomArray(NumFeatures))
    val intercept = random.nextGaussian() * 10

    val listener = new ServerSocket(9999)
    println("Listening on port : 9999")

    while(true){
      val socket = listener.accept()
      new Thread(){
        override def run() = {
          println("Got client connected from :" + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream, true)

          while(true){
            Thread.sleep(1000)
            val num = random.nextInt(MaxEvents)
            val data = generateNoisyData(num)
            data.foreach{ case (y, x) =>
              val xStr = x.data.mkString(",")
              val eventStr = s"$y\t$xStr"
              out.write(eventStr)
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events")
          }
          socket.close()
        }
      }.start()

    }

    /**
      * Generate a number of random data events
      * @param n the number of events to generate
      * @return a sequence of (target, feature vector) pairs
      */
    def generateNoisyData(n: Int) = {
      (1 to n).map{ i =>
        val x = new DenseVector(generateRandomArray(NumFeatures))
        val y: Double = w.dot(x)
        val noisy = y + intercept
        (noisy, x)
      }
    }


  }
}
Creating a streaming regression model
package cn.edu.shu.ces.chenjie.streamingapp

import breeze.linalg._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SimpleStreamingModel {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "First Streaming App", Seconds(10))
    val stream = ssc.socketTextStream("localhost", 9999)
    val NumFeatures = 100 // must match the number of features emitted by StreamingModelProducer
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.dense(zeroVector.data))
      .setNumIterations(1)
      .setStepSize(0.01)
    val labeledStream = stream.map{ event =>
      val split = event.split("\t")
      val y = split(0).toDouble
      val features = split(1).split(",").map(_.toDouble)
      LabeledPoint(label = y, features = Vectors.dense(features))
    }

    model.trainOn(labeledStream)
    model.predictOn(labeledStream).print() // add an output operation so predictions are actually computed and shown
    ssc.start()
    ssc.awaitTermination()
  }
}
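
As a follow-up sketch (not part of the listing above; it reuses the labeledStream and model values defined in main and would be added before ssc.start()), the most recently updated weights can be used to score each batch and report a simple mean squared error:

    // Pair the model's current prediction with the true label for every example in the batch
    val predsAndTrue = labeledStream.transform { rdd =>
      val latest = model.latestModel() // snapshot of the most recently updated weights
      rdd.map(lp => (latest.predict(lp.features), lp.label))
    }
    // Report the mean squared error for each non-empty batch
    predsAndTrue.map { case (pred, label) => math.pow(pred - label, 2.0) }
      .foreachRDD(rdd => if (!rdd.isEmpty()) println(s"Batch MSE: ${rdd.mean()}"))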