1. 程式人生 > MapReduce案例11——影評分析5(求特定年份最好看的10部電影)

MapReduce案例11——影評分析5(求特定年份最好看的10部電影)

題目:
現有如此三份資料:
1、users.dat    資料格式為:  2::M::56::16::70072
對應欄位為:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
對應欄位中文解釋:使用者id,性別,年齡,職業,郵政編碼

2、movies.dat		資料格式為: 2::Jumanji (1995)::Adventure|Children's|Fantasy
對應欄位為:MovieID BigInt, Title String, Genres String
對應欄位中文解釋:電影ID,電影名字,電影型別

3、ratings.dat		資料格式為:  1::1193::5::978300760
對應欄位為:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
對應欄位中文解釋:使用者ID,電影ID,評分,評分時間戳

程式讀取的 /a/totalFilmInfos.txt 是上述三份資料合併後的檔案,欄位以 :: 分隔,依序為:
使用者ID,電影ID,評分,評分時間戳,性別,年齡,職業,郵政編碼,電影名字,電影型別
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
(5)求好片(評分>=4.0)最多的那個年份的最好看的10部電影

思路:分四步。第一步,統計每個電影年份的好評(評分>=4)記錄數;第二步,按數量降序排序,取出好評最多的年份;第三步,統計該年份每部電影的好評記錄數;第四步,再次降序排序,取出前10部電影。邏輯較為簡單,直接上主體程式碼:

/**
 * @author: lpj   
 * @date: 2018年3月16日 下午7:16:47
 * @Description:
 */
package lpj.filmCritic;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.time.Year;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import lpj.filmBean.GoodMoiveGroup;
import lpj.filmBean.GoodMoiveGroup2;
import lpj.filmBean.GoodMovieBean;
import lpj.filmBean.GoodMovieBean2;
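/**
 * Driver plus mapper/reducer classes for a chain of four MapReduce jobs over the joined
 * rating records: count ratings of 4 or higher per release year, pick the top year,
 * count such ratings per movie of that year, then keep up to ten movies per reduce group.
 */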
public class GoodMoiveMR {
	
	/**
	 * Configures four MapReduce jobs, chains them with a {@link JobControl} and waits
	 * until the whole chain has ended.
	 *
	 * <ol>
	 *   <li>job: reads {@code /a/totalFilmInfos.txt}, writes {@code /a/homework11_5_1}.</li>
	 *   <li>job2: reads {@code /a/homework11_5_1}, writes {@code /a/homework11_5_2}.</li>
	 *   <li>job3: reads {@code /a/totalFilmInfos.txt} again with
	 *       {@code /a/homework11_5_2/part-r-00000} as a cache file, writes
	 *       {@code /a/homework11_5_3}.</li>
	 *   <li>job4: reads {@code /a/homework11_5_3}, writes {@code /a/homework11_5_4}.</li>
	 * </ol>
	 *
	 * Existing output directories are deleted before the jobs are submitted.
	 *
	 * @param args command-line arguments; not used
	 * @throws IllegalStateException if at least one job of the chain failed
	 * @throws Exception if job setup, file-system access or waiting for the chain fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// A single handle serves all four output checks; it is the file system selected
		// by the configuration (the local one unless configured otherwise).
		FileSystem fs = FileSystem.get(conf);

		//--------------------------------- job 1: good ratings per year
		Job job = Job.getInstance(conf);
		job.setJarByClass(GoodMoiveMR.class);
		job.setMapperClass(GoodMoiveMR_Mapper.class);
		job.setReducerClass(GoodMoiveMR_Reducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		Path inputPath = new Path("/a/totalFilmInfos.txt");
		Path outputPath = new Path("/a/homework11_5_1");
		deleteIfExists(fs, outputPath);
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		//--------------------------------- job 2: pick the year
		Job job2 = Job.getInstance(conf);
		job2.setJarByClass(GoodMoiveMR.class);
		job2.setMapperClass(GoodMoiveMR2_Mapper.class);
		job2.setReducerClass(GoodMoiveMR2_Reducer.class);
		job2.setOutputKeyClass(GoodMovieBean.class);
		job2.setOutputValueClass(NullWritable.class);
		job2.setGroupingComparatorClass(GoodMoiveGroup.class);
		Path inputPath2 = new Path("/a/homework11_5_1");
		Path outputPath2 = new Path("/a/homework11_5_2");
		deleteIfExists(fs, outputPath2);
		FileInputFormat.setInputPaths(job2, inputPath2);
		FileOutputFormat.setOutputPath(job2, outputPath2);

		//--------------------------------- job 3: good ratings per movie of that year
		Job job3 = Job.getInstance(conf);
		job3.setJarByClass(GoodMoiveMR.class);
		job3.setMapperClass(GoodMoiveMR3_Mapper.class);
		job3.setReducerClass(GoodMoiveMR3_Reducer.class);
		job3.setMapOutputKeyClass(Text.class);
		job3.setMapOutputValueClass(Text.class);
		job3.setOutputKeyClass(Text.class);
		job3.setOutputValueClass(Text.class);
		// The result of job2 is handed to the job3 mappers as a cache file.
		URI uri = new URI("/a/homework11_5_2/part-r-00000");
		job3.addCacheFile(uri);
		Path inputPath3 = new Path("/a/totalFilmInfos.txt");
		Path outputPath3 = new Path("/a/homework11_5_3");
		deleteIfExists(fs, outputPath3);
		FileInputFormat.setInputPaths(job3, inputPath3);
		FileOutputFormat.setOutputPath(job3, outputPath3);

		//--------------------------------- job 4: top movies
		Job job4 = Job.getInstance(conf);
		job4.setJarByClass(GoodMoiveMR.class);
		job4.setMapperClass(GoodMoiveMR4_Mapper.class);
		job4.setReducerClass(GoodMoiveMR4_Reducer.class);
		job4.setOutputKeyClass(GoodMovieBean2.class);
		job4.setOutputValueClass(NullWritable.class);
		job4.setGroupingComparatorClass(GoodMoiveGroup2.class);
		Path inputPath4 = new Path("/a/homework11_5_3");
		Path outputPath4 = new Path("/a/homework11_5_4");
		deleteIfExists(fs, outputPath4);
		FileInputFormat.setInputPaths(job4, inputPath4);
		FileOutputFormat.setOutputPath(job4, outputPath4);

		//--------------------------------- chain the jobs: job -> job2 -> job3 -> job4
		ControlledJob aJob = new ControlledJob(job.getConfiguration());
		ControlledJob bJob = new ControlledJob(job2.getConfiguration());
		ControlledJob cJob = new ControlledJob(job3.getConfiguration());
		ControlledJob dJob = new ControlledJob(job4.getConfiguration());
		aJob.setJob(job);
		bJob.setJob(job2);
		cJob.setJob(job3);
		dJob.setJob(job4);
		JobControl jc = new JobControl("jc");
		jc.addJob(aJob);
		jc.addJob(bJob);
		jc.addJob(cJob);
		jc.addJob(dJob);
		bJob.addDependingJob(aJob);
		cJob.addDependingJob(bJob);
		dJob.addDependingJob(cJob);

		Thread thread = new Thread(jc);
		thread.start();
		try {
			while (!jc.allFinished()) {
				// sleep is static: it pauses this (the main) thread, not the JobControl thread
				Thread.sleep(1000);
			}
		} finally {
			// Always end the JobControl thread, even when waiting was interrupted.
			jc.stop();
		}

		// allFinished() is also true when jobs failed, so failures must be checked explicitly.
		if (!jc.getFailedJobList().isEmpty()) {
			StringBuilder failures = new StringBuilder("MapReduce job chain failed:");
			for (ControlledJob failed : jc.getFailedJobList()) {
				failures.append(' ').append(failed.getJobName())
						.append(" (").append(failed.getMessage()).append(')');
			}
			throw new IllegalStateException(failures.toString());
		}
	}

	/** Recursively deletes {@code path} when it exists so a job can write its output there. */
	private static void deleteIfExists(FileSystem fs, Path path) throws IOException {
		if (fs.exists(path)) {
			fs.delete(path, true);
		}
	}
	
	/**
	 * Job 1 mapper: for every joined record whose rating is 4 or higher, emits
	 * (year, rating), where the year is the four characters before the last character
	 * of the movie title (titles look like {@code Jumanji (1995)}).
	 *
	 * <p>Input lines are "::"-separated with the fields
	 * userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType.
	 * Records that cannot be parsed are skipped and counted instead of failing the task.
	 */
	public static class GoodMoiveMR_Mapper extends Mapper<LongWritable, Text, Text, Text>{
		/** Index of the rating field in a joined record. */
		private static final int RATE_INDEX = 2;
		/** Index of the movie title field in a joined record. */
		private static final int NAME_INDEX = 8;
		/** Length of the trailing "YYYY)" part of a title that the year is cut from. */
		private static final int YEAR_SUFFIX_LENGTH = 5;

		Text kout = new Text();
		Text valueout = new Text();

		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String[] reads = value.toString().trim().split("::");
			if (reads.length <= NAME_INDEX) {
				skip(context);
				return;
			}
			String moivename = reads[NAME_INDEX];
			if (moivename.length() < YEAR_SUFFIX_LENGTH) {
				// too short to end in "YYYY)"; substring below would throw
				skip(context);
				return;
			}
			int rate;
			try {
				rate = Integer.parseInt(reads[RATE_INDEX]);
			} catch (NumberFormatException e) {
				skip(context);
				return;
			}
			if (rate >= 4) {
				String year = moivename.substring(moivename.length() - YEAR_SUFFIX_LENGTH, moivename.length() - 1);
				kout.set(year);
				valueout.set(String.valueOf(rate));
				context.write(kout, valueout);
			}
		}

		/** Records one unparseable input line in a job counter so skipped data stays visible. */
		private void skip(Context context) {
			context.getCounter("GoodMoiveMR", "MALFORMED_RECORDS").increment(1);
		}
	}
	/**
	 * Job 1 reducer: writes each key together with the number of values received for it.
	 * The content of the values is not inspected.
	 */
	public static class GoodMoiveMR_Reducer extends Reducer<Text, Text, Text, Text>{
		Text kout = new Text();
		Text valueout = new Text();

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
			int total = 0;
			// Only the number of values matters, so the iterator is simply drained.
			java.util.Iterator<Text> cursor = values.iterator();
			while (cursor.hasNext()) {
				cursor.next();
				total++;
			}
			valueout.set(String.valueOf(total));
			context.write(key, valueout);
		}
	}
	//---------------------求年份----------------------------------
	/**
	 * Job 2 mapper: turns each tab-separated "year, count" line into a
	 * {@link GoodMovieBean} key with a {@link NullWritable} value.
	 */
	public static class GoodMoiveMR2_Mapper extends Mapper<LongWritable, Text, GoodMovieBean, NullWritable>{
		Text kout = new Text();
		Text valueout = new Text();
		GoodMovieBean gm = new GoodMovieBean();

		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			final String line = value.toString().trim();
			final String[] columns = line.split("\t");
			// Parse both columns before touching the reused bean.
			final String releaseYear = columns[0];
			final int goodRatings = Integer.parseInt(columns[1]);
			gm.setYear(releaseYear);
			gm.setNum(goodRatings);
			context.write(gm, NullWritable.get());
		}
	}
	/**
	 * Job 2 reducer: writes the key once per reduce group, at the first value, and
	 * stops as soon as a second value shows up.
	 *
	 * <p>NOTE(review): which record ends up first depends on the sort order of
	 * GoodMovieBean and on GoodMoiveGroup, neither of which is visible here —
	 * presumably the year with the highest count; confirm.
	 */
	public static class GoodMoiveMR2_Reducer extends Reducer<GoodMovieBean, NullWritable, GoodMovieBean, NullWritable>{
		Text kout = new Text();
		Text valueout = new Text();

		@Override
		protected void reduce(GoodMovieBean key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
			boolean emitted = false;
			for (NullWritable ignored : values) {
				if (emitted) {
					return;
				}
				context.write(key, NullWritable.get());
				emitted = true;
			}
		}
	}
	//--------------------------求電影
	/**
	 * Job 3 mapper: emits ("year TAB title", rating) for every joined record whose
	 * rating is 4 or higher and whose title year equals the year read from the cache
	 * file in {@link #setup}.
	 *
	 * <p>Input lines are "::"-separated with the fields
	 * userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType.
	 * Records that cannot be parsed are skipped and counted instead of failing the task.
	 */
	public static class GoodMoiveMR3_Mapper extends Mapper<LongWritable, Text, Text, Text>{
		/** Index of the rating field in a joined record. */
		private static final int RATE_INDEX = 2;
		/** Index of the movie title field in a joined record. */
		private static final int NAME_INDEX = 8;
		/** Length of the trailing "YYYY)" part of a title that the year is cut from. */
		private static final int YEAR_SUFFIX_LENGTH = 5;

		Text kout = new Text();
		Text valueout = new Text();
		/** Year to filter on; per mapper instance so tasks sharing a JVM cannot interfere. */
		private String goodmovieyear = "";

		/**
		 * Reads the year to filter on from the first cache file: the first tab-separated
		 * column of its last non-blank line.
		 *
		 * @throws IOException if no cache file was provided or it cannot be read
		 */
		@SuppressWarnings("deprecation")
		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
			Path[] paths = context.getLocalCacheFiles();
			if (paths == null || paths.length == 0) {
				throw new IOException("No cache file with the selected year is available");
			}
			// getPath() drops any scheme and percent-encoding that toString() would keep.
			String str = paths[0].toUri().getPath();
			// try-with-resources closes the reader even when readLine() throws
			try (BufferedReader bf = new BufferedReader(new FileReader(new File(str)))) {
				String readline;
				while ((readline = bf.readLine()) != null) {
					if (readline.trim().isEmpty()) {
						continue; // a blank line must not wipe the year read so far
					}
					goodmovieyear = readline.split("\t")[0];
				}
			}
		}

		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String[] reads = value.toString().trim().split("::");
			if (reads.length <= NAME_INDEX) {
				skip(context);
				return;
			}
			String moivename = reads[NAME_INDEX];
			if (moivename.length() < YEAR_SUFFIX_LENGTH) {
				// too short to end in "YYYY)"; substring below would throw
				skip(context);
				return;
			}
			int rate;
			try {
				rate = Integer.parseInt(reads[RATE_INDEX]);
			} catch (NumberFormatException e) {
				skip(context);
				return;
			}
			String year = moivename.substring(moivename.length() - YEAR_SUFFIX_LENGTH, moivename.length() - 1);
			if (rate >= 4 && goodmovieyear.equals(year)) {
				kout.set(year + "\t" + moivename);
				valueout.set(String.valueOf(rate));
				context.write(kout, valueout);
			}
		}

		/** Records one unparseable input line in a job counter so skipped data stays visible. */
		private void skip(Context context) {
			context.getCounter("GoodMoiveMR", "MALFORMED_RECORDS").increment(1);
		}
	}
	/**
	 * Job 3 reducer: writes each key together with the number of values received for it.
	 * The content of the values is not inspected.
	 */
	public static class GoodMoiveMR3_Reducer extends Reducer<Text, Text, Text, Text>{
		Text kout = new Text();
		Text valueout = new Text();

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
			int occurrences = 0;
			// Drain the iterator; only the number of values is of interest.
			for (java.util.Iterator<Text> it = values.iterator(); it.hasNext(); it.next()) {
				occurrences++;
			}
			valueout.set(Integer.toString(occurrences));
			context.write(key, valueout);
		}
	}
	//---------------------好看電影前10
	/**
	 * Job 4 mapper: turns each tab-separated "year, name, count" line into a
	 * {@link GoodMovieBean2} key with a {@link NullWritable} value.
	 */
	public static class GoodMoiveMR4_Mapper extends Mapper<LongWritable, Text, GoodMovieBean2, NullWritable>{
		Text kout = new Text();
		Text valueout = new Text();
		GoodMovieBean2 gm = new GoodMovieBean2();

		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			final String line = value.toString().trim();
			final String[] columns = line.split("\t");
			// Parse all three columns before touching the reused bean.
			final String releaseYear = columns[0];
			final String title = columns[1];
			final int goodRatings = Integer.parseInt(columns[2]);
			gm.setYear(releaseYear);
			gm.setName(title);
			gm.setNum(goodRatings);
			context.write(gm, NullWritable.get());
		}
	}
	/**
	 * Job 4 reducer: writes the key for each of the first ten values of a reduce group
	 * and stops when an eleventh value shows up.
	 *
	 * <p>NOTE(review): which records form a group and their order depend on
	 * GoodMoiveGroup2 and on the sort order of GoodMovieBean2, neither of which is
	 * visible here — presumably one group per year, highest count first; confirm.
	 */
	public static class GoodMoiveMR4_Reducer extends Reducer<GoodMovieBean2, NullWritable, GoodMovieBean2, NullWritable>{
		Text kout = new Text();
		Text valueout = new Text();

		@Override
		protected void reduce(GoodMovieBean2 key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
			int emitted = 0;
			for (NullWritable ignored : values) {
				if (emitted >= 10) {
					return;
				}
				// The key is written once per value, after the iterator has advanced.
				context.write(key, NullWritable.get());
				emitted++;
			}
		}
	}
}

結果:

1999	American Beauty (1999)	2853
1999	Matrix, The (1999)	2171
1999	Sixth Sense, The (1999)	2163
1999	Being John Malkovich (1999)	1759
1999	Toy Story 2 (1999)	1302
1999	Galaxy Quest (1999)	1145
1999	Star Wars: Episode I - The Phantom Menace (1999)	1132
1999	Election (1999)	1130
1999	Fight Club (1999)	1096
1999	Green Mile, The (1999)	981

總結:job、job2 與 job3、job4 的流程基本上重複(都是先統計數量,再排序取前幾筆)。