1. 程式人生 > >Spark Streaming中的基本操作函式例項

Spark Streaming中的基本操作函式例項


該文例項我的碼雲直達車

請了解一些基本資訊:
DStream是Spark Streaming提供的基本抽象。它表示連續的資料流,可以是從源接收的輸入資料流,也可以是通過轉換輸入流生成的已處理資料流。在內部,DStream由一系列連續的RDD表示,這是Spark對不可變分散式資料集的抽象。DStream中的每個RDD都包含來自特定時間間隔的資料,如下圖所示
在這裡插入圖片描述

Transformations

直達車

1)map(func),將func函式作用到每一個元素上並生成一個新的元素,得到一個新的的DStream物件,包含這些新的元素。

程式碼

object Map {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val mapLines = lines.map(word => "map_" + word)

    mapLines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述

2)flatMap(func),將func函式作用到每一個元素上並生成0個或多個新的元素(例如下面的split就生成了>=0個新元素),得到一個新的DStream物件。包含這些新的元素。

程式碼

object FlatMap {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val fmapLines = lines.flatMap(_.split(" "))

    fmapLines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
ps:這裡放一個關於RDD map和flatMap的依賴圖(紅色塊表示一個RDD區,黑色塊表示該分割槽集合),意會下
在這裡插入圖片描述
在這裡插入圖片描述
3)filter(func),對DStream每一個元素,應用func方法進行計算,如果func函式返回結果為true,則保留該元素,否則丟棄該元素,返回一個新的DStream。

程式碼

object Filter {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val filterLines = lines.flatMap(_.split(" "))
      .filter(!StringUtils.equals(_, "hello"))

    filterLines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述

4)repartition(numPartitions),重新設定分割槽,可自行操作。

5)union(otherStream),返回一個新的DStream,它包含源DStream和otherDStream中元素的並集。

程式碼

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val union1 = lines.map(word => "union1_" + word)
    val union2 = lines.map(word => "union2_" + word)
    val union1_2 = union1.union(union2)

    union1.print()
    union2.print()
    union1_2.print()

    ssc.start()
    ssc.awaitTermination()
  }

結果
在這裡插入圖片描述
6)count(),通過計算源DStream的每個RDD中的元素數量,返回單個元素RDD的新DStream。

程式碼

object Count {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val mapLines = lines.map(_.split(" "))
    val fmapLines = lines.flatMap(_.split(" "))

    mapLines.count().print()
    fmapLines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
7)reduce(func),通過使用函式func(它接受兩個引數並返回一個),其中兩個引數(元素)兩兩計算,返回單個元素RDD的新DStream 。

程式碼

object Reduce {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    val result = fmapLines.reduce(_ + "*" + _)
    //fmapLines.reduce((a, b) => a + "*" + b)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
8)countByValue(),當在型別為K的DStream元素上呼叫時,返回新DStream的元素是(K,Long)對,其中每個鍵的值(Long)是其在源DStream的每個RDD中的頻率。

程式碼

object countByValue {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    val countByKey = fmapLines.countByValue()

    countByKey.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
9)reduceByKey(func, [numTasks]),當在型別為(K,V)的DStream元素上呼叫時,返回(K,V)對的新DStream,其中K為原來的K,V是由K經過傳入func計算得到的。

注意:預設情況下,這使用Spark的預設並行任務數(local模式下預設為2,在群集模式下,數量由config屬性確定spark.default.parallelism)進行分組。

程式碼

object ReduceByKey {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    val tuple = fmapLines.map(word => (word, 1))
    val reduceByKey = tuple.reduceByKey(_ + _)

    reduceByKey.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述

10)join(otherStream, [numTasks]),當在(K,V)和(K,W)對的兩個DStream上呼叫時,返回新的DStream內容是(K,(V,W))對。numTasks並行度,可選

程式碼

