1. 程式人生 > 其它 >opencv影象的混合操作

opencv影象的混合操作

MapReduce

hdfs用於儲存海量資料,mapreduce則用於處理資料,是一種分散式計算模型。MapReduce的思想:將任務切割為多個小任務進行平行計算(Map),然後將得到的區域性結果進行彙總(Reduce)。

網路io的耗時遠大於磁碟io。當計算程式和資料分別在不同機器上時,將計算程式移動到資料所在節點比移動資料要快的多。所以Hadoop中的MapReduce就是將計算程式傳送到各個DataNode上,每個資料節點分別計算自己所儲存的資料,得到結果後彙總。

Block是對檔案的物理切分,split是對檔案的邏輯切分,每個split生成一個MapTask。每個ReduceTask生成一個結果,最後彙總。Map階段對應Mapper類,Reduce階段對應Reducer類。

Map 階段

當省略Reduce階段,即只是對資料進行過濾時,只要執行到第二階段即可,然後將資料寫入本地磁碟。

  1. 框架會將輸入檔案(夾)劃分為很多InputSplit,預設每個HDFS的Block對應一個InputSplit。通過RecordReader類,把每個InputSplit解析為一個個<k1, v1>。預設每行資料被拆解為一個<k1, v1>
  2. 框架呼叫Mapper類中的 map(...) 函式,map函式的輸入是<k1, v1>,輸出是<k2, v2>。一個InputSplit對應一個 Map Task
  3. 框架對map函式輸出的<k2, v2>進行分割槽。不同分割槽中<k2, v2>由不同的ReduceTask處理,預設只有1個分割槽
  4. 框架對每個分割槽中的資料,按照k2進行排序、分組。分組指的是相同k2的v2分成一組
  5. 在Map階段,框架可以執行Combiner操作(可選,預設不執行)。進行區域性聚合,類似reduce的操作,可以減小後期傳輸資料的大小。如求平均值等操作不能使用,防止影響結果
  6. 框架會把Map Task輸出的<k2, v2>寫入到Linux的磁碟檔案

例:

對下面檔案進行詞頻統計:

hello you
hello me
  1. 拆分任務

    <0, hello you>
    <10, hello me>     # 10 是偏移長度
  2. 呼叫map函式

    # MapTask1
    <hello, 1>
    <you, 1>
    
    # MapTask2
    <hello, 1>
    <me, 1>
  3. 聚集

    <hello, 1>
    <you, 1>
    <hello, 1>
    <me, 1>
  4. 排序

    <hello, 1>
    <hello, 1>
    <me, 1>
    <you, 1>

    分組

    <hello, {1, 1}>
    <me, {1}>
    <you, {1}>
  5.  Combiner 規約, 預設不執行
    <hello, {2}>
    <me, {1}>
    <you, {1}>
  6. 寫入磁碟

Reduce 階段

有時可以省略

  1. 框架對多個MapTask的輸出,按照不同的分割槽,通過網路複製到不同的Reduce節點,這個過程稱作Shuffle
  2. 框架對Reduce節點接收到的相同分割槽的<k2, v2>資料進行合併、排序、分組
    前一階段的排序分組是一個MapTask內,此處是MapTask之間
  3. 框架呼叫Reduce類中的reduce方法,輸入<k2, {v2...}>輸出<k3, v3>。一個<k2, {v2...}>呼叫一次reduce函式
  4. 框架把Reduce的輸出結果儲存到HDFS中

例子:WordCount

單檔案詞頻統計

兩個檔案詞頻統計

程式碼

打包成jar包然後上傳到叢集中。使用hadoop jar bigdata-1.0-SNAPSHOT.jar  /hello.txt /out1執行程式碼。格式為hadoop/yarn jar xxx.jar [指定main方法所在類] [程式的args引數]

package org.example.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 處理檔案 hello.txt ,內容如下:
 * hello you
 * hello me
 *
 * 結果:
 * hello    2
 * me   1
 * you  1
 */
public class WordCountJob {

