1. 程式人生 > >Structured Streaming教程(2) —— 常用輸入與輸出

Structured Streaming教程(2) —— 常用輸入與輸出

上篇瞭解了一些基本的Structured Streaming的概念,知道了Structured Streaming其實是一個無下界的無限遞增的DataFrame。基於這個DataFrame,我們可以做一些基本的select、map、filter操作,也可以做一些複雜的join和統計。本篇就著重介紹下,Structured Streaming支援的輸入輸出,看看都提供了哪些方便的操作。

資料來源

Structured Streaming 提供了幾種資料來源的型別,可以方便的構造Steaming的DataFrame。預設提供下面幾種型別:

File:檔案資料來源

file資料來源提供了很多種內建的格式,如csv、parquet、orc、json等等,就以csv為例:

package xingoo.sstreaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object FileInputStructuredStreamingTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val lines = spark.readStream
      .option("sep", ";")
      .schema(userSchema)
      .csv("file:///Users/xingoo/IdeaProjects/spark-in-action/data/*")

    val query = lines.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

這樣,在對應的目錄下新建檔案時,就可以在控制檯看到對應的資料了。

aaa;1
bbb;2
aaa;5
ddd;6

還有一些其他可以控制的引數:

  • maxFilesPerTrigger 每個batch最多的檔案數,預設是沒有限制。比如我設定了這個值為1,那麼同時增加了5個檔案,這5個檔案會每個檔案作為一波資料,更新streaming dataframe。
  • latestFirst 是否優先處理最新的檔案,預設是false。如果設定為true,那麼最近被更新的會優先處理。這種場景一般是在監聽日誌檔案的時候使用。
  • fileNameOnly 是否只監聽固定名稱的檔案。

網際網路科技發展蓬勃興起,人工智慧時代來臨,抓住下一個風口。為幫助那些往想網際網路方向轉行想學習,卻因為時間不夠,資源不足而放棄的人。我自己整理的一份最新的大資料進階資料和高階開發教程,大資料學習群:868847735   歡迎進階中和進想深入大資料的小夥伴加入。


 

socket網路資料來源

在我們自己練習的時候,一般都是基於這個socket來做測試。首先開啟一個socket伺服器,nc -lk 9999,然後streaming這邊連線進行處理。

  spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

kafka資料來源

這個是生產環境或者專案應用最多的資料來源,通常架構都是:

應用資料輸入-->kafka-->spark streaming -->其他的資料庫

由於kafka涉及的內容還比較多,因此下一篇專門介紹kafka的整合。

輸出

在配置完輸入,並針對DataFrame或者DataSet做了一些操作後,想要把結果儲存起來。就可以使用DataSet.writeStream()方法,配置輸出需要配置下面的內容:

  • format : 配置輸出的格式
  • output mode:輸出的格式
  • query name:查詢的名稱,類似tempview的名字
  • trigger interval:觸發的間隔時間,如果前一個batch處理超時了,那麼不會立即執行下一個batch,而是等下一個trigger時間在執行。
  • checkpoint location:為保證資料的可靠性,可以設定檢查點儲存輸出的結果。

output Mode

詳細的來看看這個輸出模式的配置,它與普通的Spark的輸出不同,只有三種類型:

  • complete,把所有的DataFrame的內容輸出,這種模式只能在做agg聚合操作的時候使用,比如ds.group.count,之後可以使用它
  • append,普通的dataframe在做完map或者filter之後可以使用。這種模式會把新的batch的資料輸出出來,
  • update,把此次新增的資料輸出,並更新整個dataframe。有點類似之前的streaming的state處理。

輸出的型別

Structed Streaming提供了幾種輸出的型別:

  • file,儲存成csv或者parquet
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()
  • console,直接輸出到控制檯。一般做測試的時候用這個比較方便。
noAggDF
  .writeStream
  .format("console")
  .start()
  • memory,可以儲存在內容,供後面的程式碼使用
aggDF
  .writeStream
  .queryName("aggregates")
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()  
  • foreach,引數是一個foreach的方法,使用者可以實現這個方法實現一些自定義的功能。
writeStream
    .foreach(...)
    .start()

這個foreach的功能很強大的。