1. 程式人生 > >學習Spark2.0中的Structured Streaming(一)

學習Spark2.0中的Structured Streaming(一)

Spark2.0新增了Structured Streaming,它是基於SparkSQL構建的可擴充套件和容錯的流式資料處理引擎,使得實時流式資料計算可以和離線計算採用相同的處理方式(DataFrame&SQL)。Structured Streaming顧名思義,它將資料來源和計算結果都對映成一張”結構化”的表,在計算的時候以結構化的方式去操作資料流,大大方便和提高了資料開發的效率。

Spark2.0之前,流式計算通過Spark Streaming進行:

spark

使用Spark Streaming每次只能消費當前批次內的資料,當然可以通過window操作,消費過去一段時間(多個批次)內的資料。舉個簡例子,需要每隔10秒,統計當前小時的PV和UV,在資料量特別大的情況下,使用window操作並不是很好的選擇,通常是藉助其它如Redis、HBase等完成資料統計。

 

Structured Streaming將資料來源和計算結果都看做是無限大的表,資料來源中每個批次的資料,經過計算,都新增到結果表中作為行。

spark

先試試官方給的例子,在本地啟動NetCat: nc -lk 9999

在另一個會話中:
cd $SPARK_HOME/bin
./spark-shell(以local模式進入spark-shell命令列),執行下面的程式:


 
  1. import org.apache.spark.sql.functions._
  2. import org.apache.spark.sql.SparkSession
  3. val spark = SparkSession
    .builder.appName("StructuredNetworkWordCount").getOrCreate()
  4.  
  5. import spark.implicits._
  6. val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
  7.  
  8. val words = lines.as[String].flatMap(_.split(" "))
  9. val wordCounts = words.groupBy("value"
    ).count()
  10.  
  11. val query = wordCounts.writeStream.outputMode("complete").format("console").start()
  12. query.awaitTermination()

在NetCat會話中輸入”apache spark”,spark-shell中顯示:

spark

在NetCat會話中分兩次再輸入”apache hadoop”,”lxw1234.com hadoop spark”, spark-shell中顯示:

spark

可以看到,每個Batch顯示的結果,都是完整的WordCount統計結果,這便是結算結果輸出中的完整模式(Complete Mode)。

spark

關於結算結果的輸出,有三種模式:

  1. Complete Mode:輸出最新的完整的結果表資料。
  2. Append Mode:只輸出結果表中本批次新增的資料,其實也就是本批次中的資料;
  3. Update Mode(暫不支援):只輸出結果表中被本批次修改的資料;

這些Output,可以直接通過聯結器(如MySQL JDBC、HBase API等)寫入外部儲存系統。

再看看Append模式,
注意:Append模式不支援基於資料流上的聚合操作(Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets);


 
  1. import org.apache.spark.sql.functions._
  2. import org.apache.spark.sql.SparkSession
  3. val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
  4.  
  5. import spark.implicits._
  6. val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
  7.  
  8. val words = lines.as[String].flatMap(_.split(" "))
  9.  
  10. val query = words.writeStream.outputMode("append").format("console").start()
  11. query.awaitTermination()
  12.  

在NetCat中分三次輸入:
apache spark
apache hadoop
lxw1234.com hadoop spark

spark-shell中顯示:

spark

只有當前批次的資料。

 

如果覺得本部落格對您有幫助,請 贊助作者 。

轉載請註明:lxw的大資料田地 » 學習Spark2.0中的Structured Streaming(一)