MapReduce實現線性回歸
阿新 • • 發佈:2018-02-12
使用 reducer watermark hdfs 多少 局部最優 情況下 urn dex
假設要採用隨機梯度下降的話,那麽須要保持原始數據隨機,所以這裏的第一步就是隨機打亂原始數據。 然後在每一個Mapper的cleanup函數中直接輸出theta的參數值就可以
原始數據:(能夠在源代碼中的resource文件夾中下載 linear_regression.txt)
依據日誌能夠看出theta參數值選取以下的一個,其誤差會比較小,合並後的參數值為:
看到其結果是在兩個theta參數值之間。假設是平均值。那麽其輸出結果為:
從結果中能夠看出,合並後的結果並沒有原來當中的一個Theta參數組值的效果好,只是這個也可能和數據量有關,依據輸出結果。也能夠把合並後的theta值以及合並前的對照。然後使用最優的theta來作為最後的輸出。
1. 軟件版本號:
Hadoop2.6.0(IDEA中源代碼編譯使用CDH5.7.3,相應Hadoop2.6.0),集群使用原生Hadoop2.6.4。JDK1.8,Intellij IDEA 14 。
源代碼能夠在https://github.com/fansy1990/linear_regression?下載。
2. 實現思路:
本博客實現的是一元一次線性方程,等於是最簡單的線性方程了。採用的是Couresa裏面的機器學習中的大數據線性方程的方法來更新參數值的(即隨機梯度下降方法,當然也能夠使用批量梯度下降方法來實現,僅僅是在LinearRegressionJob中實現的不一樣而已),假設對隨機梯度下降或者批量梯度下降不了解的話。須要先去看看。以下是實現思路:2.1 Shuffle Data(打亂數據):
採用的思路是:在Mapper端輸出隨機值作為key,輸出當前記錄作為value,在Reducer端直接遍歷每一個key的全部values,直接輸出value以及NullWritable.get就可以。在這裏加入一個額外的參數randN。這個參數表示在Mapper端隨機值時,多少個原始數據使用同一個隨機值。假設randN為1。那麽每一個原始數據都會使用一個隨機值作為key。假設randN為2,那麽每兩個原始數據使用一個隨機值,假設randN為0或小於0。那麽全部數據都使用同一個隨機值(註意,這個時候事實上在Reducer端的values事實上也是亂序的,請讀者思考為什麽?)。其Mapper中map核心實現例如以下所看到的
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if(randN <= 0) { // 假設randN 比0小。那麽不再次打亂數據 context.write(randFloatKey,value); return ; } if(++countI >= randN){// 假設randN等於1。那麽每次隨機的值都是不一樣的 randFloatKey.set(random.nextFloat()); countI =0; } context.write(randFloatKey,value); }
2.2 Linear Regression(線性回歸):
線性回歸採用隨機梯度下降的方法來更新theta0和theta1 (僅僅實現了一元一次,所以僅僅有兩個參數),每一個Mapper都會使用相同的初始化參數(theta0=1和theta1=0),在每一個Mapper中使用自己的數據來更新theta0和theta1,更新的公式為:theta0 = theta0 -alpha*(h(x)-y)x theta1 = theta1 -alpha*(h(x)-y)x當中,h(x)= theta0 + theta1 * x ;同一時候。須要註意這裏的更新是同步更新,其核心代碼例如以下所看到的:
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { float[] xy = Utils.str2float(value.toString().split(splitter)); float x = xy[0]; float y = xy[1]; // 同步更新 theta0 and theta1 lastTheta0 = theta0; theta0 -= alpha *(theta0+theta1* x - y) * x; // 保持theta0 和theta1 不變 theta1 -= alpha *(lastTheta0 + theta1 * x -y) * x;// 保持theta0 和theta1 不變 }
protected void cleanup(Context context) throws IOException, InterruptedException { theta0_1.set(theta0 + splitter + theta1); context.write(theta0_1,NullWritable.get()); }因為在每一個mapper中已經更新了theta的各個參數值,所以不須要使用reducer就可以;同一時候。因為測試數據比較小。所以設置mapreduce.input.fileinputformat.split.maxsize的大小,讀者須要依據自己實際數據的大小來設置。其Driver類核心代碼例如以下所看到的:
conf.setLong("mapreduce.input.fileinputformat.split.maxsize",700L);// 獲取多個mapper; job.setNumReduceTasks(0);
2.3 Combine Theta (合並參數值):
在2.2步中已經算得了各個theta值。那麽應該怎樣來合並這些求得得各個theta值呢?能夠直接用平均值麽?對於一元一次線性回歸是能夠直接使用平均值來作為終於合並後的theta值的,可是針對其它的線性回歸(特指有多個局部最小值的線性回歸。這樣求得的多個theta值合並就會有問題了)。假設僅僅是使用平均值的話。那麽在2.2步事實上加一個Reducer就能夠完畢了,這裏提出了一種另外的方式來合並theta值。即採用各個theta值的全局誤差作為參數來進行加權。所以,在Mapper的setup中會讀取2.2中的多個輸出theta值。在map函數中針對各個原始數據求其誤差,輸出到reducer的數據為theta值和其誤差;其核心代碼例如以下所看到的:protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { float[] xy = Utils.str2float(value.toString().split(splitter)); for(int i =0;i<thetas.size() ;i++){ // error = (theta0 + theta1 * x - y) ^2 thetaErrors[i] += (thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) * (thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) ; thetaNumbers[i]+= 1; } }
protected void cleanup(Context context) throws IOException, InterruptedException { for(int i =0;i<thetas.size() ;i++){ theta.set(thetas.get(i)); floatAndLong.set(thetaErrors[i],thetaNumbers[i]); context.write(theta,floatAndLong); } }在Reducer端。直接針對每一個鍵(也就是theta值)把各個誤差加起來,在cleanup函數中採用加權來合並theta值,其核心代碼例如以下所看到的:
protected void reduce(FloatAndFloat key, Iterable<FloatAndLong> values, Context context) throws IOException, InterruptedException { float sumF = 0.0f; long sumL = 0L ; for(FloatAndLong value:values){ sumF +=value.getSumFloat(); sumL += value.getSumLong(); } theta_error.add(new float[]{key.getTheta0(),key.getTheta1(), (float)Math.sqrt((double)sumF / sumL)}); logger.info("theta:{}, error:{}", new Object[]{key.toString(),Math.sqrt(sumF/sumL)}); }
protected void cleanup(Context context) throws IOException, InterruptedException { // 怎樣加權? // 方式1:假設誤差越小。那麽說明權重應該越大; // 方式2:直接平均值 float [] theta_all = new float[2]; if("average".equals(method)){ // theta_all = theta_error.get(0); for(int i=0;i< theta_error.size();i++){ theta_all[0] += theta_error.get(i)[0]; theta_all[1] += theta_error.get(i)[1]; } theta_all[0] /= theta_error.size(); theta_all[1] /= theta_error.size(); } else { float sumErrors = 0.0f; for(float[] d:theta_error){ sumErrors += 1/d[2]; } for(float[] d: theta_error){ theta_all[0] += d[0] * 1/d[2] /sumErrors; theta_all[1] += d[1] * 1/d[2] /sumErrors; } } context.write(new FloatAndFloat(theta_all),NullWritable.get()); }
2.4 驗證
這裏的驗證指的是使用2.3步求的得合並後的theta值求全局誤差,因為在2.3步也求得了各個theta值的全局誤差。所以這裏能夠對照看下哪個theta值最優;其Mapper能夠直接使用2.3步驟的mapper,而reducer也相似2.3步驟中的reducer,僅僅是終於輸出就不須要cleanup中的合並了。3. 執行結果:
3.1 shuffle Job
測試類:public static void main(String[] args) throws Exception { args = new String[]{ "hdfs://master:8020/user/fanzhe/linear_regression.txt", "hdfs://master:8020/user/fanzhe/shuffle_out", "1" } ; ToolRunner.run(Utils.getConf(),new ShuffleDataJob(),args); }
原始數據:(能夠在源代碼中的resource文件夾中下載 linear_regression.txt)
6.1101,17.592 5.5277,9.1302 8.5186,13.662 。Shuffle輸出:每次輸出應該都是不一樣的(使用了隨機數),能夠看到數據確實被隨機化了。。
。
3.2 Linear Regression
測試類:public static void main(String[] args) throws Exception { // <input> <output> <theta0;theta1;alpha> <splitter> // 註意第三個參數使用分號切割 args = new String[]{ "hdfs://master:8020/user/fanzhe/shuffle_out", "hdfs://master:8020/user/fanzhe/linear_regression", "1;0;0.01", "," } ; ToolRunner.run(Utils.getConf(),new LinearRegressionJob(),args); }查看輸出結果:從輸出結果能夠看出。兩個結果相差還是非常大的,這個主要是因為測試數據比較少的原因。假設數據比較大。而且被非常好的shuffle的話。那麽這兩個值應該是相差不大的;
3.3 Combine Theta
測試類:public static void main(String[] args) throws Exception { // <input> <output> <theta_path> <splitter> <average|weight> args = new String[]{ "hdfs://master:8020/user/fanzhe/shuffle_out", "hdfs://master:8020/user/fanzhe/single_linear_regression_error", "hdfs://master:8020/user/fanzhe/linear_regression", ",", "weight" } ; ToolRunner.run(Utils.getConf(),new SingleLinearRegressionError(),args); }這裏設置的合並theta值的方式使用加權。讀者能夠設置為average,從而使用平均值;結果:
依據日誌能夠看出theta參數值選取以下的一個,其誤差會比較小,合並後的參數值為:
看到其結果是在兩個theta參數值之間。假設是平均值。那麽其輸出結果為:
3.4 驗證
驗證測試類:public static void main(String[] args) throws Exception { // <input> <output> <theta_path> <splitter> args = new String[]{ "hdfs://master:8020/user/fanzhe/shuffle_out", "hdfs://master:8020/user/fanzhe/last_linear_regression_error", "hdfs://master:8020/user/fanzhe/single_linear_regression_error", ",", } ; ToolRunner.run(Utils.getConf(),new LastLinearRegressionError(),args); }輸出結果為:
從結果中能夠看出,合並後的結果並沒有原來當中的一個Theta參數組值的效果好,只是這個也可能和數據量有關,依據輸出結果。也能夠把合並後的theta值以及合並前的對照。然後使用最優的theta來作為最後的輸出。
假設是平均值,那麽其輸出結果為:
從上面的結果能夠看到加權的組合比平均值的組合效果好點。
4. 總結
1. 改算法僅僅針對有一個局部最優解(也就是全局最優解)的情況,否則,在合並階段會有問題。2. 通過小量數據驗證,使用合並後的效果並沒有使用合並前的最優解的效果好,這個可能是數據問題,待驗證;3. 通過非常直觀的想象,普通情況下使用加權組合要比平均組好效果好。分享,成長。快樂
轉載請註明blog地址:http://blog.csdn.net/fansy1990
MapReduce實現線性回歸