2021最新發布:大資料開發工程師 【完結】
技術標籤:talkingdatakylinsparkkafkaflink
download:大資料開發工程師 【完結】
本套大資料課程中的技術體係包含目前主流的Hadoop、Spark、Flink三大技術生態圈,涵蓋了企業中最常見的技術元件,可以滿足大家在公司中的工作需求
Q:這套課程要學多久?學完能達到什麼水平呢?
本套大資料學完的時間,和每個人的基礎、接受能力和時間安排都有關,一般情況下,如果能保證每天聽課1小時,練習至少2個小時,3~4個月是可以學完的。建議保持連續學習,這樣學習效果更好,以及通過視訊配套的思維導圖做好預習,電子書鞏固視訊內容。學完後可以達到大資料中級工程師水平,滿足絕大部分公司的大資料崗位需求。
Q:這套大資料課程中學的東西工作中夠用嗎?
足夠用的,目前本套大資料課程中的技術體係包含目前主流的Hadoop、Spark、Flink三大技術生態圈,涵蓋了企業中最常見的技術元件,可以滿足大家在公司中的工作需求。
Q:我目前是java程式設計師,大資料屬於零基礎,能學的會嗎?
可以的,java程式設計師學習大資料是具有天然優勢的,大資料中的技術框架大部分都是基於java開發的,學習起來很容易上手。 並且我們本套課程配套的有完整的電子書,方便大家及時查漏補缺,以及本套視訊教程帶有配套字幕,學起來也會更加輕鬆。
假定以上就是我們需求處置的資料,我們需求計算出每個月天氣最熱的兩天。
這個案例用到的東西很多,假如妳能靜下心來好美觀完,妳一定會收獲頗豐的
首先我們對本人提出幾個問題
1.怎樣劃分資料,怎樣定義一組???
2.思索reduce的計算復雜度???
3.能不能多個reduce???
4.如何防止資料傾斜???
5.如何自定義資料型別???
----記載特性
每年
每個月
溫度最高
2天
1天多條記載怎樣處置?
----進一步考慮
年月分組
溫度升序
key中要包含時間和溫度!
----MR原語:相同的key分到一組
經過GroupCompartor設定分組規則
----自定義資料型別Weather
包含時間
包含溫度
自定義排序比擬規則
----自定義分組比擬
年月相同被視為相同的key
那麼reduce迭代時,相同年月的記載有可能是同一天的,reduce中需求判別能否同一天
留意OOM
----資料量很大
全量資料能夠切分紅最少按一個月份的資料量停止判別
這種業務場景能夠設定多個reduce
經過完成partition
一>>>MainClass的完成
package com.huawei.mr.weather;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author Lpf.
* @version 創立時間:2019年4月13日 下午7:43:40
*/
public class MainClass {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 輸入錯誤返回提示
if (args == null || args.length != 2) {
System.out.println("輸入格式有誤");
System.out.println("正確格式為:yarn jar weather.jar com.huawei.mr.weather.MainClass args[0] args[1]");
}
// 初始化hadoop默許配置檔案,假如有指定的配置,則掩蓋默許配置
Configuration conf = new Configuration(true);
// 創立Job物件,用到係統配置資訊
Job job = Job.getInstance(conf);
// 指定job入口程式
job.setJarByClass(MainClass.class);
// 設定job稱號
job.setJobName("weather");
// 指定檔案從哪裡讀取,從hdfs載入一個輸入檔案給job
FileInputFormat.addInputPath(job, new Path(args[0]));
// 指定hdfs上一個不存在的途徑作為job的輸出途徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 自主設定reduce的數量
job.setNumReduceTasks(2);
// 指定map輸出中key的型別
job.setMapOutputKeyClass(Weather.class);
// 指定map輸出中value的型別
job.setMapOutputValueClass(Text.class);
// 設定map中的比擬器,假如不設定默許採用key型別自帶的比擬器
/**
* 由於map裡面的排序和這兒的排序不一樣,稱之為二次排序
*/
job.setSortComparatorClass(WetherComparator.class);
// 設定分割槽器型別 防止資料傾斜
job.setPartitionerClass(WeatherPartitioner.class);
job.setMapperClass(WeatherMapper.class);
job.setReducerClass(WeatherReduce.class);
job.waitForCompletion(true);
}
}
二 >>>Weather 自定義key的完成
package com.huawei.mr.weather;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
* @author Lpf.
* @version 創立時間:2019年4月13日 下午8:15:26
* map中輸出key的自定義
*/
public class Weather implements WritableComparable {
private String year;
private String month;
private String day;
private Integer weather;
public String getYear() {
return year;
}
public void setYear(String year) {
this.year = year;
}
public String getMonth() {
return month;
}
public void setMonth(String month) {
this.month = month;
}
public String getDay() {
return day;
}
public void setDay(String day) {
this.day = day;
}
public Integer getWeather() {
return weather;
}
public void setWeather(Integer weather) {
this.weather = weather;
}
@Override
public void write(DataOutput out) throws IOException {
// 把封裝的資料序列化之後寫進來
out.writeUTF(year);
out.writeUTF(month);
out.writeUTF(day);
out.writeInt(weather);
}
/*
* 讀寫的次第要分歧
*/
@Override
public void readFields(DataInput in) throws IOException {
// 把封裝的資料序列化之後讀進來
setYear(in.readUTF());
setMonth(in.readUTF());
setDay(in.readUTF());
setWeather(in.readInt());
}
@Override
public int compareTo(Weather that) {
int result = 0;
result = this.getYear().compareTo(that.getYear());
if (result == 0) {
result = this.getMonth().compareTo(that.getMonth());
if (result == 0) {
result = this.getDay().compareTo(that.getDay());
if (result == 0) {
// 假如年月日都相同,把溫度依照高到低倒序排列
result = that.getWeather().compareTo(this.getWeather());
}
}
}
return result;
}
}
三 >>>自定義map中key的比擬器用於排序
package com.huawei.mr.weather;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* @author Lpf.
* @version 創立時間:2019年4月13日 下午8:29:41
* map中的比擬器設定
*/
public class WetherComparator extends WritableComparator {
public WetherComparator() {
super(Weather.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
int result = 0;
Weather wa = (Weather) a;
Weather wb = (Weather) b;
// 分組比擬器要保證同年同月為一組 和Weather裡面的排序規則不一樣
result = wa.getYear().compareTo(wb.getYear());
if (result == 0) {
result = wa.getMonth().compareTo(wb.getMonth());
if (result == 0) {
result = wb.getWeather().compareTo(wa.getWeather());
}
}
return result;
}
}
四>>>設定分割槽器防止資料傾斜
package com.huawei.mr.weather;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author Lpf.
* @version 創立時間:2019年4月13日 下午8:47:46
* 分割槽器,防止資料傾斜
*/
public class WeatherPartitioner extends Partitioner<Weather, Text> {
@Override
public int getPartition(Weather key, Text value, int numPartitions) {
String month = key.getMonth();
int partitionNum = (month.hashCode() & Integer.MAX_VALUE) % numPartitions;
return partitionNum;
}
}
五>>>map裡面對每一行的處置
package com.huawei.mr.weather;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @author Lpf.
* @version 創立時間:2019年4月13日 下午8:55:29 map裡面的處置
*/
public class WeatherMapper extends Mapper<LongWritable, Text, Weather, Text> {
private SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-mm-dd");
private Weather wea = new Weather();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 每一行的資料格式為 1949-10-01 14:21:02 34c
String linStr = value.toString();
// {"1949-10-01 14:21:02","34c"}
String[] linStrs = linStr.split("\t");
// 得到溫度
int weather = Integer.parseInt(linStrs[1].substring(0, linStrs[1].length() - 1));
// 獲取時間
try {
Date date = DATE_FORMAT.parse(linStrs[0]);
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH);
int day = calendar.get(Calendar.DAY_OF_MONTH);
wea.setYear(year + "");
wea.setMonth(month + "");
wea.setDay(day + "");
wea.setWeather(weather);
// 把map中的值輸出
context.write(wea, value);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
六>>>reduce裡面的輸出
package com.huawei.mr.weather;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @author Lpf.
* @version 創立時間:2019年4月13日 下午8:55:35
* reduce 裡面的處置
*/
public class WeatherReduce extends Reducer<Weather, Text, Text, NullWritable> {
@Override
protected void reduce(Weather key, Iterable values, Context context)
throws IOException, InterruptedException {
Iterator iterator = values.iterator();
Text text = null;
String day = null;
while (iterator.hasNext()) {
text = iterator.next();
if (day != null) {
if (!day.equals(key.getDay())) {
// 輸出本月溫度最高的第二天
context.write(text, NullWritable.get());
break;
}
} else {
// 輸出本月溫度最高的第一天
context.write(text, NullWritable.get());
day = key.getDay();
}
}
}
}