object Join {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val words = lines.flatMap(_.split(" "))
    val join1 = words.map(word => (word, "join1_" + word))
    val join2 = words.map(word => (word, "join2_" + word))
    val join1_2 = join1.join(join2)

    join1.print()
    join2.print()
    join1_2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述

11)cogroup(otherStream, [numTasks]),當在(K,V)和(K,W)對的DStream上呼叫時,返回(K,Seq [V],Seq [W])元組的新DStream。numTasks並行度,可選

程式碼

object Cogroup {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)
    val words = lines.flatMap(_.split(" "))
    val cogroup1 = words.map(word => (word, "cogroup1_" + word))
    val cogroup2 = words.map(word => (word, "cogroup2_" + word))
    val cogroup1_2 = cogroup1.cogroup(cogroup2)

    cogroup1.print()
    cogroup2.print()
    cogroup1_2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
12)transform(func) 直達車,通過將RDD-to-RDD函式應用於源DStream的每個RDD來返回新的DStream。這可以用於在DStream上執行任意RDD操作。

程式碼

object Transform {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val words = lines.transform(rdd=>{
      rdd.flatMap(_.split(" "))
    })

    words.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
13)updateStateByKey(func)直達車,返回一個新的“狀態”DStream,其中通過在鍵的先前狀態和鍵的新值上應用給定函式來更新每個鍵的狀態。這可用於維護每個金鑰的任意狀態資料。

程式碼

object UpdateStateByKey {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    //請注意,使用updateStateByKey需要配置檢查點目錄
    ssc.checkpoint("D:\\spark\\checkpoint")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * 更新資料
    * @param newValues
    * @param runningCount
    * @return
    */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {

    val current = newValues.sum
    val pre = runningCount.getOrElse(0)
    Some(current + pre)

  }
}

結果
在這裡插入圖片描述
在這裡插入圖片描述

Window Operations

直達車

window:定時的進行一段時間內資料的操作
window length:視窗的長度
sliding interval:視窗的間隔
這兩個引數和batch size是倍數關係,不是的話會報錯

1)window(windowLength, slideInterval),將當前時刻當前長度視窗中的元素取出形成一個新的DStream。

程式碼

object Window {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    //每隔5秒去計算前10秒的結果
    val window = fmapLines.window(Seconds(10), Seconds(5))

    window.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
2)countByWindow(windowLength, slideInterval),和count類似,只不過Dstream是我們擷取的。

程式碼

object CountByWindow {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)


    val fmapLines = lines.flatMap(_.split(" "))
    //每隔5s統計當前10秒長度的時間視窗的DStream中元素的個數:
    val countByWindow = fmapLines.countByWindow(Seconds(10), Seconds(5))

    countByWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
3)reduceByWindow(func, windowLength, slideInterval),和reduce類似,只不過Dstream是我們擷取的。

程式碼

object ReduceByWindow {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)


    val fmapLines = lines.flatMap(_.split(" "))
    val reduceByWindow = fmapLines.reduceByWindow(_ + "*" + _, Seconds(10), Seconds(5))

    reduceByWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 直達車,和reduceByKey類似,只不過Dstream是我們擷取的。

程式碼

object ReduceByKeyAndWindow {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    val tuple = fmapLines.map(word => (word, 1))
    val reduceByKeyAndWindow = tuple.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(10),Seconds(5))

    reduceByKeyAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]),和上面相比,多傳入一個函式invFunc。向車站一樣,有進去的人,也有出去的人,進去的人+1,出來的人-1。

程式碼

object ReduceByKeyAndWindow2 {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    val tuple = fmapLines.map(word => (word, 1))
    val reduceByKeyAndWindow = tuple.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), (a: Int, b: Int) => (a - b), Seconds(10), Seconds(5))

    reduceByKeyAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述
6)countByValueAndWindow(windowLength,slideInterval, [numTasks]),和countByValue類似,只不過Dstream是我們擷取的。

程式碼

