1. 程式人生 > >Spark 系列(十四)—— Spark Streaming 基本操作

Spark 系列(十四)—— Spark Streaming 基本操作

一、案例引入

這裡先引入一個基本的案例來演示流的建立:獲取指定埠上的資料並進行詞頻統計。專案依賴和程式碼實現如下:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.3</version>
</dependency>
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {

  def main(args: Array[String]) {

    /*指定時間間隔為 5s*/
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /*建立文字輸入流,並進行詞頻統計*/
    val lines = ssc.socketTextStream("hadoop001", 9999)
    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()

    /*啟動服務*/
    ssc.start()
    /*等待服務結束*/
    ssc.awaitTermination()
  }
}

使用本地模式啟動 Spark 程式,然後使用 nc -lk 9999 開啟埠並輸入測試資料:

[root@hadoop001 ~]#  nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban

此時控制檯輸出如下,可以看到已經接收到資料並按行進行了詞頻統計。


下面針對示例程式碼進行講解:

3.1 StreamingContext

Spark Streaming 程式設計的入口類是 StreamingContext,在建立時候需要指明 sparkConfbatchDuration(批次時間),Spark 流處理本質是將流資料拆分為一個個批次,然後進行微批處理,batchDuration

就是批次拆分的時間間隔。這個時間可以根據業務需求和伺服器效能進行指定,如果業務要求低延遲並且伺服器效能也允許,則這個時間可以指定得很短。

這裡需要注意的是:示例程式碼使用的是本地模式,配置為 local[2],這裡不能配置為 local[1]。這是因為對於流資料的處理,Spark 必須有一個獨立的 Executor 來接收資料,然後再由其他的 Executors 來處理,所以為了保證資料能夠被處理,至少要有 2 個 Executors。這裡我們的程式只有一個數據流,在並行讀取多個數據流的時候,也需要保證有足夠的 Executors 來接收和處理資料。

3.2 資料來源

在示例程式碼中使用的是 socketTextStream

來建立基於 Socket 的資料流,實際上 Spark 還支援多種資料來源,分為以下兩類:

  • 基本資料來源:包括檔案系統、Socket 連線等;
  • 高階資料來源:包括 Kafka,Flume,Kinesis 等。

在基本資料來源中,Spark 支援監聽 HDFS 上指定目錄,當有新檔案加入時,會獲取其檔案內容作為輸入流。建立方式如下:

// 對於文字檔案,指明監聽目錄即可
streamingContext.textFileStream(dataDirectory)
// 對於其他檔案,需要指明目錄,以及鍵的型別、值的型別、和輸入格式
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

被監聽的目錄可以是具體目錄,如 hdfs://host:8040/logs/;也可以使用萬用字元,如 hdfs://host:8040/logs/2017/*

關於高階資料來源的整合單獨整理至:Spark Streaming 整合 Flume 和 Spark Streaming 整合 Kafka

3.3 服務的啟動與停止

在示例程式碼中,使用 streamingContext.start() 代表啟動服務,此時還要使用 streamingContext.awaitTermination() 使服務處於等待和可用的狀態,直到發生異常或者手動使用 streamingContext.stop() 進行終止。

二、Transformation

2.1 DStream與RDDs

DStream 是 Spark Streaming 提供的基本抽象。它表示連續的資料流。在內部,DStream 由一系列連續的 RDD 表示。所以從本質上而言,應用於 DStream 的任何操作都會轉換為底層 RDD 上的操作。例如,在示例程式碼中 flatMap 運算元的操作實際上是作用在每個 RDDs 上 (如下圖)。因為這個原因,所以 DStream 能夠支援 RDD 大部分的transformation運算元。

2.2 updateStateByKey

除了能夠支援 RDD 的運算元外,DStream 還有部分獨有的transformation運算元,這當中比較常用的是 updateStateByKey。文章開頭的詞頻統計程式,只能統計每一次輸入文字中單詞出現的數量,想要統計所有歷史輸入中單詞出現的數量,可以使用 updateStateByKey 運算元。程式碼如下:

object NetworkWordCountV2 {


  def main(args: Array[String]) {

    /*
     * 本地測試時最好指定 hadoop 使用者名稱,否則會預設使用本地電腦的使用者名稱,
     * 此時在 HDFS 上建立目錄時可能會丟擲許可權不足的異常
     */
    System.setProperty("HADOOP_USER_NAME", "root")
      
    val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    /*必須要設定檢查點*/
    ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming")
    val lines = ssc.socketTextStream("hadoop001", 9999)
    lines.flatMap(_.split(" ")).map(x => (x, 1))
      .updateStateByKey[Int](updateFunction _)   //updateStateByKey 運算元
      .print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * 累計求和
    *
    * @param currentValues 當前的資料
    * @param preValues     之前的資料
    * @return 相加後的資料
    */
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val current = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(current + pre)
  }
}

使用 updateStateByKey 運算元,你必須使用 ssc.checkpoint() 設定檢查點,這樣當使用 updateStateByKey 運算元時,它會去檢查點中取出上一次儲存的資訊,並使用自定義的 updateFunction 函式將上一次的資料和本次資料進行相加,然後返回。

2.3 啟動測試

在監聽埠輸入如下測試資料:

[root@hadoop001 ~]#  nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
hello world hello spark hive hive hadoop
storm storm flink azkaban

此時控制檯輸出如下,所有輸入都被進行了詞頻累計:

同時在輸出日誌中還可以看到檢查點操作的相關資訊:

