hadoop:hdfs分佈儲存+ mr分佈計算
阿新 • • 發佈:2018-12-16
- hdfs 和RDBMS區別
- mr 和 網格計算,志願計算
1,資料儲存
磁碟儲存 | 解決分散式問題 | 硬體需求 | 系統瓶頸 | |
---|---|---|---|---|
hdfs | 磁碟陣列-叢集 | 硬體故障,多資料來源的資料準確性 | 普通機 | 資料傳輸:硬碟頻寬 |
RDBMS | 單磁碟 | 專業伺服器 | 磁碟定址:大量資料更新 |
2,分析計算
適用場 | 特點 | 生態圈 | 結構特點 | 資料完整性 | 可擴充套件性 | 資料集結構化程度 | |
---|---|---|---|---|---|---|---|
mr | PB級資料:批處理 | 一寫多讀 | yarn整合其他分散式程式,hive,saprk | 讀模式 | 低 | 高 | 半、非結構化 |
RDBMS | GB級資料:實時檢索,更新 | 持續更新 | 寫模式 | 高 | 低 | 結構化 |
3,網格計算,志願計算
特點 | 適用場景 | |
---|---|---|
網格計算 | 分散節點計算+ 網路共享檔案系統 | 小規模資料:無網路傳輸瓶頸 |
網格計算 | 任務單元化+ 分散計算+ 校驗結果 | cup密集型:計算時間>傳輸時間 |
mr | 轉移計算+ 資料本地化 | 作業週期短(小時計),高速區域網內,高配硬體 |
4,mr 對比linux:awk流處理
1,awk處理: 年度最高溫度統計
2,mapreduce處理:每年最高溫度統計
idea +maven: 新增依賴
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency>
map方法
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class Map1 extends Mapper<LongWritable, Text, IntWritable,IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //整理的資料輸入: //1982,-8 //1931,-4 String str = value.toString(); String[] arr = str.split(","); int year=0, tmp=Integer.MIN_VALUE; //資料轉換 try { year= Integer.parseInt(arr[0]); tmp= Integer.parseInt(arr[1]); }catch (Exception e){ e.printStackTrace(); } //輸出:新資料 context.write(new IntWritable(year),new IntWritable(tmp)); } }
reduce方法
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class Reduce1 extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//輸入資料:1931,【-4,23,4,35,6】
//聚合資料: 求每組資料中的max(tmp)
int max=Integer.MIN_VALUE;
Iterator<IntWritable> it = values.iterator();
while (it.hasNext()){
IntWritable next = it.next();
int tmp = next.get();
max= (max >tmp) ? max:tmp;
}
//輸出: 最高溫度
context.write(key, new IntWritable(max));
}
}
app類: 排程組織job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class App1 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(App1.class);
job.setJobName("maxTmp");
//map,reduce
job.setMapperClass(Map1.class);
job.setReducerClass(Reduce1.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(3);
//輸入輸出
FileInputFormat.addInputPath(job,new Path("/home/wang/txt/tmp.txt"));
FileOutputFormat.setOutputPath(job, new Path("/home/wang/tmp-out"));
//提交等待
job.waitForCompletion(true);
}
}