MapReduce與遺傳演算法、MapReduce與粒子群演算法結合與實現
阿新 • 發佈:2019-02-17
package test;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Particle Swarm Optimization (PSO) implemented on Hadoop MapReduce.
 *
 * <p>Each input line encodes one particle:
 * {@code key pop[0] V[0] pbest[0] gbest[0] ... pop[d-1] V[d-1] pbest[d-1] gbest[d-1] bestfitness gbestfitness}.
 * The mapper performs one PSO velocity/position update and evaluates the Ackley
 * function; the reducer selects the global best (MapReduce sorts the
 * DoubleWritable fitness keys ascending, so the first reduce call carries the
 * minimum) and re-emits every particle with the updated gbest for the next
 * iteration. The driver chains {@code step} jobs, feeding each job's output
 * directory to the next as input.
 */
public class posmr {

    /**
     * Mapper: one PSO step per particle plus fitness evaluation.
     *
     * <p>NOTE(review): the name {@code IntSumReducer} is misleading (this is a
     * Mapper, copied from the WordCount template); kept because the class is
     * {@code public static} and may be referenced externally.
     */
    public static class IntSumReducer
            extends Mapper<Object, Text, DoubleWritable, Text> {

        private DoubleWritable job1map_key = new DoubleWritable();
        private Text job1map_value = new Text();

        static int dim = 2;          // problem dimensionality
        static int sizepop = 20;     // swarm size (one particle per input line)
        private final double w = 1;  // inertia weight
        static double c1 = 1;        // cognitive coefficient
        static double c2 = 1;        // social coefficient

        public double[] pop = new double[dim];      // particle position
        public double[] dpbest = new double[dim];   // personal best position
        public double[] V = new double[dim];        // particle velocity
        public double[] fitness = new double[sizepop];
        public double[] gbest = new double[dim];    // global best position
        public String[] S_value = new String[dim];
        public String F_value;
        public double bestfitness;   // personal best fitness
        public double m_dFitness;    // fitness of the updated position
        private Random random = new Random();
        double g;                    // global best fitness from previous round (parsed, unused)
        double sum1;
        double sum2;

        /**
         * Parses one particle, applies the PSO update, evaluates the Ackley
         * function, updates the personal best, and emits
         * {@code (bestfitness, "pop V pbest ..." )}.
         */
        public void map(Object key, Text values, Context context)
                throws IOException, InterruptedException {
            String itr[] = values.toString().split("\\s");
            int k = 1; // itr[0] is the previous reducer's key; skip it
            F_value = "";
            for (int j = 0; j < dim; j++) {
                pop[j] = Double.valueOf(itr[k++]);
                V[j] = Double.valueOf(itr[k++]);
                dpbest[j] = Double.valueOf(itr[k++]);
                gbest[j] = Double.valueOf(itr[k++]);
            }
            bestfitness = Double.valueOf(itr[k++]);
            g = Double.valueOf(itr[k++]);

            // Standard PSO velocity and position update.
            for (int i = 0; i < dim; i++) {
                V[i] = w * V[i]
                        + c1 * random.nextDouble() * (dpbest[i] - pop[i])
                        + c2 * random.nextDouble() * (gbest[i] - pop[i]);
                pop[i] = pop[i] + V[i];
            }

            // Evaluate the Ackley function at the new position.
            sum1 = 0;
            sum2 = 0;
            for (int i = 0; i < dim; i++) {
                sum1 += pop[i] * pop[i];
                sum2 += Math.cos(2 * Math.PI * pop[i]);
            }
            // FIX: the Ackley constant is e, not the 2.72 approximation used
            // in the original (which allowed slightly negative fitness values).
            m_dFitness = -20 * Math.exp(-0.2 * Math.sqrt((1.0 / dim) * sum1))
                    - Math.exp((1.0 / dim) * sum2) + 20 + Math.E;
            if (m_dFitness < 0) {
                // Diagnostic: Ackley >= 0, so a negative value signals a bug.
                System.out.println(sum1 + " " + m_dFitness + " " + sum2);
            }

            // Update the personal best if the new position improves on it.
            if (m_dFitness < bestfitness) {
                bestfitness = m_dFitness;
                for (int i = 0; i < dim; i++) {
                    dpbest[i] = pop[i];
                }
            }

            // Serialize "pop V pbest " per dimension (gbest is re-attached by
            // the reducer once the round's global best is known).
            for (int j = 0; j < dim; j++) {
                S_value[j] = Double.toString(pop[j]) + " "
                        + Double.toString(V[j]) + " "
                        + Double.toString(dpbest[j]) + " ";
            }
            for (int j = 0; j < dim; j++) {
                F_value += S_value[j];
            }
            job1map_key.set(bestfitness);
            job1map_value.set(F_value);
            context.write(job1map_key, job1map_value);
        }
    }

