1. 程式人生 > 資訊 >周鴻禕評“燃油管道被黑客掐斷”:提高基礎設施網路安全防禦迫在眉睫

周鴻禕評“燃油管道被黑客掐斷”:提高基礎設施網路安全防禦迫在眉睫

Mapper類 :

  • 使用者自定義一個Mapper類繼承Hadoop的Mapper類
  • Mapper的輸入資料是KV對的形式(型別可以自定義)
  • Map階段的業務邏輯定義在map()方法中
  • Mapper的輸出資料是KV對的形式(型別可以自定義)

注意:map()方法是對每一行資料呼叫一次!!

Reducer類

  • 使用者自定義Reducer類要繼承Hadoop的Reducer類
  • Reducer的輸入資料型別對應Mapper的輸出資料型別(KV對)
  • Reducer的業務邏輯寫在reduce()方法中
  • Reduce()方法是對相同K的一組KV對呼叫執行一次

Driver

  • 1. 獲取配置檔案物件,獲取job物件例項
  • 2. 指定程式jar的本地路徑
  • 3. 指定Mapper/Reducer類
  • 4. 指定Mapper輸出的kv資料型別
  • 5. 指定最終輸出的kv資料型別
  • 6. 指定job處理的原始資料路徑
  • 7. 指定job輸出結果路徑
  • 8. 提交作業

Maven 相關依賴:

<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version
> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.9.2</version></dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId
> <version>2.9.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.9.2</version> </dependency> </dependencies>

log4j.properties:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

YARN 叢集執行:

  

hadoop jar wc.jar com.lagou.wordcount.WordcountDriver
/user/lagou/input /user/lagou/output

當一個javaBean 物件需要做value,此時需要實現Writable 介面:

重寫序列化和反序列化方法:
@Override
public void write(DataOutput out) throws IOException {
....
}
@Override
public void readFields(DataInput in) throws IOException {
....
}

如果javaBean 物件需要做 key,還需要實現Comparable介面.

自定義分割槽:

  • 1. 自定義類繼承Partitioner,重寫getPartition()方法
  • 2. 在Driver驅動中,指定使用自定義Partitioner
  • 3. 在Driver驅動中,要根據自定義Partitioner的邏輯設定相應數量的ReduceTask數量。

示例:

package com.lagou.mr.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<Text, PartitionBean> {
    @Override
    public int getPartition(Text text, PartitionBean partitionBean, int
            numPartitions) {
        int partition = 0;
        final String appkey = text.toString();
        if (appkey.equals("kar")) {
            partition = 1;
        } else if (appkey.equals("pandora")) {
            partition = 2;
        } else {
            partition = 0;
        }
        return partition;
    }
}

總結

  • 1. 自定義分割槽器時最好保證分割槽數量與reduceTask數量保持一致;
  • 2. 如果分割槽數量不止1個,但是reduceTask數量1個,此時只會輸出一個檔案。
  • 3. 如果reduceTask數量大於分割槽數量,但是輸出多個空檔案
  • 4. 如果reduceTask數量小於分割槽數量,有可能會報錯。

Combiner:

  使用場景:每次溢寫需要,多次溢寫需要

  自定義Combiner實現步驟:

    • 自定義一個Combiner繼承Reducer,重寫Reduce方法
    • 在驅動(Driver)設定使用Combiner(預設是不適用Combiner元件)
package com.lagou.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import javax.xml.soap.Text;
import java.io.IOException;

public class WordCountCombiner extends Reducer<Text,
        IntWritable, Text, IntWritable> {
    IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context
            context) throws IOException, InterruptedException {
//2 遍歷key對應的values,然後累加結果
        int sum = 0;
        for (IntWritable value : values) {
            int i = value.get();
            sum += 1;
        }
// 3 直接輸出當前key對應的sum值,結果就是單詞出現的總次數
        total.set(sum);
        context.write(key, total);
    }
}
job.setCombinerClass(WordcountCombiner.class);

排序:

全域性排序:一個reducer,會呼叫key的comperTo 方法

輔助排序: ( GroupingComparator分組)

  JavaBean 實現WritableComparable 介面,實現資料的排序

  自定義GroupingComparator:繼承WritableComparator ,用於判斷多個key 是否應該被劃分到同一組,給reduce處理

示例程式碼:

javaBean:

package com.lagou.mr.group;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

    private String orderId;//訂單id
    private Double price;//金額


    public OrderBean(String orderId, Double price) {
        this.orderId = orderId;
        this.price = price;
    }

    public OrderBean() {
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    //指定排序規則,先按照訂單id比較再按照金額比較,按照金額降序排
    @Override
    public int compareTo(OrderBean o) {
        int res = this.orderId.compareTo(o.getOrderId()); //0 1 -1
        if (res == 0) {
            //訂單id相同,比較金額
            res = - this.price.compareTo(o.getPrice());
        }
        return res;
    }

    //序列化
    @Override
    public void write(DataOutput out) throws IOException {

        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    //反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    //重寫toString()

    @Override
    public String toString() {
        return orderId + '\t' +
                price
                ;
    }
}

自定義GroupingComparator:

package com.lagou.mr.group;

import com.sun.corba.se.impl.orb.ParserTable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator {

    public CustomGroupingComparator() {
        super(OrderBean.class, true); //註冊自定義的GroupingComparator接受OrderBean物件
    }

    //重寫其中的compare方法,通過這個方法來讓mr接受orderid相等則兩個物件相等的規則,key相等

    @Override
    public int compare(WritableComparable a, WritableComparable b) { //a 和b是orderbean的物件
        //比較兩個物件的orderid
        final OrderBean o1 = (OrderBean) a;
        final OrderBean o2 = (OrderBean) b;
        final int i = o1.getOrderId().compareTo(o2.getOrderId());
        return i; // 0 1 -1
    }
}
//指定使用groupingcomparator
        job.setGroupingComparatorClass(CustomGroupingComparator.class);