1. 程式人生 > 實用技巧 >【PySpark】學習記錄1

【PySpark】學習記錄1

一. Spark介紹

Spark是一個分散式計算平臺。運算速度遠超於HDFS,並且能與python、java更好地互動。
我的疑問:在資料處理/模型訓練的過程中,Spark這個平臺是需要我手動寫一些程式碼,例如讀取資料啥的,還是我只要在帶有pyspark的kernal的平臺上執行就可以?kernal是什麼??為什麼我在NAIE平臺上選了pyspark的kernal,接下來就會報錯呢?處理方式(資料讀寫這些)又不一樣嗎?

二. 今天的程式碼

匯入的所需要的包

import os
from operator import add
from pyspark import SparkContext

檢視當前檔案所在路徑。被路徑整怕了……

os.getcwd()

輸出:

'/home/ma-user/work'

檢視這個路徑下有什麼檔案:

os.listdir('/home/ma-user/work')

輸出:

['naie_platform', '__train.json', 'preprocess.ipynb',  'requirements.txt']

可以看到,我自己建了一個testSpark.txt並沒有顯示在這裡。。

1. 想要實現的功能:統計txt檔案裡單詞數目

  • 讀取檔案並分割字串:
if len(sys.argv) < 2:
        print("Usage:wordcount <filepath>") # ???
        exit(-1)
    # initialize sparkcontext
#     sc = SparkContext(appName="Python_Word_Count") # 實際上不需要這個,會報錯,因為預設已經有了一個?或者只需要執行1次,最好是與sc.stop()一起用避免錯誤
    
# 將文字資料讀為一個存放字串的RDD
lines = sc.textFile('/home/ma-user/work/preprocess/requirements.txt') # sys.argv[1]是個json檔案,但我不懂它是什麼,也不知道會不會引起報錯
# lines = sc.textFile('sys.argv[0]') # 這個函式大概不能讀取.py 或者.json檔案吧 反正會報錯
# 把字串切分成單詞
words = lines.flatMap(lambda x:x.split(' '))
words.collect()

輸出:

['#name',
 '[condition]',
 '[version]',
 '#condition',
 '',
 '',
 '',
 '==,',
 '>=,',
 '<=,',
 '>,',
 '<',
 '#tensorflow==1.8.1',
 'naie']
  • 每個單詞對映為(x,1)的樣子方便統計數目,利用map功能:
mapWords = words.map(lambda x:(x,1)) # PythonRDD[11] at RDD at PythonRDD.scala:52
mapWords.collect()

輸出:

[('#name', 1),
 ('[condition]', 1),
 ('[version]', 1),
 ('#condition', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('==,', 1),
 ('>=,', 1),
 ('<=,', 1),
 ('>,', 1),
 ('<', 1),
 ('#tensorflow==1.8.1', 1),
 ('naie', 1)]
  • 合併相同鍵值,實現統計單詞數目。
    如果沒有collect()這個函式,每個函式返回的都是一個PythonRDD,看不出RDD裡的值的。
combine_same_keys = mapWords.reduceByKey(add) # PythonRDD[17] at RDD at PythonRDD.scala:52 
combine_same_keys.collect()

輸出:

[('[version]', 1),
 ('#condition', 1),
 ('', 3),
 ('==,', 1),
 ('>=,', 1),
 ('naie', 1),
 ('#name', 1),
 ('[condition]', 1),
 ('<=,', 1),
 ('>,', 1),
 ('<', 1),
 ('#tensorflow==1.8.1', 1)]
  • 列印統計結果:
for (keys, counts) in combine_same_keys.collect():
    print(keys, counts)

輸出:

[version] 1
#condition 1
 3
==, 1
>=, 1
naie 1
#name 1
[condition] 1
<=, 1
>, 1
< 1
#tensorflow==1.8.1 1
  • 關閉RDD
    一開始只要開啟一次sc,然後關閉了之後下一次執行就需要再初始化一次textFile
sc.stop() # 關閉spark, 關閉後就會提示:AttributeError: 'NoneType' object has no attribute 'sc'
  • 想試一試其他功能:
    word_add1 = lines.flatMap(lambda x:x.split(' ')) # 對資料格式也有要求,並不會幫你把單詞轉為什麼東西然後+1,你看這個.map(lambda x:x+1)就不行,提示TypeError:TypeError: must be str, not int
    word_add1.collect()

  • 過濾掉重複的:

filter_same = word_add1.distinct()
filter_same.collect()

輸出:

['[version]',
 '#condition',
 '',
 '==,',
 '>=,',
 'naie',
 '#name',
 '[condition]',
 '<=,',
 '>,',
 '<',
 '#tensorflow==1.8.1']
  • 篩選,filter是保留符合條件的,也就是將不等於'==,'和''的字元留下:
# filter_same.filter(lambda x:x!=('' and'==,'and'>=,'and'<'and'>,')).collect() # 這樣一個都刪不掉.. and/or都一樣
filter_same.filter(lambda x:x!= '==,' and x!='' ).collect() # 這樣可以刪掉兩個
['[version]',
 '#condition',
 '>=,',
 'naie',
 '#name',
 '[condition]',
 '<=,',
 '>,',
 '<',
 '#tensorflow==1.8.1']

2. 總結:整體流程是先建立一個RDD,然後對它進行操作

例:對一個數據為{1,2,3,3}的RDD進行基本RDD轉化操作

行動操作:

三. 報錯

遇見的報錯:

# ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Python_Word_Count, master=local[*]) created by __init__ at <ipython-input-46-e6dabb8e53ad>:1 

出錯語句:

sc = SparkContext(appName="Python_Word_Count")

原因是這個只要開啟一次,在沒有關閉之前,再次輸入這個語句都會提示不能同時執行多個SparkContexts。

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/ma-user/work/readData/testSpark.text

找不到該檔案。