# 儲存檢查點資訊
19/05/27 16:21:05 INFO CheckpointWriter: Saving checkpoint for time 1558945265000 ms 
to file 'hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000'

# 刪除已經無用的檢查點資訊
19/05/27 16:21:30 INFO CheckpointWriter: 
Deleting hdfs://hadoop001:8020/spark-streaming/checkpoint-1558945265000

三、輸出操作

3.1 輸出API

Spark Streaming 支援以下輸出操作:

Output Operation Meaning
print() 在執行流應用程式的 driver 節點上列印 DStream 中每個批次的前十個元素。用於開發除錯。
saveAsTextFiles(prefix, [suffix]) 將 DStream 的內容儲存為文字檔案。每個批處理間隔的檔名基於字首和字尾生成:“prefix-TIME_IN_MS [.suffix]”。
saveAsObjectFiles(prefix, [suffix]) 將 DStream 的內容序列化為 Java 物件,並儲存到 SequenceFiles。每個批處理間隔的檔名基於字首和字尾生成:“prefix-TIME_IN_MS [.suffix]”。
saveAsHadoopFiles(prefix, [suffix]) 將 DStream 的內容儲存為 Hadoop 檔案。每個批處理間隔的檔名基於字首和字尾生成:“prefix-TIME_IN_MS [.suffix]”。
foreachRDD(func) 最通用的輸出方式,它將函式 func 應用於從流生成的每個 RDD。此函式應將每個 RDD 中的資料推送到外部系統,例如將 RDD 儲存到檔案,或通過網路將其寫入資料庫。

前面的四個 API 都是直接呼叫即可,下面主要講解通用的輸出方式 foreachRDD(func),通過該 API 你可以將資料儲存到任何你需要的資料來源。

3.1 foreachRDD

這裡我們使用 Redis 作為客戶端,對文章開頭示例程式進行改變,把每一次詞頻統計的結果寫入到 Redis,並利用 Redis 的 HINCRBY 命令來進行詞頻統計。這裡需要匯入 Jedis 依賴:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

具體實現程式碼如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

object NetworkWordCountToRedis {
  
    def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("NetworkWordCountToRedis").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /*建立文字輸入流,並進行詞頻統計*/
    val lines = ssc.socketTextStream("hadoop001", 9999)
    val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
     /*儲存資料到 Redis*/
    pairs.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        var jedis: Jedis = null
        try {
          jedis = JedisPoolUtil.getConnection
          partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
        } finally {
          if (jedis != null) jedis.close()
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

其中 JedisPoolUtil 的程式碼如下:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisPoolUtil {

    /* 宣告為 volatile 防止指令重排序 */
    private static volatile JedisPool jedisPool = null;
    private static final String HOST = "localhost";
    private static final int PORT = 6379;

    /* 雙重檢查鎖實現懶漢式單例 */
    public static Jedis getConnection() {
        if (jedisPool == null) {
            synchronized (JedisPoolUtil.class) {
                if (jedisPool == null) {
                    JedisPoolConfig config = new JedisPoolConfig();
                    config.setMaxTotal(30);
                    config.setMaxIdle(10);
                    jedisPool = new JedisPool(config, HOST, PORT);
                }
            }
        }
        return jedisPool.getResource();
    }
}

3.3 程式碼說明

這裡將上面儲存到 Redis 的程式碼單獨抽取出來,並去除異常判斷的部分。精簡後的程式碼如下:

pairs.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val jedis = JedisPoolUtil.getConnection
    partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
    jedis.close()
  }
}

這裡可以看到一共使用了三次迴圈,分別是迴圈 RDD,迴圈分割槽,迴圈每條記錄,上面我們的程式碼是在迴圈分割槽的時候獲取連線,也就是為每一個分割槽獲取一個連線。但是這裡大家可能會有疑問:為什麼不在迴圈 RDD 的時候,為每一個 RDD 獲取一個連線,這樣所需要的連線數會更少。實際上這是不可行的,如果按照這種情況進行改寫,如下:

pairs.foreachRDD { rdd =>
    val jedis = JedisPoolUtil.getConnection
    rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach(record => jedis.hincrBy("wordCount", record._1, record._2))
    }
    jedis.close()
}

此時在執行時候就會丟擲 Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis,這是因為在實際計算時,Spark 會將對 RDD 操作分解為多個 Task,Task 執行在具體的 Worker Node 上。在執行之前,Spark 會對任務進行閉包,之後閉包被序列化併發送給每個 Executor,而 Jedis 顯然是不能被序列化的,所以會丟擲異常。

第二個需要注意的是 ConnectionPool 最好是一個靜態,惰性初始化連線池 。這是因為 Spark 的轉換操作本身就是惰性的,且沒有資料流時不會觸發寫出操作,所以出於效能考慮,連線池應該是惰性的,因此上面 JedisPool 在初始化時採用了懶漢式單例進行惰性初始化。

3.4 啟動測試

在監聽埠輸入如下測試資料:

[root@hadoop001 ~]#  nc -lk 9999
hello world hello spark hive hive hadoop
storm storm flink azkaban
hello world hello spark hive hive hadoop
storm storm flink azkaban

使用 Redis Manager 檢視寫入結果 (如下圖),可以看到與使用 updateStateByKey 運算元得到的計算結果相同。


本片文章所有原始碼見本倉庫:spark-streaming-basis

參考資料

Spark 官方文件:http://spark.apache.org/docs/latest/streaming-programming-guide.html

更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南