Structured Streaming教程(2) —— 常用輸入與輸出
上篇瞭解了一些基本的Structured Streaming的概念,知道了Structured Streaming其實是一個無下界的無限遞增的DataFrame。基於這個DataFrame,我們可以做一些基本的select、map、filter操作,也可以做一些複雜的join和統計。本篇就著重介紹下,Structured Streaming支援的輸入輸出,看看都提供了哪些方便的操作。
資料來源
Structured Streaming 提供了幾種資料來源的型別,可以方便的構造Steaming的DataFrame。預設提供下面幾種型別:
File:檔案資料來源
file資料來源提供了很多種內建的格式,如csv、parquet、orc、json等等,就以csv為例:
package xingoo.sstreaming import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType object FileInputStructuredStreamingTest { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .master("local") .appName("StructuredNetworkWordCount") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val userSchema = new StructType().add("name", "string").add("age", "integer") val lines = spark.readStream .option("sep", ";") .schema(userSchema) .csv("file:///Users/xingoo/IdeaProjects/spark-in-action/data/*") val query = lines.writeStream .outputMode("append") .format("console") .start() query.awaitTermination() } }
這樣,在對應的目錄下新建檔案時,就可以在控制檯看到對應的資料了。
aaa;1
bbb;2
aaa;5
ddd;6
還有一些其他可以控制的引數:
- maxFilesPerTrigger 每個batch最多的檔案數,預設是沒有限制。比如我設定了這個值為1,那麼同時增加了5個檔案,這5個檔案會每個檔案作為一波資料,更新streaming dataframe。
- latestFirst 是否優先處理最新的檔案,預設是false。如果設定為true,那麼最近被更新的會優先處理。這種場景一般是在監聽日誌檔案的時候使用。
- fileNameOnly 是否只監聽固定名稱的檔案。
網際網路科技發展蓬勃興起,人工智慧時代來臨,抓住下一個風口。為幫助那些往想網際網路方向轉行想學習,卻因為時間不夠,資源不足而放棄的人。我自己整理的一份最新的大資料進階資料和高階開發教程,大資料學習群:868847735 歡迎進階中和進想深入大資料的小夥伴加入。
socket網路資料來源
在我們自己練習的時候,一般都是基於這個socket來做測試。首先開啟一個socket伺服器,nc -lk 9999
,然後streaming這邊連線進行處理。
spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
kafka資料來源
這個是生產環境或者專案應用最多的資料來源,通常架構都是:
應用資料輸入-->kafka-->spark streaming -->其他的資料庫
由於kafka涉及的內容還比較多,因此下一篇專門介紹kafka的整合。
輸出
在配置完輸入,並針對DataFrame或者DataSet做了一些操作後,想要把結果儲存起來。就可以使用DataSet.writeStream()方法,配置輸出需要配置下面的內容:
- format : 配置輸出的格式
- output mode:輸出的格式
- query name:查詢的名稱,類似tempview的名字
- trigger interval:觸發的間隔時間,如果前一個batch處理超時了,那麼不會立即執行下一個batch,而是等下一個trigger時間在執行。
- checkpoint location:為保證資料的可靠性,可以設定檢查點儲存輸出的結果。
output Mode
詳細的來看看這個輸出模式的配置,它與普通的Spark的輸出不同,只有三種類型:
- complete,把所有的DataFrame的內容輸出,這種模式只能在做agg聚合操作的時候使用,比如ds.group.count,之後可以使用它
- append,普通的dataframe在做完map或者filter之後可以使用。這種模式會把新的batch的資料輸出出來,
- update,把此次新增的資料輸出,並更新整個dataframe。有點類似之前的streaming的state處理。
輸出的型別
Structed Streaming提供了幾種輸出的型別:
- file,儲存成csv或者parquet
noAggDF
.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
- console,直接輸出到控制檯。一般做測試的時候用這個比較方便。
noAggDF
.writeStream
.format("console")
.start()
- memory,可以儲存在內容,供後面的程式碼使用
aggDF
.writeStream
.queryName("aggregates")
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show()
- foreach,引數是一個foreach的方法,使用者可以實現這個方法實現一些自定義的功能。
writeStream
.foreach(...)
.start()
這個foreach的功能很強大的。