MapReduce實戰 - 根據文章記錄獲取時段內發帖頻率
MapReduce簡介
- MapReduce是一種分散式計算模型,是Google提出的,主要用於搜尋領域,解決海量資料的計算問題。
- MR有兩個階段組成:Map和Reduce,使用者只需實現map()和reduce()兩個函式,即可實現分散式計算。
例子
資料來源結構
首先檢視資料來源結構:
CREATE TABLE `article` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`allowed_add_tag` int(2) DEFAULT NULL,
`attitudes` varchar(255) DEFAULT NULL ,
`attitudes_id` int(11) DEFAULT NULL,
`banana_count` int(11) DEFAULT NULL,
`big_cover_image` varchar(255) DEFAULT NULL,
`channel_id` int(11) DEFAULT NULL,
`channel_name` varchar(255) DEFAULT NULL,
`channel_path` varchar(255) DEFAULT NULL,
`comment_count` int(11) DEFAULT NULL,
`contribute_time` datetime DEFAULT NULL,
`cover_image` varchar(255) DEFAULT NULL,
`description` varchar(255) DEFAULT NULL,
`essense` int(2) DEFAULT NULL,
`favorite_count` int(11) DEFAULT NULL,
`latest_active_time` datetime DEFAULT NULL,
`latest_comment_time` datetime DEFAULT NULL,
`like_count` int(11) DEFAULT NULL,
`link` varchar(255) DEFAULT NULL,
`parent_channel_id` int(11) DEFAULT NULL,
`parent_channel_name` varchar(255) DEFAULT NULL,
`parent_realm_id` int(11) DEFAULT NULL,
`realm_id` int(11) DEFAULT NULL,
`realm_name` varchar(255) DEFAULT NULL,
`recommended` int(2) DEFAULT NULL,
`status` int(11) DEFAULT NULL,
`tag_list` varchar(255) DEFAULT NULL,
`title` varchar(255) DEFAULT NULL,
`top_level` int(2) DEFAULT NULL,
`tudou_domain` int(2) DEFAULT NULL,
`type_id` int(11) DEFAULT NULL,
`user_avatar` varchar(255) DEFAULT NULL,
`user_id` int(11) DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
`view_count` int(11) DEFAULT NULL,
`view_only` int(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=13103 DEFAULT CHARSET=utf8mb4;
複製程式碼
這裡我將其中的資料匯出為csv檔案。
思路
在這個例子中,我要做的是根據帖子釋出時間,統計全天中每隔30分鐘的發帖個數。
- 由於當前我沒有重寫InputFormat介面,因此採用的是hadoop預設的按行讀取檔案方法。所以傳入引數為<0, [一行資料]>.
InputFormat 介面 - 該介面指定輸入檔案的內容格式。
其中getSplits函式將所有輸入資料分成numSplits個split,每個split交給一個map task處理。
getRecordReader函式提供一個使用者解析split的迭代器物件,它將split中的每個record解析成key/value對。
- 獲取資料中的發帖時間
- 計算髮帖時間在全天時間中的時間段並傳遞個reduce() - <時間段, 1>
- reduce對時間段出現次數進行統計
util
首先先編寫工具類Times.java - period(str:String, format:String)方法,該方法的作用為:
根據傳入的字串和時間格式獲取一天中改時間的時間區間,如:
輸入:"2018-10-18 22:05:11", "yyyy-MM-dd HH:mm:ss"
輸出: "201810182200-201810182230"
方法如下:
public static String period(String time, String format) {
Objects.requireNonNull(time);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
LocalDateTime dateTime = LocalDateTime.parse(time, formatter);
int m = dateTime.getMinute();
LocalDateTime start = dateTime.withMinute(m < 30 ? 0 : 30);
LocalDateTime end = null;
if (m < 30) {
end = dateTime.withMinute(30);
} else {
end = dateTime.plusHours(1);
end = end.withMinute(0);
}
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
return start.format(dateTimeFormatter) + "-" + end.format(dateTimeFormatter);
}
複製程式碼
測試輸入:
period("2018-11-11 23:34", "yyyy-MM-dd HH:mm");
返回結果:
201811112330-201811120000
Map
TimeMapper.java程式碼為:
public class TimeMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String time = Matchers.stringCutBymatchers(value.toString(), "[0-9]{4}[/][0-9]{1,2}[/][0-9]{1,2}[ ][0-9]{1,2}[:][0-9]{1,2}[:][0-9]{1,2}");
Objects.requireNonNull(time);
String result = Times.period(time, "yyyy/MM/dd HH:mm:ss");
context.write(new Text(result), new LongWritable(1));
}
}
複製程式碼
由於按行讀取.csv檔案並且一行中的時間格式為yyyy/MM/dd HH:mm:ss,因此直接用正則表示式擷取時間。然後獲取時間區段,然後將<時間區段, 1>傳遞給reduce().
Matchers.stringCutBymatchers():
public static String stringCutBymatchers(String str, String mstr) {
Pattern patternn = Pattern.compile(mstr);
Matcher matcher = patternn.matcher(str);
String result = null;
if (matcher.find()) {
result = matcher.group(0);
}
return result;
}
複製程式碼
Reduce
reduce()階段的操作就很簡單了,只要統計時間區段出現的次數就好了
TimeReduce.java:
public class TimeReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long counts = 0L;
for (LongWritable val : values) {
counts += val.get();
}
context.write(key, new LongWritable(counts));
}
}
複製程式碼
main
main方法如下:
TimeApp.java:
public class TimeApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("請輸入input目錄和output目錄");
System.exit(2);
}
Job job = Job.getInstance(conf, "acfun-time");
job.setJarByClass(CSVApp.class);
job.setMapperClass(TimeMapper.class);
job.setReducerClass(TimeReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
Path path = new Path(otherArgs[otherArgs.length - 1]);// 取第1個表示輸出目錄引數(第0個引數是輸入目錄)
FileSystem fileSystem = path.getFileSystem(conf);// 根據path找到這個檔案
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);// true的意思是,就算output有東西,也一帶刪除
}
FileOutputFormat.setOutputPath(job, path);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
複製程式碼
執行
最終檔案目錄如下:
其他package是為了之後繼承其他類準備,目前沒用。這裡我採用和hadoop-example一樣的啟動方法,設定一個總Main.java
public class Main {
public static void main(String[] args) {
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("citycount", CSVApp.class, "統計文章中出現的城市個數");
pgd.addClass("timecount", TimeApp.class, "統計文章時段發帖數目");
exitCode = pgd.run(args);
} catch (Throwable e) {
e.printStackTrace();
}
System.exit(exitCode);
}
}
複製程式碼
根據命令引數來選擇需要執行的job。
打包並上傳後執行。
執行
yarn jar com.dust-1.0-SNAPSHOT.jar timecount /acfun/input/dust_acfun_article.csv /acfun/output
複製程式碼
等待job執行完成:
執行完成之後通過
hdfs dfs -cat /acfun/output/part-r-00000
複製程式碼
檢視結果
之後只要將該檔案的資料提取出來畫成圖表就能直觀地檢視發帖時段了。