    /**
     * 組裝一個 Job 用於啟動任務。
     * Job = Mapper + Reducer
     */
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(1);
            }

            // 建立一個配置類
            Configuration conf = new Configuration();
            // 建立一個任務
            Job job = Job.getInstance(conf);

            // 注意:這一行必須設定,否則在叢集中執行時找不到 WordCountJob 這個類
            job.setJarByClass(WordCountJob.class);

            // 指定輸入路徑,可以是檔案或目錄。如果是目錄則讀取其下所有檔案
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // 指定一個輸出目錄,必須是不存在的目錄
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 指定 mapper 類和輸出的 key value
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            // 指定 Reducer 類和最終輸出的 key value
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            // 提交 job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * map 流程,重寫map函式
     * 繼承類 Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     *
     */
    public static class  MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        /**
         * 需要實現 map 函式
         * 這個 map 函式可以接收 <k1, v1> 產生 <k2, v2>
         * 此處對每一行資料進行切割,把單詞切割出來
         * @param key 每一行資料行首偏移量
         * @param value 每一行內容
         */
        @Override
        protected void map(LongWritable key,
                           Text value,
                           Mapper<LongWritable, Text, Text, LongWritable>.Context context
        ) throws IOException, InterruptedException {
            // 分割單詞
            String[] words = value.toString().split(" ");
            // 迭代分出的單詞
            for (String word : words) {
                // 把迭代出的單詞封裝為 <k2, v2> 的形式
                Text keyOut = new Text(word);
                LongWritable valueOut = new LongWritable(1L);
                // 把 <k2, v2> 寫出去
                context.write(keyOut, valueOut);
            }
        }
    }

    /**
     * Reduce 階段
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        /**
         * 針對 <k2, {v2...}> 進行累加求和,並把資料轉為 <k3, v3>
         */
        @Override
        protected void reduce(
                Text key,
                Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context
        ) throws IOException, InterruptedException {
            // 將詞頻累加
            long sum = 0L;
            for (LongWritable value : values) {
                sum += value.get();
            }
            // 把結果寫出去
            context.write(key, new LongWritable(sum));
        }
    }
}

當只需要進行解析時,MapReduce程式中可以沒有Reduce的步驟。在main函式中無需再設定reducer類,但是要輸入 job.setNumReduceTasks(0);。輸出結果檔案為 part-m-00000

依賴:

以下都不是必需的

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.2</version>
            <!-- 只在編譯執行時需要,hadoop中有這個依賴,無需打包-->
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
<!--            maven 打包,指定啟動類。也可在啟動時手動指定 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.mapreduce.WordCountJob</mainClass>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            
            <!-- 指定jdk版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <!-- 打包時將依賴也全部打包 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!--打包時,包含所有依賴的jar包-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <!--生成javadoc檔案-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <charset>UTF-8</charset>
                    <docencoding>UTF-8</docencoding>
                    <doclint>none</doclint>
                </configuration>
                <executions>
                    <execution>
                        <id>attach-javadocs</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!--生成source檔案-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <version>2.4</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

以下提示資訊表示執行成功

在指定的輸出目錄中可見:

_SUCCESS 用於標識成功。part-r-00000 是成功的結果檔案,由reduce程式產生。如果沒有reduce程式則生成 part-m-00000檔案;當有多個reduce任務時會生成多個part檔案,結尾為00001、00002等

MapReduce任務日誌查

在程式中使用 sout 輸出資訊無法在控制檯直接看見,因為控制檯只是一個客戶端,不能看見服務的日誌。

通過訪問hdfs的web控制介面 http://NameNodeIp:8088點選 History 連結。

檢視前要做兩個配置然後啟動相應的 historyserver 程序

  1. 在hosts檔案中配置主機名與對應ip
  2. 開啟日誌聚合功能,把散落在NodeManager節點上的日誌同一收集管理,方便檢視
    修改所有叢集配置,在 yarn-site.xml 中新增 yarn.log-aggregation-enable 和 yarn.log.server.url 兩個引數
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property> 
    <property>
        <name>yarn.log.server.url</name>
        <value>http://bigdata01:19888/jobhistory/logs/</value>
    </property>
  3. 啟動:
    mapred --daemon start historyserver所有機器執行此命令啟動 historyserver 程序,然後進入web介面檢視log






使用命令列檢視日誌:yarn logs -applicationId <Application ID>

停止正在執行的任務

yarn application -kill <ApplicationId>

在控制檯按 ctrl+c 無法停止程式,控制檯執行的是客戶端程式,MapReduce程式已提交到叢集中執行了。