Mapper class:
- A user-defined Mapper class extends Hadoop's Mapper class.
- The Mapper's input data comes as key-value (KV) pairs (the types can be customized).
- The Map-phase business logic is defined in the map() method.
- The Mapper's output data is also KV pairs (the types can be customized).
Note: the map() method is called once for every line of input data! (A minimal sketch follows.)
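A minimal word-count Mapper as a sketch; the class name and whitespace tokenization are illustrative, not taken from the course code:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input KV: (byte offset, line of text); output KV: (word, 1)
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Called once per input line: split it into words and emit (word, 1)
        for (String token : value.toString().split("\\s+")) {
            word.set(token);
            context.write(word, one);
        }
    }
}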
Reducer class:
- A user-defined Reducer class extends Hadoop's Reducer class.
- The Reducer's input data types correspond to the Mapper's output data types (KV pairs).
- The Reduce-phase business logic is written in the reduce() method.
- The reduce() method is called once per group of KV pairs that share the same key. (A matching sketch follows.)
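A matching Reducer sketch for the word-count example (same caveat: names are illustrative):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input KV: (word, [1, 1, ...]); output KV: (word, total count)
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Called once per distinct key: sum all the 1s emitted for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}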
Driver (a sketch covering these steps follows the list):
- 1. Get the configuration object and a Job instance.
- 2. Specify the local path of the program's jar.
- 3. Specify the Mapper and Reducer classes.
- 4. Specify the Mapper's output KV types.
- 5. Specify the final output KV types.
- 6. Specify the input path of the data the job processes.
- 7. Specify the job's output path.
- 8. Submit the job.
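A Driver sketch walking through the eight steps above; it reuses the illustrative WordCountMapper/WordCountReducer names from the earlier sketches:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // 1. Configuration object and Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordcount");
        // 2. Jar containing the driver class
        job.setJarByClass(WordCountDriver.class);
        // 3. Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Mapper output KV types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Input path and 7. output path, taken from the command line
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 8. Submit the job and wait for it to finish
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}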
Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.9.2</version>
    </dependency>
</dependencies>
log4j.properties:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
Running on a YARN cluster:
hadoop jar wc.jar com.lagou.wordcount.WordcountDriver /user/lagou/input /user/lagou/output
When a JavaBean object is used as a value, it must implement the Writable interface.
Override the serialization and deserialization methods:

@Override
public void write(DataOutput out) throws IOException { .... }

@Override
public void readFields(DataInput in) throws IOException { .... }

If the JavaBean object is used as a key, it must additionally implement the Comparable interface. (A compact sketch of a Writable value bean follows; the full key case is shown later with OrderBean.)
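A compact sketch of a value bean implementing Writable, with hypothetical fields (deviceId, speed); note that readFields() must read the fields in exactly the order write() wrote them:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical value bean for illustration only
public class SpeedBean implements Writable {
    private String deviceId;
    private long speed;

    public SpeedBean() { } // Hadoop instantiates the bean reflectively, so a no-arg constructor is required

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(deviceId);
        out.writeLong(speed);
    }

    // Deserialization: read the fields in the same order as write()
    @Override
    public void readFields(DataInput in) throws IOException {
        deviceId = in.readUTF();
        speed = in.readLong();
    }
}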
Custom partitioning:
- 1. Define a class that extends Partitioner and override the getPartition() method.
- 2. In the Driver, specify the custom Partitioner to use.
- 3. In the Driver, set a ReduceTask count that matches the logic of the custom Partitioner (see the wiring snippet after the example below).
Example:
package com.lagou.mr.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<Text, PartitionBean> {
    @Override
    public int getPartition(Text text, PartitionBean partitionBean, int numPartitions) {
        int partition = 0;
        final String appkey = text.toString();
        if (appkey.equals("kar")) {
            partition = 1;
        } else if (appkey.equals("pandora")) {
            partition = 2;
        } else {
            partition = 0;
        }
        return partition;
    }
}
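The corresponding Driver wiring for steps 2 and 3; since the partitioner above produces three partitions (0, 1, 2), the reduce-task count is set to 3:

// Use the custom partitioner and give it one reduce task per partition
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(3);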
Summary
- 1. When defining a custom partitioner, it is best to keep the number of partitions equal to the number of reduce tasks.
- 2. If there is more than one partition but only one reduce task, only one output file is produced.
- 3. If the number of reduce tasks is greater than the number of partitions, the extra reduce tasks write empty output files.
- 4. If the number of reduce tasks is smaller than the number of partitions, the job may fail with an error.
Combiner:
When to use: the Combiner is applied to map output during each spill, and again when multiple spill files are merged.
Steps to implement a custom Combiner:
- Define a Combiner class that extends Reducer and override the reduce() method.
- In the Driver, set the job to use the Combiner (by default no Combiner is used).
package com.lagou.mr.wc;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 2. Iterate over this key's values and accumulate the counts
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 3. Emit the sum for the current key: the word's total number of occurrences
        total.set(sum);
        context.write(key, total);
    }
}
// In the Driver: register the Combiner
job.setCombinerClass(WordCountCombiner.class);
Sorting:
Global sort: use a single reducer; the framework calls the key's compareTo() method.
Secondary sort (GroupingComparator grouping):
The JavaBean implements the WritableComparable interface so the data can be sorted.
Custom GroupingComparator: extends WritableComparator and decides whether several keys should be placed in the same group and handed to one reduce() call.
Example code:
JavaBean:
package com.lagou.mr.group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId; // order id
    private Double price;   // amount

    public OrderBean(String orderId, Double price) {
        this.orderId = orderId;
        this.price = price;
    }

    public OrderBean() {
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    // Sort rule: compare by order id first, then by amount in descending order
    @Override
    public int compareTo(OrderBean o) {
        int res = this.orderId.compareTo(o.getOrderId()); // 0, 1 or -1
        if (res == 0) {
            // Same order id: compare amounts, negated for descending order
            res = -this.price.compareTo(o.getPrice());
        }
        return res;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    // Deserialization: read fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    // Override toString()
    @Override
    public String toString() {
        return orderId + '\t' + price;
    }
}
Custom GroupingComparator:
package com.lagou.mr.group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator {

    public CustomGroupingComparator() {
        // Register the custom GroupingComparator to receive OrderBean objects
        super(OrderBean.class, true);
    }

    // Override compare() so that two keys with the same orderId are treated as
    // equal, i.e. the framework puts them in the same group for one reduce() call
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // a and b are OrderBean objects; compare their order ids
        final OrderBean o1 = (OrderBean) a;
        final OrderBean o2 = (OrderBean) b;
        return o1.getOrderId().compareTo(o2.getOrderId()); // 0, 1 or -1
    }
}
// In the Driver: register the GroupingComparator
job.setGroupingComparatorClass(CustomGroupingComparator.class);