1. 程式人生 > >Hadoop+python入門整合測試程式碼

Hadoop+python入門整合測試程式碼

一 詞頻統計WordCount(類似TF)

屬於大資料框架中 最經典的案例:
統計檔案中每個單詞出現的個數

1.1、準備資料
# 建立目錄
$ hdfs dfs -mkdir -p /user/cloudera/wordcount/input 

#安裝上傳模組
$ sudo yum install -y lrzsz 

#建立檔案,並編輯
$ touch wordcount_mapper.py  
$ touch wordcount_reducer.py
$ vim wordcount_mapper.py
$ vim wordcount_reducer.py

#上傳測試模組(檔案)
rz  wordcount_mapper.
py rz wordcount_reducer.py # 上傳資料檔案 $ hdfs dfs -put /home/cloudera/bigdata/wc.data /user/cloudera/wordcount/input
1.2、使用Python編碼,實現wordcount統計
    - input 
        要讀取處理分析資料的路徑 
        預設情況下,一行一行的讀取檔案中的資料 
    - mapper
        函式 ,進行分析處理
    - reducer
        函式,合併map函式輸出的結果 
    
    當使用Python開發完成mapper.
py和reduce.py指令碼以後,在linux系統上要執行的話,需要給予執行許可權,命令如何: $ chmod u+x wordcount_mapper.py $ chmod u+x wordcount_reducer.py 本地測試: - wordcount_mapper.py $ echo "hadoop mapreduce mapreduce python" | python wordcount_mapper.py - wordcount_reducer.py $ echo "hadoop mapreduce mapreduce python"
| python wordcount_mapper.py | python wordcount_reducer.py
1.3、執行Python編寫的WordCountt在YARN
hadoop jar \
/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.12.0.jar \
-files /home/cloudera/word_count/wordcount_mapper.py,\
/home/cloudera/word_count/wordcount_reducer.py \
-mapper "python wordcount_mapper.py" \
-reducer "python wordcount_reducer.py" \
-input /user/cloudera/wordcount/input.wc \
-output /user/cloudera/wordcount/output
----------------------------------------------------------------
    -a. 第一點:
        提交執行Hadoop 中MapReduce執行在YARN上
        hadoop jar / yarn jar 
    -b. 第二點:
        -files 引數 將Python編寫指令碼檔案上傳到叢集上,以便叢集中各個叢集下載使用
        要求叢集中各個機器上必須按照同一版本、同一目錄的Python
    -c. 第三點:
        指定 input、output、mapper和reducer各個引數的值

這裡寫圖片描述

二、IBM股票價格資料:

	1962-01-02,7.713333,7.713333,7.626667,7.626667,0.689273,387200

    日期,開盤價,最高價,最低價,收盤價,調整的收盤價和交易量
    Date,Open,High,Low,Close,Adj Close,Volume

	統計每日變化百分比總數
	    每日變化百分比 = (開盤價-收盤價)/ 開盤價
	
	1. 統計每日變化百分比  
	    0.23%   0.24%  -0.13% 
2.1開發程式:
-a. 建立目錄stock,拷貝wordcount_mapper.py和wordcount_reducer.py檔案至stock目錄
    重新命名檔案:
    $ mv wordcount_mapper.py stock_mapper.py
    $ mv wordcount_reducer.py stock_reducer.py
-b. 開發程式碼
-c. 本地測試:
    $ more stock-ibm.csv | python stock_mapper.py | sort -k1| python stock_reducer.py > stock-ibm.output
-d. 叢集測試
    讀取HDFS上的資料,將程式提交執行在YARN上。 

------------------------------------------------------------------------------
#/user/bin/python
#encoding:utf-8

import sys

for line in  sys.stdin:
	row = line.split(","):
	open_price = float(row[1])
	close_price = float(row[-3])
	change = (open_price-close_price)/open_price*100
	change_text = str(round(change,1))+"%"
		print "%s\t%d" % (change_text,1) 
#!/user/bin/python
# encoding:utf-8

import sys
current_word = None
current_count = 1
	for line in sys.stdin:
		word,count = line.strip().split('\t')
		#判斷當前是否存在單詞
		if current_word:
			if word == current_word:
				current_count += int(count)
			else:
				print "%s\t%d"%(current_word,current_count)
				current_count =1
				
		current_word = word    #賦值當前單詞
	
	if current_count >=1:    #處理讀出最後一行資料
		print "%s\t%d"%(current_word,current_count)