MapReduce實現線性迴歸
阿新 • • 發佈:2018-12-31
1. 軟體版本:
Hadoop2.6.0(IDEA中原始碼編譯使用CDH5.7.3,對應Hadoop2.6.0),叢集使用原生Hadoop2.6.4,JDK1.8,Intellij IDEA 14 。原始碼可以在https://github.com/fansy1990/linear_regression 下載。
2. 實現思路:
本部落格實現的是一元一次線性方程,等於是最簡單的線性方程了,採用的是Couresa裡面的機器學習中的大資料線性方程的方法來更新引數值的(即隨機梯度下降方法,當然也可以使用批量梯度下降方法來實現,只是在LinearRegressionJob中實現的不一樣而已),如果對隨機梯度下降或者批量梯度下降不瞭解的話,需要先去看看。下面是實現思路:2.1 Shuffle Data(打亂資料):
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if(randN <= 0) { // 如果randN 比0小,那麼不再次打亂資料 context.write(randFloatKey,value); return ; } if(++countI >= randN){// 如果randN等於1,那麼每次隨機的值都是不一樣的 randFloatKey.set(random.nextFloat()); countI =0; } context.write(randFloatKey,value); }
2.2 Linear Regression(線性迴歸):
線性迴歸採用隨機梯度下降的方法來更新theta0和theta1 (只實現了一元一次,所以只有兩個引數),每個Mapper都會使用同樣的初始化引數(theta0=1和theta1=0),在每個Mapper中使用自己的資料來更新theta0和theta1,更新的公式為:theta0 = theta0 -alpha*(h(x)-y)x
theta1 = theta1 -alpha*(h(x)-y)x
其中,h(x)= theta0 + theta1 * x ;同時,需要注意這裡的更新是同步更新,其核心程式碼如下所示:然後在每個Mapper的cleanup函式中直接輸出theta的引數值即可protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { float[] xy = Utils.str2float(value.toString().split(splitter)); float x = xy[0]; float y = xy[1]; // 同步更新 theta0 and theta1 lastTheta0 = theta0; theta0 -= alpha *(theta0+theta1* x - y) * x; // 保持theta0 和theta1 不變 theta1 -= alpha *(lastTheta0 + theta1 * x -y) * x;// 保持theta0 和theta1 不變 }
protected void cleanup(Context context) throws IOException, InterruptedException {
theta0_1.set(theta0 + splitter + theta1);
context.write(theta0_1,NullWritable.get());
}
由於在每個mapper中已經更新了theta的各個引數值,所以不需要使用reducer即可;同時,由於測試資料比較小,所以設定mapreduce.input.fileinputformat.split.maxsize的大小,讀者需要根據自己實際資料的大小來設定,其Driver類核心程式碼如下所示:conf.setLong("mapreduce.input.fileinputformat.split.maxsize",700L);// 獲取多個mapper;
job.setNumReduceTasks(0);
2.3 Combine Theta (合併引數值):
在2.2步中已經算得了各個theta值,那麼應該如何來合併這些求得得各個theta值呢?可以直接用平均值麼?對於一元一次線性迴歸是可以直接使用平均值來作為最終合併後的theta值的,但是針對其他的線性迴歸(特指有多個區域性最小值的線性迴歸,這樣求得的多個theta值合併就會有問題了)。如果只是使用平均值的話,那麼在2.2步其實加一個Reducer就可以完成了,這裡提出了一種另外的方式來合併theta值,即採用各個theta值的全域性誤差作為引數來進行加權。所以,在Mapper的setup中會讀取2.2中的多個輸出theta值,在map函式中針對各個原始資料求其誤差,輸出到reducer的資料為theta值和其誤差;其核心程式碼如下所示:protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
float[] xy = Utils.str2float(value.toString().split(splitter));
for(int i =0;i<thetas.size() ;i++){
// error = (theta0 + theta1 * x - y) ^2
thetaErrors[i] += (thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) *
(thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) ;
thetaNumbers[i]+= 1;
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
for(int i =0;i<thetas.size() ;i++){
theta.set(thetas.get(i));
floatAndLong.set(thetaErrors[i],thetaNumbers[i]);
context.write(theta,floatAndLong);
}
}
在Reducer端,直接針對每個鍵(也就是theta值)把各個誤差加起來,在cleanup函式中採用加權來合併theta值,其核心程式碼如下所示:protected void reduce(FloatAndFloat key, Iterable<FloatAndLong> values, Context context) throws IOException, InterruptedException {
float sumF = 0.0f;
long sumL = 0L ;
for(FloatAndLong value:values){
sumF +=value.getSumFloat();
sumL += value.getSumLong();
}
theta_error.add(new float[]{key.getTheta0(),key.getTheta1(), (float)Math.sqrt((double)sumF / sumL)});
logger.info("theta:{}, error:{}", new Object[]{key.toString(),Math.sqrt(sumF/sumL)});
}
protected void cleanup(Context context) throws IOException, InterruptedException {
// 如何加權?
// 方式1:如果誤差越小,那麼說明權重應該越大;
// 方式2:直接平均值
float [] theta_all = new float[2];
if("average".equals(method)){
// theta_all = theta_error.get(0);
for(int i=0;i< theta_error.size();i++){
theta_all[0] += theta_error.get(i)[0];
theta_all[1] += theta_error.get(i)[1];
}
theta_all[0] /= theta_error.size();
theta_all[1] /= theta_error.size();
} else {
float sumErrors = 0.0f;
for(float[] d:theta_error){
sumErrors += 1/d[2];
}
for(float[] d: theta_error){
theta_all[0] += d[0] * 1/d[2] /sumErrors;
theta_all[1] += d[1] * 1/d[2] /sumErrors;
}
}
context.write(new FloatAndFloat(theta_all),NullWritable.get());
}
2.4 驗證
這裡的驗證指的是使用2.3步求的得合併後的theta值求全域性誤差,由於在2.3步也求得了各個theta值的全域性誤差,所以這裡可以對比看下哪個theta值最優;其Mapper可以直接使用2.3步驟的mapper,而reducer也類似2.3步驟中的reducer,只是最終輸出就不需要cleanup中的合併了。3. 執行結果:
3.1 shuffle Job
測試類:public static void main(String[] args) throws Exception {
args = new String[]{
"hdfs://master:8020/user/fanzhe/linear_regression.txt",
"hdfs://master:8020/user/fanzhe/shuffle_out",
"1"
} ;
ToolRunner.run(Utils.getConf(),new ShuffleDataJob(),args);
}
原始資料:(可以在原始碼中的resource目錄中下載 linear_regression.txt)
6.1101,17.592
5.5277,9.1302
8.5186,13.662
。。。
Shuffle輸出:每次輸出應該都是不一樣的(使用了隨機數),可以看到資料確實被隨機化了。3.2 Linear Regression
測試類:public static void main(String[] args) throws Exception {
// <input> <output> <theta0;theta1;alpha> <splitter> // 注意第三個引數使用分號分割
args = new String[]{
"hdfs://master:8020/user/fanzhe/shuffle_out",
"hdfs://master:8020/user/fanzhe/linear_regression",
"1;0;0.01",
","
} ;
ToolRunner.run(Utils.getConf(),new LinearRegressionJob(),args);
}
檢視輸出結果:從輸出結果可以看出,兩個結果相差還是很大的,這個主要是因為測試資料比較少的原因,如果資料比較大,並且被很好的shuffle的話,那麼這兩個值應該是相差不大的;3.3 Combine Theta
測試類:public static void main(String[] args) throws Exception {
// <input> <output> <theta_path> <splitter> <average|weight>
args = new String[]{
"hdfs://master:8020/user/fanzhe/shuffle_out",
"hdfs://master:8020/user/fanzhe/single_linear_regression_error",
"hdfs://master:8020/user/fanzhe/linear_regression",
",",
"weight"
} ;
ToolRunner.run(Utils.getConf(),new SingleLinearRegressionError(),args);
}
這裡設定的合併theta值的方式使用加權,讀者可以設定為average,從而使用平均值;結果:根據日誌可以看出theta引數值選取下面的一個,其誤差會比較小,合併後的引數值為:
看到其結果是在兩個theta引數值之間。如果是平均值,那麼其輸出結果為:
3.4 驗證
驗證測試類:public static void main(String[] args) throws Exception {
// <input> <output> <theta_path> <splitter>
args = new String[]{
"hdfs://master:8020/user/fanzhe/shuffle_out",
"hdfs://master:8020/user/fanzhe/last_linear_regression_error",
"hdfs://master:8020/user/fanzhe/single_linear_regression_error",
",",
} ;
ToolRunner.run(Utils.getConf(),new LastLinearRegressionError(),args);
}
輸出結果為:從結果中可以看出,合併後的結果並沒有原來其中的一個Theta引數組值的效果好,不過這個也可能和資料量有關,根據輸出結果,也可以把合併後的theta值以及合併前的對比,然後使用最優的theta來作為最後的輸出。如果是平均值,那麼其輸出結果為:
從上面的結果可以看到加權的組合比平均值的組合效果好點;