object CountByValueAndWindow {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("D:\\spark\\checkpoint")
    ssc.sparkContext.setLogLevel("ERROR")

    val lines = ssc.socketTextStream("192.168.31.30", 9999)

    val fmapLines = lines.flatMap(_.split(" "))
    val countByValueAndWindow = fmapLines.countByValueAndWindow(Seconds(10), Seconds(5))

    countByValueAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

結果
在這裡插入圖片描述

Join Operations

直達車

1)Stream-stream joins 直達車

呼叫 join,leftOuterJoin,rightOuterJoin,fullOuterJoin就ok了

2)Stream-dataset joins 直達車

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

我們呼叫transform後就可以和dataset 連線操作了

Output Operations

直達車

輸出操作 含義
print() 在執行流應用程式的驅動程式節點上列印DStream中每批資料的前十個元素。這對開發和除錯很有用。
saveAsTextFiles(prefix, [suffix]) 將此DStream的內容儲存為文字檔案。每個批處理間隔的檔名基於字首和字尾生成:“prefix-TIME_IN_MS [.suffix]”。
saveAsObjectFiles(prefix, [suffix]) 將此DStream的內容儲存為SequenceFiles序列化Java物件。每個批處理間隔的檔名基於字首和 字尾生成:“prefix-TIME_IN_MS [.suffix]”。
saveAsHadoopFiles(prefix, [suffix]) 將此DStream的內容儲存為SequenceFiles序列化Java物件。每個批處理間隔的檔名基於字首和 字尾生成:“prefix-TIME_IN_MS [.suffix]”。 Python API這在Python API中不可用
foreachRDD(func) 最通用的輸出運算子,它將函式func應用於從流生成的每個RDD。此函式應將每個RDD中的資料推送到外部系統,例如將RDD儲存到檔案,或通過網路將其寫入資料庫。請注意,函式func在執行流應用程式的驅動程式程序中執行,並且通常會在其中執行RDD操作,這將強制計算流式RDD。

1)foreachRDD(func),正確高效的使用 直達車

connection 為外部連結

程式碼

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

更高效的

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

相關推薦

Spark Streaming基本操作函式例項

該文例項我的碼雲直達車 請了解一些基本資訊: DStream是Spark Streaming提供的基本抽象。它表示連續的資料流,可以是從源接收的輸入資料流,也可以是通過轉換輸入流生成的已處理資料流。在內部,DStream由一系列連續的RDD表示,這是Spar

Spark Streaming操作函數講解

csdn 後綴 rep 包含著 所有 並行計算 技術分享 ref filter Spark Streaming中的操作函數講解 根據根據Spark官方文檔中的描述,在Spark Streaming應用中,一個DStream對象可以調用多種操作,主要分為以下幾類 Tra

Spark StreamingreduceByKeyAndWindow例項開發

package SparkStreamingTest.Scala import org.apache.log4j.{Level, Logger} import org.apache.spark.Spa

