Hadoop+python入門整合測試程式碼
阿新 • • 發佈:2019-01-02
一 詞頻統計WordCount(類似TF)
屬於大資料框架中 最經典的案例:
統計檔案中每個單詞出現的個數
1.1、準備資料
# 建立目錄
$ hdfs dfs -mkdir -p /user/cloudera/wordcount/input
#安裝上傳模組
$ sudo yum install -y lrzsz
#建立檔案,並編輯
$ touch wordcount_mapper.py
$ touch wordcount_reducer.py
$ vim wordcount_mapper.py
$ vim wordcount_reducer.py
#上傳測試模組(檔案)
rz wordcount_mapper. py
rz wordcount_reducer.py
# 上傳資料檔案
$ hdfs dfs -put /home/cloudera/bigdata/wc.data /user/cloudera/wordcount/input
1.2、使用Python編碼,實現wordcount統計
- input
要讀取處理分析資料的路徑
預設情況下,一行一行的讀取檔案中的資料
- mapper
函式 ,進行分析處理
- reducer
函式,合併map函式輸出的結果
當使用Python開發完成mapper. py和reduce.py指令碼以後,在linux系統上要執行的話,需要給予執行許可權,命令如何:
$ chmod u+x wordcount_mapper.py
$ chmod u+x wordcount_reducer.py
本地測試:
- wordcount_mapper.py
$ echo "hadoop mapreduce mapreduce python" | python wordcount_mapper.py
- wordcount_reducer.py
$ echo "hadoop mapreduce mapreduce python" | python wordcount_mapper.py | python wordcount_reducer.py
1.3、執行Python編寫的WordCountt在YARN
hadoop jar \
/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.12.0.jar \
-files /home/cloudera/word_count/wordcount_mapper.py,\
/home/cloudera/word_count/wordcount_reducer.py \
-mapper "python wordcount_mapper.py" \
-reducer "python wordcount_reducer.py" \
-input /user/cloudera/wordcount/input.wc \
-output /user/cloudera/wordcount/output
----------------------------------------------------------------
-a. 第一點:
提交執行Hadoop 中MapReduce執行在YARN上
hadoop jar / yarn jar
-b. 第二點:
-files 引數 將Python編寫指令碼檔案上傳到叢集上,以便叢集中各個叢集下載使用
要求叢集中各個機器上必須按照同一版本、同一目錄的Python
-c. 第三點:
指定 input、output、mapper和reducer各個引數的值
二、IBM股票價格資料:
1962-01-02,7.713333,7.713333,7.626667,7.626667,0.689273,387200
日期,開盤價,最高價,最低價,收盤價,調整的收盤價和交易量
Date,Open,High,Low,Close,Adj Close,Volume
統計每日變化百分比總數
每日變化百分比 = (開盤價-收盤價)/ 開盤價
1. 統計每日變化百分比
0.23% 0.24% -0.13%
2.1開發程式:
-a. 建立目錄stock,拷貝wordcount_mapper.py和wordcount_reducer.py檔案至stock目錄
重新命名檔案:
$ mv wordcount_mapper.py stock_mapper.py
$ mv wordcount_reducer.py stock_reducer.py
-b. 開發程式碼
-c. 本地測試:
$ more stock-ibm.csv | python stock_mapper.py | sort -k1| python stock_reducer.py > stock-ibm.output
-d. 叢集測試
讀取HDFS上的資料,將程式提交執行在YARN上。
------------------------------------------------------------------------------
#/user/bin/python
#encoding:utf-8
import sys
for line in sys.stdin:
row = line.split(","):
open_price = float(row[1])
close_price = float(row[-3])
change = (open_price-close_price)/open_price*100
change_text = str(round(change,1))+"%"
print "%s\t%d" % (change_text,1)
#!/user/bin/python
# encoding:utf-8
import sys
current_word = None
current_count = 1
for line in sys.stdin:
word,count = line.strip().split('\t')
#判斷當前是否存在單詞
if current_word:
if word == current_word:
current_count += int(count)
else:
print "%s\t%d"%(current_word,current_count)
current_count =1
current_word = word #賦值當前單詞
if current_count >=1: #處理讀出最後一行資料
print "%s\t%d"%(current_word,current_count)