【PySpark】學習記錄1
阿新 • • 發佈:2020-08-13
一. Spark介紹
Spark是一個分散式計算平臺。運算速度遠超於HDFS,並且能與python、java更好地互動。
我的疑問:在資料處理/模型訓練的過程中,Spark這個平臺是需要我手動寫一些程式碼,例如讀取資料啥的,還是我只要在帶有pyspark的kernal的平臺上執行就可以?kernal是什麼??為什麼我在NAIE平臺上選了pyspark的kernal,接下來就會報錯呢?處理方式(資料讀寫這些)又不一樣嗎?
二. 今天的程式碼
匯入的所需要的包
import os
from operator import add
from pyspark import SparkContext
檢視當前檔案所在路徑。被路徑整怕了……
os.getcwd()
輸出:
'/home/ma-user/work'
檢視這個路徑下有什麼檔案:
os.listdir('/home/ma-user/work')
輸出:
['naie_platform', '__train.json', 'preprocess.ipynb', 'requirements.txt']
可以看到,我自己建了一個testSpark.txt並沒有顯示在這裡。。
1. 想要實現的功能:統計txt檔案裡單詞數目
- 讀取檔案並分割字串:
if len(sys.argv) < 2: print("Usage:wordcount <filepath>") # ??? exit(-1) # initialize sparkcontext # sc = SparkContext(appName="Python_Word_Count") # 實際上不需要這個,會報錯,因為預設已經有了一個?或者只需要執行1次,最好是與sc.stop()一起用避免錯誤 # 將文字資料讀為一個存放字串的RDD lines = sc.textFile('/home/ma-user/work/preprocess/requirements.txt') # sys.argv[1]是個json檔案,但我不懂它是什麼,也不知道會不會引起報錯 # lines = sc.textFile('sys.argv[0]') # 這個函式大概不能讀取.py 或者.json檔案吧 反正會報錯 # 把字串切分成單詞 words = lines.flatMap(lambda x:x.split(' ')) words.collect()
輸出:
['#name',
'[condition]',
'[version]',
'#condition',
'',
'',
'',
'==,',
'>=,',
'<=,',
'>,',
'<',
'#tensorflow==1.8.1',
'naie']
- 每個單詞對映為(x,1)的樣子方便統計數目,利用map功能:
mapWords = words.map(lambda x:(x,1)) # PythonRDD[11] at RDD at PythonRDD.scala:52
mapWords.collect()
輸出:
[('#name', 1), ('[condition]', 1), ('[version]', 1), ('#condition', 1), ('', 1), ('', 1), ('', 1), ('==,', 1), ('>=,', 1), ('<=,', 1), ('>,', 1), ('<', 1), ('#tensorflow==1.8.1', 1), ('naie', 1)]
- 合併相同鍵值,實現統計單詞數目。
如果沒有collect()
這個函式,每個函式返回的都是一個PythonRDD,看不出RDD裡的值的。
combine_same_keys = mapWords.reduceByKey(add) # PythonRDD[17] at RDD at PythonRDD.scala:52
combine_same_keys.collect()
輸出:
[('[version]', 1),
('#condition', 1),
('', 3),
('==,', 1),
('>=,', 1),
('naie', 1),
('#name', 1),
('[condition]', 1),
('<=,', 1),
('>,', 1),
('<', 1),
('#tensorflow==1.8.1', 1)]
- 列印統計結果:
for (keys, counts) in combine_same_keys.collect():
print(keys, counts)
輸出:
[version] 1
#condition 1
3
==, 1
>=, 1
naie 1
#name 1
[condition] 1
<=, 1
>, 1
< 1
#tensorflow==1.8.1 1
- 關閉RDD
一開始只要開啟一次sc,然後關閉了之後下一次執行就需要再初始化一次textFile
sc.stop() # 關閉spark, 關閉後就會提示:AttributeError: 'NoneType' object has no attribute 'sc'
-
想試一試其他功能:
word_add1 = lines.flatMap(lambda x:x.split(' ')) # 對資料格式也有要求,並不會幫你把單詞轉為什麼東西然後+1,你看這個.map(lambda x:x+1)就不行,提示TypeError:TypeError: must be str, not int
word_add1.collect() -
過濾掉重複的:
filter_same = word_add1.distinct()
filter_same.collect()
輸出:
['[version]',
'#condition',
'',
'==,',
'>=,',
'naie',
'#name',
'[condition]',
'<=,',
'>,',
'<',
'#tensorflow==1.8.1']
- 篩選,filter是保留符合條件的,也就是將不等於'==,'和''的字元留下:
# filter_same.filter(lambda x:x!=('' and'==,'and'>=,'and'<'and'>,')).collect() # 這樣一個都刪不掉.. and/or都一樣
filter_same.filter(lambda x:x!= '==,' and x!='' ).collect() # 這樣可以刪掉兩個
['[version]',
'#condition',
'>=,',
'naie',
'#name',
'[condition]',
'<=,',
'>,',
'<',
'#tensorflow==1.8.1']
2. 總結:整體流程是先建立一個RDD,然後對它進行操作
例:對一個數據為{1,2,3,3}的RDD進行基本RDD轉化操作
行動操作:
三. 報錯
遇見的報錯:
# ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Python_Word_Count, master=local[*]) created by __init__ at <ipython-input-46-e6dabb8e53ad>:1
出錯語句:
sc = SparkContext(appName="Python_Word_Count")
原因是這個只要開啟一次,在沒有關閉之前,再次輸入這個語句都會提示不能同時執行多個SparkContexts。
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/ma-user/work/readData/testSpark.text
找不到該檔案。