1. 程式人生 > 實用技巧 >spark streaming 使用updateByKey統計一段時間裡wordcount 無法累加問題(checkpoint無效)

spark streaming 使用updateByKey統計一段時間裡wordcount 無法累加問題(checkpoint無效)

  • 程式碼功能:使用sparkStreaming的updateByKey()方法統計一段時間裡面接收到的文字中每個單詞出現的次數。

    • checkpoint地址預設放在hdfs的使用者目錄下。
      • 在虛擬機器中使用dc -lk 9999 -v在9999埠上放入文字,StreamingContext的socketTextStream()方法從埠接受文字資訊。
  • 發現在9999埠輸入新的值之後,控制檯輸出當前的詞頻統計資訊,但不會顯示歷史state的資訊,並且也無法累計統計。

  • 推測是state為None,即無法從checkpoint獲取歷史版本。很可能是ssc.checkpoint('checkpoint')

    這裡的路徑解析有問題。於是:

    1. 到hadoop的hdfs使用者目錄下檢視是否建立了checkpoint檔案:hadoop fs -ls /user/root,發現空空如也。

    2. 想要spark能夠自動找到hdfs的目錄需要在spark-env.sh中配置hadoop配置檔案的位置(HADOOP_HOME也是需要的)檢視spark-env.sh:

      export HADOOP_CONF_DIR=/usr/local/hadoop/hadoop-2.10.1
      

      原來這裡的路徑寫成了hadoop的主目錄了。新增etc/hadoop即可:

      export HADOOP_CONF_DIR=/usr/local/hadoop/hadoop-2.10.1/etc/hadoop
      
import os
SPARK_HOME = '/usr/local/spark/spark-3.0.1-bin-hadoop2.7'
JAVA_HOME = '/usr/local/jdk/jdk1.8.0_271'
# HADOOP_HOME = '/usr/local/hadoop/hadoop-2.10.1'
# os.environ['HADOOP_HOME'] = HADOOP_HOME
os.environ['SPARK_HOME'] = SPARK_HOME
os.environ['JAVA_HOME'] = JAVA_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext('local[2]',appName='updateByKey')
ssc = StreamingContext(sc,1)

# 1.開啟檢查點——這是pudateByKey必須的   預設會在hadoop的/user/root/目錄下
ssc.checkpoint('checkpoint')

# 2.定義狀態更新函式 引數一為新值,引數二為狀態,返回新的狀態
def updateFunc(new_values,last_state):
    return  sum(new_values) + (last_state or 0)     # 如果return None,key對應的state就會被刪除,所以要避免

# 3. 資料來源:監聽埠
lines = ssc.socketTextStream('localhost',9999)

# 4.對資料進行處理,reduce部分使用updateByKey()函式
counts = lines.flatMap(lambda line : line.split(' ')).map(lambda x : (x,1)).updateStateByKey(updateFunc=updateFunc)

counts.pprint()

ssc.start()
ssc.awaitTermination()