spark streaming 使用updateByKey統計一段時間裡wordcount 無法累加問題(checkpoint無效)
阿新 • • 發佈:2021-01-11
-
程式碼功能:使用sparkStreaming的updateByKey()方法統計一段時間裡面接收到的文字中每個單詞出現的次數。
- checkpoint地址預設放在hdfs的使用者目錄下。
- 在虛擬機器中使用
dc -lk 9999 -v
在9999埠上放入文字,StreamingContext的socketTextStream()方法從埠接受文字資訊。
- 在虛擬機器中使用
- checkpoint地址預設放在hdfs的使用者目錄下。
-
發現在9999埠輸入新的值之後,控制檯輸出當前的詞頻統計資訊,但不會顯示歷史state的資訊,並且也無法累計統計。
-
推測是state為None,即無法從checkpoint獲取歷史版本。很可能是
ssc.checkpoint('checkpoint')
-
到hadoop的hdfs使用者目錄下檢視是否建立了checkpoint檔案:
hadoop fs -ls /user/root
,發現空空如也。 -
想要spark能夠自動找到hdfs的目錄需要在spark-env.sh中配置hadoop配置檔案的位置(HADOOP_HOME也是需要的)檢視spark-env.sh:
export HADOOP_CONF_DIR=/usr/local/hadoop/hadoop-2.10.1
原來這裡的路徑寫成了hadoop的主目錄了。新增etc/hadoop即可:
export HADOOP_CONF_DIR=/usr/local/hadoop/hadoop-2.10.1/etc/hadoop
-
import os SPARK_HOME = '/usr/local/spark/spark-3.0.1-bin-hadoop2.7' JAVA_HOME = '/usr/local/jdk/jdk1.8.0_271' # HADOOP_HOME = '/usr/local/hadoop/hadoop-2.10.1' # os.environ['HADOOP_HOME'] = HADOOP_HOME os.environ['SPARK_HOME'] = SPARK_HOME os.environ['JAVA_HOME'] = JAVA_HOME from pyspark import SparkContext from pyspark.streaming import StreamingContext sc = SparkContext('local[2]',appName='updateByKey') ssc = StreamingContext(sc,1) # 1.開啟檢查點——這是pudateByKey必須的 預設會在hadoop的/user/root/目錄下 ssc.checkpoint('checkpoint') # 2.定義狀態更新函式 引數一為新值,引數二為狀態,返回新的狀態 def updateFunc(new_values,last_state): return sum(new_values) + (last_state or 0) # 如果return None,key對應的state就會被刪除,所以要避免 # 3. 資料來源:監聽埠 lines = ssc.socketTextStream('localhost',9999) # 4.對資料進行處理,reduce部分使用updateByKey()函式 counts = lines.flatMap(lambda line : line.split(' ')).map(lambda x : (x,1)).updateStateByKey(updateFunc=updateFunc) counts.pprint() ssc.start() ssc.awaitTermination()