MapReduce 順序組合, 迭代式,組合式,鏈式
1、順序組合式
順序組合式就是按照指定順序執行任務如:mapreduce1 --> mapreduce2 --> mapreduce3
即:mapreduce1的輸出是mapreduce2的輸入,mapreduce2的輸出式mapreduce3的輸入
程式碼片段如下:
Java程式碼- String inPath1 = "hdfs://hadoop0:9000/user/root/3D/";
- String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/";
-
String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/"
- String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/";
- // job1配置
- Job job1 = Job.getInstance(conf);
- job1.setJarByClass(Mode.class);
- job1.setMapperClass(Map1.class);
- job1.setReducerClass(Reduce1.class);
-
job1.setMapOutputKeyClass(Text.class
- job1.setMapOutputValueClass(IntWritable.class);
- job1.setOutputKeyClass(Text.class);
- job1.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job1, new Path(inPath1));
-
FileOutputFormat.setOutputPath(job1, new Path(outPath1));
- job1.waitForCompletion(true);
- // job2配置
- Job job2 = Job.getInstance(conf);
- job2.setJarByClass(Mode.class);
- job2.setMapperClass(Map2.class);
- job2.setReducerClass(Reduce2.class);
- job2.setMapOutputKeyClass(Text.class);
- job2.setMapOutputValueClass(IntWritable.class);
- job2.setOutputKeyClass(Text.class);
- job2.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job2, new Path(inPath1));
- FileOutputFormat.setOutputPath(job2, new Path(outPath2));
- job2.waitForCompletion(true);
- // job3配置
- Job job3 = Job.getInstance(conf);
- job3.setJarByClass(Mode.class);
- job3.setMapperClass(Map3.class);
- job3.setReducerClass(Reduce3.class);
- job3.setMapOutputKeyClass(Text.class);
- job3.setMapOutputValueClass(IntWritable.class);
- job3.setOutputKeyClass(Text.class);
- job3.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job3, new Path(outPath2));
- FileOutputFormat.setOutputPath(job3, new Path(outPath3));
- job3.waitForCompletion(true);
子任務作業配置程式碼執行後,將按照順序逐個執行每個子任務作業。由於後一個子任務需要使用前一個子任務的輸出資料,因此,每一個子任務
都需要等前一個子任務執行執行完畢後才允許執行,這是通過job.waitForCompletion(true)方法加以保證的。
2、迭代組合式
迭代也可以理解為for迴圈或while迴圈,當滿足某些條件時,迴圈結束
mapreduce的迭代演算法正在研究中,後續提供完整原始碼....
程式碼如下:
3、複雜的依賴組合式
處理複雜的要求的時候,有時候一個mapreduce程式完成不了,往往需要多個mapreduce程式 這個時候就牽扯到各個任務之間的依賴關係,
所謂依賴就是一個M/R job的處理結果是另外一個M/R的輸入,以此類推,
這裡的順序是 job1 和 job2 單獨執行, job3依賴job1和job2執行後的結果
程式碼如下:
- package com.hadoop.mapreduce;
- import java.io.IOException;
- import java.util.StringTokenizer;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- public class Mode {
- // 第一個Job
- public static class Map1 extends Mapper<Object, Text, Text, IntWritable>{
- Text word = new Text();
- @Override
- protected void map(Object key, Text value,Context context)
- throws IOException, InterruptedException {
- StringTokenizer st = new StringTokenizer(value.toString());
- while(st.hasMoreTokens()){
- word.set(st.nextToken());
- context.write(word, new IntWritable(1));
- }
- }
- }
- public static class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable>{
- IntWritable result = new IntWritable();
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,Context context)
- throws IOException, InterruptedException {
- int sum = 0;
- for(IntWritable val : values){
- sum += val.get();
- }
- result.set(sum);
- context.write(key, result);
- }
- }
- // 第二個Job
- public static class Map2 extends Mapper<Object, Text, Text, IntWritable>{
- Text word = new Text();
- @Override
- protected void map(Object key, Text value,Context context)
- throws IOException, InterruptedException {
- StringTokenizer st = new StringTokenizer(value.toString());
- while(st.hasMoreTokens()){
- word.set(st.nextToken());
- context.write(word, new IntWritable(1));
- }
- }
- }
- public static class Reduce2 extends Reducer<Text, IntWritable, Text, IntWritable>{
- IntWritable result = new IntWritable();
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,Context context)
- throws IOException, InterruptedException {
- int sum = 0;
- for(IntWritable val : values){
- sum += val.get();
- }
- result.set(sum);
- context.write(key, result);
- }
- }
- // 第三個Job
- public static class Map3 extends Mapper<Object, Text, Text, IntWritable>{
- Text word = new Text();
- @Override
- protected void map(Object key, Text value,Context context)
- throws IOException, InterruptedException {
- StringTokenizer st = new StringTokenizer(value.toString());
- while(st.hasMoreTokens()){
- word.set(st.nextToken());
- context.write(word, new IntWritable(1));
- }
- }
- }
- public static class Reduce3 extends Reducer<Text, IntWritable, Text, IntWritable>{
- IntWritable result = new IntWritable();
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values,Context context)
- throws IOException, InterruptedException {
- int sum = 0;
- for(IntWritable val : values){
- sum += val.get();
- }
- result.set(sum);
- context.write(key, result);
- }
- }
- public static void main(String[] args) throws IOException{
- String inPath1 = "hdfs://hadoop0:9000/user/root/3D/";
- String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/";
- String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/";
- String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/";
- String[] inOut = {inPath1, outPath1};
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, inOut).getRemainingArgs();
- if (otherArgs.length < 2) {
- System.err.println("Usage: wordcount <in> [<in>...] <out>");
- System.exit(2);
- }
- // 判斷輸出路徑是否存在,如存在先刪除
- FileSystem hdfs = FileSystem.get(conf);
- Path findFile = new Path(outPath1);
- boolean isExists = hdfs.exists(findFile);
- if(isExists){
- hdfs.delete(findFile, true);
- }
- if(hdfs.exists(new Path(outPath2))){
- hdfs.delete(new Path(outPath2), true);
-
相關推薦
斐波那契數列的遞迴,迭代(迴圈),通項公式三種實現
謂Fibonacci數列是指這樣一種數列,它的前兩項均為1,從第三項開始各項均為前兩項之和。用數學公式表示出來就是: 1 (n=1,2)fib(n)= fib(n-1)+fib(n-
MapReduce 順序組合, 迭代式,組合式,鏈式
1、順序組合式 順序組合式就是按照指定順序執行任務如:mapreduce1 --> mapreduce2 --> mapreduce3 即:mapreduce1的輸出是mapreduce2的輸入,mapreduce2的輸出式mapreduce3
Python基礎(8):python中的特性進階篇(迭代,列表生成式,生成器,迭代器)
python中還包括一些高階特性,以下簡單介紹。 迭代 定義:用for迴圈來遍歷物件的過程,叫做迭代。 作用物件:可迭代物件 如何判斷是否為可迭代物件:isinstance(xxx,Iterable),Iterable型別來源於collections模組。 應用場景: 1
python 可迭代序列(列表,元組,字串),實現鄰近去重,順序不變
碼字不易,轉載請標明出處… 鄰近去重程式碼實現如下: def special_func_order(seq): list_ = [] # 定義一個空列表,用來儲存判斷後的資料 for i in range(len(seq) - 1): # 假如 le
模版方法模式,迭代器模式,組合模式,狀態模式,代理模式
1.模版方法模式:在一個方法中定義一個演算法的骨架,而將一些步驟延遲到子類中,模版方法使得子類可以在不改變演算法結構的情況下,重新定義演算法中的某些步驟,還可以提供hook()讓子類決定是否執行某些步驟。比如sort中的Comparable介面。 2.迭代器模式就是集合的迭代器 3.組合模式
推導式,迭代器,模組,包
推導表示式 推導表示式相對於for迴圈來處理資料,要更加的方便 列表推導表示式使用更加的廣泛 列表推導 迴圈新增: li = [ ] for i in range(1,11): #左閉右開 li.append(i) #[1,2,3,4,5,6,7,8,
python的切片,迭代,列表生成式
python的程式碼需要有優雅,明確,簡單的特性。代表著需要程式碼越少越好,越簡單越好。為此,python提供了許多高階特性,如切片,迭代,列表生成式等等,可以有效的用簡單程式碼實現複雜功能。 1,切片 適用型別:list, tuple, str 切片功能簡單說就是實現擷取,不管是列表list,不可變列
Python五種迭代方式 for迴圈,列表推導式,內建函式map(),生成器推導式,生成器函式 速度對比
對比了Python3的五種迭代方式進行函式簡單計算的花費時間 五種迭代分別是,for迴圈,列表推導式,內建函式map(),生成器推導式,生成器函式 簡單計算以add()加10操作和abs()絕對值舉例
python列表生成式,生成器,迭代器
列表生成式:也叫推導,使程式碼更簡潔1.列表推導式: a= [x for x in range(100) if x % 3 == 0]2.字典推導式 #快速更換key和value 字典推導和列表推導的使用方法是類似的,中括號該改成大括號。 y =
微信約戰炸金花棋牌平臺出租Java普通代碼塊,構造代碼塊,靜態代碼塊區別,執行順序的代碼實例
屬性 java 對象 ... 沒有 每次 class string eat 除了說微信約戰炸金花棋牌平臺出租( h5.super-mans.com Q:2012035031)普通代碼塊,靜態代碼塊,構造代碼塊的執行順序外,還有靜態方法,靜態變量等,都放在一起的話,這個
用程式碼來解釋可迭代性,迭代器,生成器的區別
一. 創造器(creator) 這是我自己造的一個名詞,因為在python術語中,對只實現了__next__()方法的物件,好像沒有任何名分,為了說明,我將只實現了__next__()方法的物件稱為創造器(creator)。 class O_Next: def __init__(se
day011 函式名的運用,閉包,迭代器
主要內容: 1.函式名的使用以及第一類物件 2.閉包 3.迭代器一、函式名的運用 函式名就是變數名,命名規則與變數名一樣。 函式名儲存的是函式的記憶體地址。 1、檢視函式名的記憶體地址 """python def inf(): print("疏影"
C++ 使用Vector容器查詢,迭代,插入,去重 用法總結
返回最後一個元素: return v.back(); 迭代器: for (std::vector<int>::iterator it = v.begin(); it != v.end(); it++) {
遞迴,迭代和遍歷
遞迴 如果一個函式在內部呼叫自身本身,這個函式就是遞迴函式。 條件:必須要有收斂條件和遞迴公式。 特性:1.必須有一個明確的結束條件。 2.每次進入更深一層遞迴時,問題規模相比賞析遞迴都應有所減少。 3.遞迴效率不高,遞迴層次過多會導致棧溢位(遞迴最大999層)。
Python生成器,迭代器,可迭代物件
在瞭解Python的資料結構時,容器(container)、可迭代物件(iterable)、迭代器(iterator)、生成器(generator)、列表/集合/字典推導式(list,set,dic
python3.5進階(三)-------------實現多工之協程(生成器,迭代器)
1.迭代器:迭代是訪問集合元素的一種方式,迭代器是可以記住遍歷的位置的物件,迭代器物件從集合的第一個元素開始訪問,直到所有訪問結束,迭代器只能前進不能後退。判斷一個數據型別是否可以迭代,看是否能for迴圈。如(字串,列表,元祖...)序列可以迭代,數字不能迭代,或通過isintance([11,12
vector容器,迭代器,空間介面卡三個類方法的實現
C++的STL庫有一個容器叫vector,這個容器底層的資料結構是一個記憶體可以自動增長的陣列,每次當陣列儲存滿了以後,記憶體可以自動增加兩倍,請完成vector容器、迭代器和空間配置器三個類方法的實現。 #include<iostream> using namespace
空間配置器,迭代器,容器,介面卡
一、空間配置器 下面先總體介紹一下空間配置器。空間配置器的作用是在底層為上層的各種容器提供儲存空間,需要多少分配多少,一般分配的比你需要的更多。打個不恰當的比喻,空間配置器和容器之間的關係,相當於將軍和糧草的關係。當然了,容器相當於將軍,它在陣前殺敵,衝鋒陷陣,處理各種事情;而空間配置器
生成器,迭代器和裝飾器
1.生成器 解析器在實時生成資料,資料不會駐留在記憶體中。因此,其執行效率很高! yield 是一個類似 return 的關鍵字,只是這個函式返回的是個生成器 當你呼叫這個函式的時候,函式內部的程式碼並不立即執行 ,這個函式只是返回一個生成器物件 當你使用for進行迭代的時候,函式中的程
動態匯入模組,載入預訓練模型,nn.Sequential函式裡面必須是a Module subclass,不能是一個列表或者是其他的迭代器、生成器,雖然這裡麵包含了Module的子類
class RES(nn.Module): def __init__(self): super(RES, self).__init__() self.conv1=nn.Conv2d(3,64,kernel_size=7,stride=2,pa