
Day 17 -- IDEA -- Maven -- AWK -- Simple MapReduce Examples


1. IDEA

Installation

Download: get the installer from the JetBrains website.

There are two editions, Ultimate and Community. Ultimate is paid (it can be cracked), Community is completely free, and the two differ in which features they support. Ultimate is recommended.

After downloading, just click through the installer; you can change the installation path and other options along the way.

Cracking

The Ultimate edition needs to be cracked after installation; open the linked cracking guide and follow the steps it describes.

Basic configuration

Where the settings live: File – Settings

  • Font size, colors, etc.: Editor – Font / Color Scheme

  • Code completion: Editor – General – Code Completion; matching case on the first letter only is recommended


  • Encoding: Editor – File Encodings; set the three encoding options on that page (Global Encoding, Project Encoding, and the default encoding for properties files) to UTF-8


  • Changing the JDK version: Build, Execution, Deployment – Compiler – Java Compiler


  • Changing a module's JDK inside a project: File – Project Structure, then select Modules


Common keyboard shortcuts
  1. Ctrl+Shift+Enter: complete the current statement
  2. Ctrl+Alt+L: reformat code (conflicts with the QQ hotkey)
  3. Alt+Enter: import the missing package / apply the suggested quick fix
  4. Ctrl+Alt+T: surround the code with a block such as if, if/else or try/catch
  5. Ctrl+Alt+V: introduce a variable; e.g. typing new ArrayList(); is completed to ArrayList<Object> objects = new ArrayList<>();
  6. Ctrl+X: cut (remove) the current line
  7. Ctrl+D: duplicate the current line below
  8. Ctrl+/ or Ctrl+Shift+/: comment or uncomment (// or /* */)
  9. Shift+Enter: start a new line below
  10. Ctrl+O: override methods
  11. Ctrl+I: implement methods

2. Maven

Overview

Apache Maven is a software project management and comprehension tool. Based on the concept of a project object model (POM), Maven can manage a project's build, reporting and documentation from a central piece of information.

Download
Unzip to install

Simply unzip the archive with any archiver into an installation directory, e.g. D:\software\apache-maven-3.3.9

Configure the environment variables

Right-click This PC – Properties – Advanced system settings – Environment Variables

  1. Under system variables, create MVN_HOME with the value D:\software\apache-maven-3.3.9
  2. Edit the Path variable and append %MVN_HOME%\bin
Test

Open cmd and run mvn -version; if it prints the Maven and Java version information, the installation and environment variables are set up correctly.


Configure the Maven local repository

Open settings.xml in the conf folder of the Maven installation directory. By default the local repository is stored under .m2\repository in the current user's home directory, but the user directory usually lives on the C: drive and the downloaded jars will gradually eat up system-drive space, so point the repository at another drive instead, i.e. set the <localRepository> element:

<localRepository>D:\m2\repository</localRepository>

Configuring Maven in IDEA

File – Settings – Build, Execution, Deployment – Maven

Set Maven home directory to your own installation directory (IDEA ships with two bundled Maven versions, but they are awkward to configure, so use the Maven you installed yourself).

For User settings file, tick Override and select your own settings.xml; once it is selected, Local repository automatically switches to the local repository directory configured in settings.xml in the previous step.


Note: Settings only affects the current project. Repeat the same configuration under Other Settings – Default Settings (or Settings for New Projects), so that newly created projects do not fall back to the defaults.


Maven dependency search

3. IDEA Projects

Creating a new Maven project

File – New – Project, choose Maven and select the SDK version; you can click New and point it at the local JDK installation directory.


pom.xml

When a new Maven project is created, a pom.xml is generated automatically; dependencies, packaging options and other build settings are configured in this file.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.szbd</groupId>
    <artifactId>sz_hadoop</artifactId>
    <version>1.0</version>

    <!-- Dependencies of the project. The two entries below are the ones needed by the
         Hadoop MapReduce and fastjson code later in this article; the version numbers
         are only examples, so pick the ones that match your own cluster. -->
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

</project>

To add a jar dependency, look up its configuration snippet on a Maven dependency search site and paste it inside the <dependencies> tag; the hadoop-client and fastjson entries shown above are the ones used by the code later in this article.

4. The awk Command

Overview

awk takes its name from the initials of its creators, Alfred Aho, Peter Weinberger and Brian Kernighan. AWK is in fact a language of its own, the AWK programming language, which its three creators formally describe as a "pattern scanning and processing language". It lets you write short programs that read input files, sort data, process it, perform calculations on the input, generate reports, and much more.

Basic awk command forms

datasource | awk -F "separator" 'BEGIN{} {}... END{}'
awk -F "separator" 'BEGIN{} {}... END{}' datasource
awk -F "separator" '{}... END{}' datasource
awk -F "separator" '{}...' datasource

BEGIN{} is similar to the setup() method in MapReduce: it runs exactly once, before the per-record {} block.

END{} is similar to the cleanup() method in MapReduce: it runs exactly once, after the per-record {} block, as sketched below.
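
To make the analogy concrete, here is a minimal Mapper sketch (not part of the original article; the class name is illustrative and the usual Hadoop client dependency is assumed) showing where setup(), map() and cleanup() sit relative to awk's BEGIN{}, per-record {} and END{} blocks:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BeginEndAnalogyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void setup(Context context) {
        // like BEGIN{}: runs exactly once, before the first input record
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // like the middle {}: runs once per input record
        context.write(value, NullWritable.get());
    }

    @Override
    protected void cleanup(Context context) {
        // like END{}: runs exactly once, after the last input record
    }
}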

awk examples
  1. Filtering (list the /etc/passwd entries whose UID is greater than 500)

    cat /etc/passwd | awk -F ":" '{if($3>500)print $1,$2,$3}'

  2. Bucketed counts and percentages

    Data

    a 300 200 300
    b 800 900 200
    c 500 800 900
    d 900 900 300

    Command

    cat ./pc | awk -F "\t" '
    BEGIN{
        ge0lt1000=0
        ge1000lt1500=0
        ge1500lt2000=0
        ge2000=0
        print "zone\tcount\tpercent"
    }
    {
        if(($2+$3+$4) < 1000){
            ge0lt1000++
        } else if(($2+$3+$4) < 1500){
            ge1000lt1500++
        } else if(($2+$3+$4) < 2000){
            ge1500lt2000++
        } else {
            ge2000++
        }
    }
    END{
        print "ge0lt1000",ge0lt1000,ge0lt1000/NR*100.0"%"
        print "ge1000lt1500",ge1000lt1500,ge1000lt1500/NR*100.0"%"
        print "ge1500lt2000",ge1500lt2000,ge1500lt2000/NR*100.0"%"
        print "ge2000",ge2000,ge2000/NR*100.0"%"
    }'

    Result

    zone count percent
    ge0lt1000 1 25%
    ge1000lt1500 0 0%
    ge1500lt2000 1 25%
    ge2000 2 50%

5. Simple MapReduce Examples

Parsing a JSON string

Requirement:

Input data:
{"add":"183.160.122.237|20171030080000015","ods":{"hed":"20171030075958865|011001103233576|60427fbe7d66,60427f8b733b|NewTV02SkyworthNews|192.168.1.6|4|6|60000185|3","dt":"20171030075958865|4|7,3404304,19458158,0,1,60000,0,2430|"}}

Output:
183.160.122.237|20171030080000015	20171030075958865|011001103233576|60427fbe7d66,60427f8b733b|NewTV02SkyworthNews|192.168.1.6|4|6|60000185|3	20171030075958865|4|7,3404304,19458158,0,1,60000,0,2430|
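
The fastjson calls used by the mapper below can be sanity-checked locally before submitting a job. The following is only a rough sketch: the class name is made up and the hed/dt values are shortened to "..." rather than the full sample record above.

import com.alibaba.fastjson.JSONObject;

import java.util.Map;

public class JsonParseCheck {
    public static void main(String[] args) {
        // a shortened sample record with the same shape as the input data above
        String record = "{\"add\":\"183.160.122.237|20171030080000015\","
                + "\"ods\":{\"hed\":\"20171030075958865|...\",\"dt\":\"20171030075958865|...\"}}";
        // parse the outer object into a Map, then parse the nested ods object again
        Map<String, Object> map = JSONObject.parseObject(record, Map.class);
        Map<String, String> ods = JSONObject.parseObject(map.get("ods").toString(), Map.class);
        // prints the three fields joined by tabs, i.e. the output format shown above
        System.out.println(map.get("add") + "\t" + ods.get("hed") + "\t" + ods.get("dt"));
    }
}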

Code:

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Map;

public class JSONanalysis{
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
        public static Text k = new Text();
        public static FloatWritable v = new FloatWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            Map<String, Object> map = JSONObject.parseObject(value.toString(), Map.class);
            Map<String, String> ods = JSONObject.parseObject(map.get("ods").toString(), Map.class);

            String result = map.get("add") + "\t" + ods.get("hed") + "\t" + ods.get("dt");
            context.write(new Text(result), NullWritable.get());

        }

    }
    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable>{


        /*@Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }*/
    }
    // Driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object
        Configuration conf = new Configuration();
        // 2. Configure conf (skip this if there is nothing to set)
        conf.set("fs.defaultFS", "hdfs://sz01:8020/");
        // 3. Get the job object
        Job job = Job.getInstance(conf, "jsonanalysis");
        // 4. Set the job's main class
        job.setJarByClass(JSONanalysis.class);

        // 5. Configure the map phase
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
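        // No reducer is registered for this job, so Hadoop's default identity reducer
        // simply passes the mapper output through; job.setNumReduceTasks(0) would make
        // it an explicit map-only job and skip the shuffle entirely.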

        setArgs(job, args);
        // 7. Submit the job and wait for completion
        int isok = job.waitForCompletion(true) ? 0 : 1;

        // Exit with the job's result code
        System.exit(isok);
    }
    /**
     * Handle the job's input and output path arguments.
     * @param job
     * @param args
     */
    public static void setArgs(Job job , String[] args){
        try {
            if(args.length != 2){
                System.out.println("Wrong number of arguments!");
                System.out.println("Usage: yarn jar *.jar JSONanalysis /inputdata /outputdata");
                return;
            }
            // Set the input file path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // If the output directory already exists, delete it
            FileSystem fs = FileSystem.get(job.getConfiguration());
            System.out.println(fs);
            Path op = new Path(args[1]);
            if(fs.exists(op)){
                fs.delete(op, true);
            }
            // Set the output directory
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Output to multiple files

Requirement

Data:
hello qianfeng qianfeng world heloo
Hello Hi Hello World
QF QQ
163.com
15900001111 17900001111
@163.com
@189.com
$1100000
*[a-z]
Output:
part-r-00000
hello	1
heloo	1
qianfeng	2
world	1

part-r-00001
Hello	2
Hi	1
QF	1
QQ	1

part-r-00002
163.com 1
15900001111 1
17900001111 1

part-r-00003
@163.com 1
@189.com 1
$1100000 1
*[a-z] 1

Code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Partition {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String s : words){
                context.write(new Text(s), new Text(1 + ""));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for(Text t : values){
                count += Integer.parseInt(t.toString());
            }
            context.write(key, new Text(count + ""));
        }
    }

    public static class MyPartitioner extends Partitioner<Text, Text>{
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
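            // The first character of the key decides the target reduce task and hence
            // the output file: lower-case letters -> part-r-00000, upper-case -> part-r-00001,
            // digits -> part-r-00002, everything else -> part-r-00003.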
            String firstChar = key.toString().substring(0, 1);
            if(firstChar.matches("^[a-z]")) {
                return 0;
            }else if(firstChar.matches("^[A-Z]")) {
                return 1;
            }else if(firstChar.matches("^[0-9]")) {
                return 2;
            }else {
                return 3;
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object
        Configuration conf = new Configuration();
        // 2. Configure conf (skip this if there is nothing to set)
        conf.set("fs.defaultFS", "hdfs://sz01:8020/");
        // 3. Get the job object
        Job job = Job.getInstance(conf, "partition");
        // 4. Set the job's main class
        job.setJarByClass(Partition.class);

        // 5. Configure the map phase
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Configure the partitioner and the number of reduce tasks
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(4);

        // 6. Configure the reduce phase
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        setArgs(job, args);
        // 7. Submit the job and wait for completion
        int isok = job.waitForCompletion(true) ? 0 : 1;

        // Exit with the job's result code
        System.exit(isok);
    }
    /**
     * Handle the job's input and output path arguments.
     * @param job
     * @param args
     */
    public static void setArgs(Job job , String[] args){
        try {
            if(args.length != 2){
                System.out.println("Wrong number of arguments!");
                System.out.println("Usage: yarn jar *.jar Partition /inputdata /outputdata");
                return;
            }
            // Set the input file path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // If the output directory already exists, delete it
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if(fs.exists(op)){
                fs.delete(op, true);
            }
            // Set the output directory
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
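
As a quick sanity check of the partitioning logic, getPartition can be called directly without running a job. The class name below is made up, and it assumes the Partition class above is on the classpath together with the Hadoop client jars:

import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        Partition.MyPartitioner p = new Partition.MyPartitioner();
        Text one = new Text("1");
        System.out.println(p.getPartition(new Text("hello"), one, 4));    // 0 -> part-r-00000
        System.out.println(p.getPartition(new Text("Hello"), one, 4));    // 1 -> part-r-00001
        System.out.println(p.getPartition(new Text("163.com"), one, 4));  // 2 -> part-r-00002
        System.out.println(p.getPartition(new Text("@163.com"), one, 4)); // 3 -> part-r-00003
    }
}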

Inverted index

Requirement:

Data:
index.html
hadoop hadoop hadoop is nice is good hadoop hadoop
hadoop-info.html
hadoop hadoop hadoop is better
spark-info.html
spark spark spark hadoop is nice nice nice

Output data:
better hadoop-info.html:1
good hadoop-info.html:1
hadoop index.html:5;hadoop-info.html:3;spark-info.html:1
....
spark spark-info.html:3

Code:

DescIndex.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class DescIndex {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
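            // context.getInputSplit() tells the mapper which split it is processing;
            // casting it to FileSplit exposes the source file's name, which becomes
            // part of the composite key "word_filename" emitted below.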
            InputSplit is = context.getInputSplit();
            String fileName = ((FileSplit)is).getPath().getName();

            String line = value.toString();
            String[] words = line.split(" ");
            for (String s : words) {
                context.write(new Text(s + "_" + fileName), new Text(1 + ""));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, Text, Text, Text> {


        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {


            String str = "";
            for (Text t : values) {

                str += t.toString() + ";";
            }
            context.write(key, new Text(str.substring(0, str.length() - 1)));
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object
        Configuration conf = new Configuration();
        // 2. Configure conf (skip this if there is nothing to set)
        conf.set("fs.defaultFS", "hdfs://sz01:8020/");
        // 3. Get the job object
        Job job = Job.getInstance(conf, "descindex");
        // 4. Set the job's main class
        job.setJarByClass(DescIndex.class);

        // 5. Configure the map phase
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Set the combiner (MyCombiner below does the per-file counting)
        job.setCombinerClass(MyCombiner.class);

        // 6. Configure the reduce phase
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);


        setArgs(job, args);
        // 7. Submit the job and wait for completion
        int isok = job.waitForCompletion(true) ? 0 : 1;

        // Exit with the job's result code
        System.exit(isok);
    }
    /**
     * Handle the job's input and output path arguments.
     * @param job
     * @param args
     */
    public static void setArgs(Job job , String[] args){
        try {
            if(args.length != 2){
                System.out.println("Wrong number of arguments!");
                System.out.println("Usage: yarn jar *.jar DescIndex /inputdata /outputdata");
                return;
            }
            // Set the input file path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // If the output directory already exists, delete it
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if(fs.exists(op)){
                fs.delete(op, true);
            }
            // Set the output directory
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MyCombiner.java:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // The map output key has the form word_filename; sum the 1s to get the
        // count of this word within that file.
        String[] str = key.toString().split("_");
        int counter = 0;
        for (Text t : values) {
            counter += Integer.parseInt(t.toString());
        }
        // Re-key on the word alone and move the filename into the value, so that the
        // reducer can merge all per-file counts of a word into one output line.
        // Note: this relies on the combiner actually being run (Hadoop treats combiners
        // as optional), and the order of the filename:count pairs in the final line is
        // not guaranteed to match the sample output exactly.
        context.write(new Text(str[0]), new Text(str[1] + ":" + counter));
    }
}

Deduplication

Requirement:

2017-11-28	北京-天津
2017-11-29	北京-天津
2017-11-27	北京-天津
2017-11-27	北京-天津
2017-11-28	北京-天津
2017-11-26	北京-天津
2017-11-26	北京-哈爾濱
2017-11-29	北京-天津
2017-11-26	北京-三亞

Output:
2017-11-28	北京-天津
2017-11-29	北京-天津
2017-11-27	北京-天津
2017-11-26	北京-天津
2017-11-26	北京-哈爾濱
2017-11-26	北京-三亞

Code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Deduplication
 */
public class Distinct {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
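            // Emit the entire line as the key: identical lines end up in the same
            // reduce group, and the reducer below writes each distinct key exactly once.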
            context.write(new Text(line),NullWritable.get());
        }

    }
    public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {


        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
    // Driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration object
        Configuration conf = new Configuration();
        // 2. Configure conf (skip this if there is nothing to set)
        conf.set("fs.defaultFS", "hdfs://sz01:8020/");
        // 3. Get the job object
        Job job = Job.getInstance(conf, "distinct");
        // 4. Set the job's main class
        job.setJarByClass(Distinct.class);

        // 5. Configure the map phase
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 6. Configure the reduce phase
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        setArgs(job, args);
        // 7. Submit the job and wait for completion
        int isok = job.waitForCompletion(true) ? 0 : 1;

        // Exit with the job's result code
        System.exit(isok);
    }
    /**
     * Handle the job's input and output path arguments.
     * @param job
     * @param args
     */
    public static void setArgs(Job job , String[] args){
        try {
            if(args.length != 2){
                System.out.println("Wrong number of arguments!");
                System.out.println("Usage: yarn jar *.jar Distinct /inputdata /outputdata");
                return;
            }
            // Set the input file path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // If the output directory already exists, delete it
            FileSystem fs = FileSystem.get(job.getConfiguration());
            System.out.println(fs);
            Path op = new Path(args[1]);
            if(fs.exists(op)){
                fs.delete(op, true);
            }
            // Set the output directory
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}