棧的基本操作(+例項

棧:是限定僅在表尾進行插入或刪除操作的線性表,表尾段稱為棧頂,表頭段稱為棧底,棧有稱後進先出線性表。棧有順序棧和鏈棧。 1、順序棧的結構定義 //順序棧的儲存結構 typedef struct{ ElemType *base;//棧底指標變數 ElemType *top;//棧頂指

spark streamingWordCount

通過一些簡單的案例,可以知道一些大致的用法 1.對每一個批次的資料進行操作: import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streami

JavaScript立即執行函式例項詳解 轉載 作者:李牧羊

javascript和其他程式語言相比比較隨意,所以javascript程式碼中充滿各種奇葩的寫法,有時霧裡看花,當然,能理解各型各色的寫法也是對javascript語言特性更進一步的深入理解。這篇文章主要給大家介紹了關於JavaScript中立即執行函式的相關資料,需要的朋友可以參考下。 前言

Makefile字串操作函式

makefile函式呼叫的形式: $(function args) function:函式名 args: 函式引數。引數和函式名之間用空格或tab鍵隔開,多個引數之間用逗號隔開。 1、subst – 特定字串替換 $(subst from,to,text) 在文字"t

順序表基本操作的實現

//庫函式標頭檔案包含 #include<stdio.h> #include<malloc.h> #include<stdlib.h> //函式狀態碼定義 #def

資料結構-迴圈佇列的基本操作函式實現(含全部程式碼)

    主要包含以下函式:    InitQueue(SqQueue &Q)              引數:迴圈佇列Q 功能:初始化迴圈佇列Q 時間複雜度:O(1)     QueueEmpty(SqQueue Q)              引數:迴圈佇列Q

資料結構-鏈隊的基本操作函式的實現(含全部程式碼)

主要包含以下函式:        InitQueue(LinkQueue &Q) 引數:鏈隊Q 功能:初始化  時間複雜度O(1)     EnQueue(LinkQueue &Q,QElemType e) 引數:鏈隊Q,元素e 功能:將e入隊 時間複雜度

VS 2017 檔案基本操作函式

Code #include <iostream> #include <cstdlib> #include <process.h> FILE *stream, *stream1, *stream2; #pragma warning(disable:49

python學習基本操作

1.檔案移動與複製 # encoding:utf-8 import os import shutil shutil.move(os.path.join(path1,pic), os.path.join(dst_path, pic)) shutil.copy(pic1, os.path.j

Spark Streaming基本工作原理

一、 Spark Streaming簡介 Spark Streaming是Spark Core API的一種擴充套件,它可以用於進行大規模、高吞吐量、容錯的實時資料流的處理。它支援從很多種資料來源中讀取資料,比如Kafka、Flume、Twitter、ZeroM

spark streaming direct 直連方式從kafka怎麼拉取資料

我們知道 SparkStreaming 用 Direct 的方式拉取 Kafka 資料時,是根據 kafka 中的 fromOffsets 和 untilOffsets 來進行獲取資料的,而 fromOffsets 一般都是需要我們自己管理的,而每批次的 untilOffse

MATLAB數字影象處理基本操作函式

影象讀取:I = imread('mao.jpg'); 影象顯示:imshow(I) 彩色轉灰度:I = rgb2gray(I); 影象縮放:X1 = imresize(I,2);%放大兩倍 影象旋轉:B = imrotate(A,angle); 插值: 1、最

Spark-RDD-02基本操作詳解

Spark中RDD是一個不可變的分散式物件集合,每個RDD都被分為多個分割槽,這些分割槽被分發到叢集的不同的節點中進行計算。 SparkContext是Spark的程式設計主入口點,可以被用於在叢集中建立RDDs,在spark-shell中被系統預設建立為sc。 兩種建立

spark streamingreduceByKeyAndWindow簡單例子

視窗的一些簡單操作 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingCon

spark streamingtransform過濾廣告黑名單

/* transform操作,應用在DStream上時,可以用於執行任意的RDD到RDD的轉換操作。它可以用於實現,DStream API中 所沒有提供的操作。比如說,DStream API中,並沒有提供將一個DStream中的每個batch,與一個特定的RDD進行joi

Spark Streaming基本思路和基本架構

一、基於 Spark 做 Spark Streaming 的思路 Spark Streaming 與 Spark Core 的關係可以用下面的經典部件圖來表述: 在本節,我們先探討一下基於 Spark Core 的 RDD API,如何對 streaming

Spark Streaming 如何實現 Exactly-Once 語義

Exactly-once 語義是實時計算的難點之一。要做到每一條記錄只會被處理一次,即使伺服器或網路發生故障時也能保證沒有遺漏,這不僅需要實時計算框架本身的支援,還對上游的訊息系統、下游的資料儲存有所要求。此外,我們在編寫計算流程時也需要遵循一定規範,才能真正實