opencv影象的混合操作
MapReduce
hdfs用於儲存海量資料,mapreduce則用於處理資料,是一種分散式計算模型。MapReduce的思想:將任務切割為多個小任務進行平行計算(Map),然後將得到的區域性結果進行彙總(Reduce)。
網路io的耗時遠大於磁碟io。當計算程式和資料分別在不同機器上時,將計算程式移動到資料所在節點比移動資料要快的多。所以Hadoop中的MapReduce就是將計算程式傳送到各個DataNode上,每個資料節點分別計算自己所儲存的資料,得到結果後彙總。
Block是對檔案的物理切分,split是對檔案的邏輯切分,每個split生成一個MapTask。每個ReduceTask生成一個結果,最後彙總。Map階段對應Mapper類,Reduce階段對應Reducer類。
Map 階段
當省略Reduce階段,即只是對資料進行過濾時,只要執行到第二階段即可,然後將資料寫入本地磁碟。
- 框架會將輸入檔案(夾)劃分為很多InputSplit,預設每個HDFS的Block對應一個InputSplit。通過RecordReader類,把每個InputSplit解析為一個個<k1, v1>。預設每行資料被拆解為一個<k1, v1>
- 框架呼叫Mapper類中的 map(...) 函式,map函式的輸入是<k1, v1>,輸出是<k2, v2>。一個InputSplit對應一個 Map Task
- 框架對map函式輸出的<k2, v2>進行分割槽。不同分割槽中<k2, v2>由不同的ReduceTask處理,預設只有1個分割槽
- 框架對每個分割槽中的資料,按照k2進行排序、分組。分組指的是相同k2的v2分成一組
- 在Map階段,框架可以執行Combiner操作(可選,預設不執行)。進行區域性聚合,類似reduce的操作,可以減小後期傳輸資料的大小。如求平均值等操作不能使用,防止影響結果
- 框架會把Map Task輸出的<k2, v2>寫入到Linux的磁碟檔案
例:
對下面檔案進行詞頻統計:
hello you
hello me
-
拆分任務
<0, hello you> <10, hello me> # 10 是偏移長度
-
呼叫map函式
# MapTask1 <hello, 1> <you, 1> # MapTask2 <hello, 1> <me, 1>
-
聚集
<hello, 1> <you, 1> <hello, 1> <me, 1>
-
排序
<hello, 1> <hello, 1> <me, 1> <you, 1>
分組
<hello, {1, 1}> <me, {1}> <you, {1}>
- Combiner 規約, 預設不執行
<hello, {2}> <me, {1}> <you, {1}>
- 寫入磁碟
Reduce 階段
有時可以省略
- 框架對多個MapTask的輸出,按照不同的分割槽,通過網路複製到不同的Reduce節點,這個過程稱作Shuffle
- 框架對Reduce節點接收到的相同分割槽的<k2, v2>資料進行合併、排序、分組
前一階段的排序分組是一個MapTask內,此處是MapTask之間 - 框架呼叫Reduce類中的reduce方法,輸入<k2, {v2...}>輸出<k3, v3>。一個<k2, {v2...}>呼叫一次reduce函式
- 框架把Reduce的輸出結果儲存到HDFS中
例子:WordCount
單檔案詞頻統計
兩個檔案詞頻統計
程式碼
打包成jar包然後上傳到叢集中。使用hadoop jar bigdata-1.0-SNAPSHOT.jar /hello.txt /out1
執行程式碼。格式為hadoop/yarn jar xxx.jar [指定main方法所在類] [程式的args引數]
。
package org.example.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 處理檔案 hello.txt ,內容如下:
* hello you
* hello me
*
* 結果:
* hello 2
* me 1
* you 1
*/
public class WordCountJob {
/**
* 組裝一個 Job 用於啟動任務。
* Job = Mapper + Reducer
*/
public static void main(String[] args) {
try {
if (args.length != 2) {
System.exit(1);
}
// 建立一個配置類
Configuration conf = new Configuration();
// 建立一個任務
Job job = Job.getInstance(conf);
// 注意:這一行必須設定,否則在叢集中執行時找不到 WordCountJob 這個類
job.setJarByClass(WordCountJob.class);
// 指定輸入路徑,可以是檔案或目錄。如果是目錄則讀取其下所有檔案
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 指定一個輸出目錄,必須是不存在的目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 指定 mapper 類和輸出的 key value
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 指定 Reducer 類和最終輸出的 key value
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 提交 job
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* map 流程,重寫map函式
* 繼承類 Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/**
* 需要實現 map 函式
* 這個 map 函式可以接收 <k1, v1> 產生 <k2, v2>
* 此處對每一行資料進行切割,把單詞切割出來
* @param key 每一行資料行首偏移量
* @param value 每一行內容
*/
@Override
protected void map(LongWritable key,
Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context
) throws IOException, InterruptedException {
// 分割單詞
String[] words = value.toString().split(" ");
// 迭代分出的單詞
for (String word : words) {
// 把迭代出的單詞封裝為 <k2, v2> 的形式
Text keyOut = new Text(word);
LongWritable valueOut = new LongWritable(1L);
// 把 <k2, v2> 寫出去
context.write(keyOut, valueOut);
}
}
}
/**
* Reduce 階段
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
/**
* 針對 <k2, {v2...}> 進行累加求和,並把資料轉為 <k3, v3>
*/
@Override
protected void reduce(
Text key,
Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context
) throws IOException, InterruptedException {
// 將詞頻累加
long sum = 0L;
for (LongWritable value : values) {
sum += value.get();
}
// 把結果寫出去
context.write(key, new LongWritable(sum));
}
}
}
當只需要進行解析時,MapReduce程式中可以沒有Reduce的步驟。在main函式中無需再設定reducer類,但是要輸入 job.setNumReduceTasks(0);
。輸出結果檔案為 part-m-00000
依賴:
以下都不是必需的
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.2</version>
<!-- 只在編譯執行時需要,hadoop中有這個依賴,無需打包-->
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- maven 打包,指定啟動類。也可在啟動時手動指定 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<archive>
<manifest>
<mainClass>org.example.mapreduce.WordCountJob</mainClass>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
</manifest>
</archive>
</configuration>
</plugin>
<!-- 指定jdk版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<!-- 打包時將依賴也全部打包 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!--打包時,包含所有依賴的jar包-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<!--生成javadoc檔案-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<encoding>UTF-8</encoding>
<charset>UTF-8</charset>
<docencoding>UTF-8</docencoding>
<doclint>none</doclint>
</configuration>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!--生成source檔案-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
以下提示資訊表示執行成功
在指定的輸出目錄中可見:
_SUCCESS 用於標識成功。part-r-00000 是成功的結果檔案,由reduce程式產生。如果沒有reduce程式則生成 part-m-00000檔案;當有多個reduce任務時會生成多個part檔案,結尾為00001、00002等
MapReduce任務日誌查
在程式中使用 sout 輸出資訊無法在控制檯直接看見,因為控制檯只是一個客戶端,不能看見服務的日誌。
通過訪問hdfs的web控制介面 http://NameNodeIp:8088
點選 History 連結。
檢視前要做兩個配置然後啟動相應的 historyserver 程序
- 在hosts檔案中配置主機名與對應ip
- 開啟日誌聚合功能,把散落在NodeManager節點上的日誌同一收集管理,方便檢視
修改所有叢集配置,在 yarn-site.xml 中新增 yarn.log-aggregation-enable 和 yarn.log.server.url 兩個引數<property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property> <property> <name>yarn.log.server.url</name> <value>http://bigdata01:19888/jobhistory/logs/</value> </property>
- 啟動:
mapred --daemon start historyserver
所有機器執行此命令啟動 historyserver 程序,然後進入web介面檢視log
使用命令列檢視日誌:yarn logs -applicationId <Application ID>
停止正在執行的任務
yarn application -kill <ApplicationId>
在控制檯按 ctrl+c 無法停止程式,控制檯執行的是客戶端程式,MapReduce程式已提交到叢集中執行了。