    /**
     * Reducer: keys arrive sorted ascending, so the first reduce call holds
     * the round's best fitness; its pbest becomes the new gbest, cached in
     * {@code gbest_temp} for every later call within this task.
     */
    public static class job1Reducer
            extends Reducer<DoubleWritable, Text, DoubleWritable, Text> {

        private DoubleWritable job1reduce_key = new DoubleWritable();
        private Text job1reduce_value = new Text();

        static int dim = 2;
        static int sizepop = 20;

        public double[] pop = new double[dim];
        public double[] dpbest = new double[dim];
        public double[] V = new double[dim];
        public double[] gbest = new double[dim];
        public double[] gbest_temp = new double[dim]; // round's global best, set on first call
        public String[] S_value = new String[dim];
        public String F_value;
        // Best fitness seen so far in this task; MAX_VALUE until first call.
        public double m_dFitness = Double.MAX_VALUE;

        public void reduce(DoubleWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double bestfitness = Double.valueOf(key.toString());
            int k;
            if (bestfitness < m_dFitness) {
                // First (smallest-key) group: adopt its pbest as the new gbest.
                m_dFitness = bestfitness;
                for (Text val : values) {
                    String itr[] = val.toString().split(" ");
                    k = 0;
                    for (int j = 0; j < dim; j++) {
                        pop[j] = Double.valueOf(itr[k++]);
                        V[j] = Double.valueOf(itr[k++]);
                        dpbest[j] = Double.valueOf(itr[k++]);
                    }
                    for (int j = 0; j < dim; j++) {
                        gbest[j] = dpbest[j];
                        gbest_temp[j] = dpbest[j];
                    }
                    context.write(job1reduce_key,
                            emit(bestfitness, context));
                }
            } else {
                // Later groups: reuse the cached global best.
                for (Text val : values) {
                    String itr[] = val.toString().split(" ");
                    k = 0;
                    for (int j = 0; j < dim; j++) {
                        pop[j] = Double.valueOf(itr[k++]);
                        V[j] = Double.valueOf(itr[k++]);
                        dpbest[j] = Double.valueOf(itr[k++]);
                    }
                    for (int j = 0; j < dim; j++) {
                        gbest[j] = gbest_temp[j];
                    }
                    context.write(job1reduce_key,
                            emit(bestfitness, context));
                }
            }
        }

        /**
         * Serializes the current particle state and returns the value to emit;
         * also sets the shared output key. Extracted from the two identical
         * copy-pasted emission blocks in the original.
         *
         * @param bestfitness this particle's personal best fitness
         * @return the Text value "pop V pbest gbest ... bestfitness gbestfitness"
         */
        private Text emit(double bestfitness, Context context) {
            F_value = "";
            for (int j = 0; j < dim; j++) {
                S_value[j] = Double.toString(pop[j]) + " "
                        + Double.toString(V[j]) + " "
                        + Double.toString(dpbest[j]) + " "
                        + Double.toString(gbest[j]) + " ";
            }
            for (int j = 0; j < dim; j++) {
                F_value += S_value[j];
            }
            // FIX: the original concatenated the two fitness values with ""
            // (no separator), producing an unparseable token for the next
            // iteration's mapper. A single space restores the record format.
            F_value += Double.toString(bestfitness) + " " + Double.toString(m_dFitness);
            job1reduce_key.set(1);
            return new Text(F_value);
        }
    }

    /**
     * Driver: runs {@code step} chained PSO iterations; each job's output
     * directory becomes the next job's input (output name grows by appending
     * the iteration index to stay unique).
     *
     * @param args {@code <input-path> <output-path>} after generic options
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            // FIX: usage text said "wordcount" (left over from the template).
            System.err.println("Usage: posmr <in> <out>");
            System.exit(2);
        }
        String input = otherArgs[0];
        String output = otherArgs[1];
        FileSystem fs;
        try {
            fs = FileSystem.get(conf);
            int step = 20;
            for (int i = 0; i < step; i++) {
                System.out.println("第" + i + "次:" + i);
                Job job = new Job(conf, "word count");
                job.setJarByClass(posmr.class);
                job.setMapperClass(IntSumReducer.class);
                job.setReducerClass(job1Reducer.class);
                job.setMapOutputKeyClass(DoubleWritable.class);
                job.setMapOutputValueClass(Text.class);
                // FIX: the reducer's output key is DoubleWritable, not Text.
                job.setOutputKeyClass(DoubleWritable.class);
                job.setOutputValueClass(Text.class);
                FileInputFormat.addInputPath(job, new Path(input));
                FileOutputFormat.setOutputPath(job, new Path(output));
                job.waitForCompletion(true);
                input = output;
                output += i; // unique path per iteration: out, out0, out01, ...
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // FIX: restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}