1. 程式人生 > 遊戲資訊 >【自翻】《普羅斯佩羅的灰燼(Ashes of Prospero)攻略》(第四章)

【自翻】《普羅斯佩羅的灰燼(Ashes of Prospero)攻略》(第四章)

四、掌握spark streaming的工作原理、離散化流、實時資料獲取(套接字和資料夾)等內容,掌握Dstream的各種轉換(具體到程式碼的編寫使用);

1、流資料

(1)流資料的概念

流資料是一組順序、大量、快速、連續到達的資料序列,一般情況下,資料流可被視為一個隨時間延續而無限增長的動態資料集合。

(2)流資料的特點

實時到達、次序獨立、規模巨集大、不易提取

2、Spark Streaming介紹

Spark Streaming 使得構建可擴充套件的容錯流應用變得容易。它是Spark核心API的一個擴充套件,能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景

通過SparkStreaming處理後的結果還可以儲存在資料庫中,分散式檔案系統HDFS中,還可以實時展現在資料大螢幕中等。

SparkStreaming能和機器學習庫(MLlib)以及圖計算庫(Graphx)進行無縫銜接實現實時線上分析,以及使用DataFrame和SQL進行操作

SparkStreaming特點:

具有易於使用,高容錯性,高吞吐量等特點使其能夠勝任實時的流計算.

3、SparkStreaming工作原理——分批次進行處理

Spark Streaming接收實時輸入資料流並將資料分成批處理,然後由Spark引擎處理以批量生成最終結果流。

先接收實時輸入的資料流,然後將資料拆分成多個batch(批),比如每收集1秒的資料封裝為一個batch,然後將每個batch交給Spark的計算引擎進行處理,最後會生產出一個結果資料流,結果中的資料也是由一個一個的batch所組成的。

4、離散流

離散流(discretized stream)簡稱“Dstream”,這是Spark Streaming對內部持續的實時資料流的抽象描述,也就是我們處理的一個實時資料流,在Spark Streaming中對應於一個DStream 例項。

(1)離散化流(Discretized Stream)

Discretized Stream或DStream是Spark Streaming提供的基本抽象。

它表示連續的資料流,可以是從源接收的輸入資料流,也可以是通過轉換輸入流生成的已處理資料流。 在內部,DStream由一系列連續的RDD表示,這是Spark對不可變分散式資料集的抽象。

DStream中的每個RDD都包含來自特定時間間隔的資料。

DStram很好理解,就是按時間間隔組合的一系列RDD,其特點就是離散且持續。

(2)使用套接字socket獲取DStream

使用ssc物件的socketTextStream方法獲取DStream

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master="local[4]") # 指定以4個執行緒的本地模式執行Spark
ssc = StreamingContext(sc, 5) # 例項化一個StreamingContext物件,生成流資料的時候是每5秒一個批次
lines = ssc.socketTextStream("10.2.87.2", 9999) # 從指定的ip和埠號獲取資料
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint() # 統計各個時間段內每個單詞出現的次數
ssc.start() # 啟動流資料的執行
ssc.awaitTermination() # 等待執行停止

(3)使用檔案系統中的檔案獲取DStream

ssc物件的textFileStream方法

sc = SparkContext(master="local[4]")
ssc = StreamingContext(sc, 5)
ssc.checkpoint(r"/home/ubuntu/test_checkpoit")
lines = ssc.socketTextStream("10.2.87.2", 9999)
counts.pprint()
ssc.start()
ssc.awaitTermination()

資料夾裡的檔案必須是文字檔案(txt,csv,json),且只有新增的檔案才會被處理,在已經處理過的原有檔案中新增新內容是無法獲取到的,新增的檔案最後修改日期也必須是最新的時間(修改時間大於程式的執行時間),且不能與已經處理過的檔案同名,否則也無法獲取。