1. 程式人生 > 實用技巧 >HBase 與 MapReduce 整合

HBase 與 MapReduce 整合

HBase 與 MapReduce 整合

phoenix.apache.org

Mapreduce執行3種方式

本地方式執行:

pc環境

1.1、將 Hadoop安裝本地解壓
1.2、配置 Hadoop的環境變數
新增%HADOOP_HOME%
修改%PATH%新增%HADOOP_HOME%/bin;%HADOOP_HOME%/sbin
3、在解壓的 Hadoop的bin目錄下新增 winutils.exe工具
Java工程
2.1、jdk一定要使用自己的jdk、不要使用 eclipse自帶
2.2、根目錄(src目錄下),不要新增任何 Mapreduce的配置檔案hdfs-site.xml yarn-site.xml core-site.xml mapred-site.xml
2.3、在程式碼當中,通過conf.set方式來進行指定。conf set("fs.defaults","hdfs://nodel:8020");
2.4、修改 Hadoop原始碼
3、右鍵run執行

叢集執行兩種方式

Java工程

1、根目錄(Src目錄下),新增 Hadoop的配置檔案hdfs-site.xm1 yarn-Site.xm1 core-site.xm1 mapped-site.xm1
2、在程式碼當中,指定jar包的位置, config.set(" mapped.jar","D:\WR\wc.jar");
3、修改 Hadoop原始碼
4、將工程打jar包
5、右鍵run執行

Java工程

根目錄(src目錄下),新增 Hadoop的配置檔案hdfs-site.xm1 yarn-Site.xm1 core-site.xm1 mapped-site.xm1
2、將工程打jar包
手動將jar包上傳到叢集當中
4、通過 hadoop命令來執行。 hadoop jar jar位置mr程式碼入口(例如: hadoop jar/usr/wc.Jar com.sxt.mr.Wcjob)
在程式碼當中指定 Hbase所使用的 Zookeeper叢集

(注意:如果 hbase搭建的是仍分散式,那麼對應的 Zookeeper就是那臺偽分散式的伺服器
conf.set("hbase.zookeeper.quorum","nodel, node 2, node3")
System.setproperty("HADOOP_USER_NAME,"root");

HBase與MR整合文件

官方文件

49. MapReduce掃描快取

TableMapReduceUtil現在恢復在傳入的Scan物件上設定掃描程式快取(將結果返回給客戶端之前快取的行數)的選項。由於HBase 0.95(HBASE-11558)中的錯誤,此功能丟失了,對於HBase 0.98.5和0.96.3是固定的。選擇掃描器快取的優先順序如下:

  1. 在掃描物件上設定的快取設定。
  2. 通過配置選項指定的快取設定hbase.client.scanner.caching,可以在hbase-site.xml中手動設定,也可以通過helper方法設定TableMapReduceUtil.setScannerCaching()
  3. 預設值HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING,設定為100

優化快取設定是客戶端等待結果的時間與客戶端需要接收的結果集數量之間的平衡。如果快取設定太大,則客戶端可能會等待很長時間,甚至請求可能會超時。如果設定太小,則掃描需要分多次返回結果。如果您將掃描視為鏟子,則較大的快取設定類似於較大的鏟子,較小的快取設定等效於進行更多剷鬥以填充剷鬥。

上面提到的優先順序列表允許您設定一個合理的預設值,併為特定操作覆蓋它。

有關更多詳細資訊,請參見Scan的API文件。

50.捆綁的HBase MapReduce作業

HBase JAR還可以用作某些捆綁的MapReduce作業的驅動程式。要了解捆綁的MapReduce作業,請執行以下命令。

$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar
An example program must be given as the first argument.
Valid program names are:
  copytable: Export a table from local cluster to peer cluster
  completebulkload: Complete a bulk data load.
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table

每個有效的程式名稱都是捆綁的MapReduce作業。要執行作業之一,請在以下示例之後對命令建模。

$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar rowcounter myTable

51. HBase作為MapReduce作業資料來源和資料接收器

HBase可用作MapReduce作業的資料來源TableInputFormat和資料接收器TableOutputFormatMultiTableOutputFormat。編寫MapReduce作業以讀取或寫入HBase時,建議將TableMapper 和/或TableReducer子類。有關基本用法,請參見無作用傳遞類IdentityTableMapperIdentityTableReducer。有關更多示例,請參閱RowCounter或檢視org.apache.hadoop.hbase.mapreduce.TestTableMapReduce單元測試。

如果執行使用HBase作為源或接收器的MapReduce作業,則需要在配置中指定源和接收器表及列的名稱。

當您從HBase讀取資料時,會從HBase TableInputFormat請求區域列表,並建立一個地圖,該地圖可以是map-per-regionmapreduce.job.maps,以較小者為準。如果您的工作只有兩張地圖,請加薪mapreduce.job.maps數量大於區域數量。如果您在每個節點上執行TaskTracer / NodeManager和RegionServer,則地圖將在相鄰的TaskTracker / NodeManager上執行。寫入HBase時,應避免執行Reduce步驟,然後從地圖中寫回HBase。當您的工作不需要MapReduce對地圖發出的資料執行的排序和排序規則時,此方法有效。插入時,HBase會進行“排序”,因此除非有必要,否則就不會進行點雙重排序(並在MapReduce叢集周圍進行資料改組)。如果不需要精簡,則地圖可能會在作業結束時發出為報告而處理的記錄計數,或者將精簡數量設定為零並使用TableOutputFormat。如果根據您的情況執行“減少”步驟,

一個新的HBase分割槽程式HRegionPartitioner可以執行與現有區域數一樣多的reducer。HRegionPartitioner適用於表較大且上傳完成後不會大大改變現有區域數的情況。否則,請使用預設分割槽程式。

52.在批量匯入期間直接寫入HFile

如果要匯入到新表中,則可以繞過HBase API並將內容直接寫入檔案系統,並格式化為HBase資料檔案(HFiles)。您的匯入將執行得更快,也許要快一個數量級。有關此機制如何工作的更多資訊,請參見批量載入

53. RowCounter示例

包含的RowCounter MapReduce作業使用TableInputFormat並計算指定表中的所有行。要執行它,請使用以下命令:

$ ./bin/hadoop jar hbase-X.X.X.jar

這將呼叫HBase MapReduce驅動程式類。選擇rowcounter從提供的職位選擇。這會將行計數器使用建議列印到標準輸出。指定表名,要計數的列和輸出目錄。如果您遇到類路徑錯誤,請參見HBase,MapReduce和CLASSPATH

54.Map-Task Splitting

54.1。預設的HBase MapReduce拆分器

當使用TableInputFormat來在MapReduce作業中獲取HBase表時,其拆分器將為該表的每個區域建立一個對映任務。因此,如果表中有100個區域,則該作業將有100個對映任務-無論在“掃描”中選擇了多少列族。

54.2。定製分離器

對於那些有興趣在實現自定義的分離器,看到法getSplitsTableInputFormatBase。這就是對映任務分配的邏輯所在。

HBase MapReduce Examples

HBase MapReduce閱讀示例

以下是以只讀方式將HBase用作MapReduce源的示例。具體來說,有一個Mapper例項,但沒有Reducer,並且沒有從Mapper發出任何東西。該工作的定義如下...

Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);     // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 掃描中的預設值為1,這對MapReduce作業不利
scan.setCacheBlocks(false);  // MR工作不要設定為true
// 設定其他scan屬性
...

TableMapReduceUtil.initTableMapperJob(
  tableName,        // 輸入HBase表名稱
  scan,             // scan 例項以控制CF和屬性
  MyMapper.class,   // mapper
  null,             // mapper output key
  null,             // mapper output value
  job);
job.setOutputFormatClass(NullOutputFormat.class);   //因為我們沒有從mapper發出任何東西

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

...而mapper例項將擴充套件TableMapper ...

public static class MyMapper extends TableMapper<Text, Text> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    //處理來自Result例項的行的資料。
   }
}

HBase MapReduce讀/寫示例

以下是通過MapReduce將HBase用作源和接收器的示例。此示例將簡單地將資料從一個表複製到另一個表。

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
    throw 

需要說明TableMapReduceUtil正在做什麼,尤其是對於減速器。TableOutputFormat被用作outputFormat類,並且在配置(例如TableOutputFormat.OUTPUT_TABLE)上設定了幾個引數,並將reducer輸出鍵設定為ImmutableBytesWritable,reducer值設定為Writable。這些可以由程式設計師在工作和配置上設定,但是TableMapReduceUtil試圖使事情變得更容易。

以下是示例對映器,它將建立一個Put與輸入匹配的輸出Result。注意:這就是CopyTable實用程式的作用。

public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put>  {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // 這個例子只是從源表中複製資料..
      context.write(row, resultToPut(row,value));
    }

    private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
      Put put = new Put(key.get());
      for (Cell cell : result.listCells()) {
        put.add(cell);
      }
      return put;
    }
}

實際上沒有reducer步驟,因此請TableOutputFormat注意將其傳送Put到目標表。

這只是一個示例,開發人員可以選擇不使用TableOutputFormat並自己連線到目標表。

具有多表輸出的HBase MapReduce讀/寫示例

TODO:的示例MultiTableOutputFormat

HBase MapReduce彙總到HBase示例

以下示例將HBase用作MapReduce源和接收器,並進行彙總。本示例將對一個表中某個值的不同例項的數量進行計數,並將這些彙總計數寫入另一個表中。

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,        // output table
  MyTableReducer.class,    // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

在此示例對映器中,選擇具有字串值的列作為要彙總的值。該值用作從對映器發出的鍵,並且一個IntWritable代表例項計數器。

public static class MyMapper extends TableMapper<Text, IntWritable>  {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] ATTR1 = "attr1".getBytes();

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    String val = new String(value.getValue(CF, ATTR1));
    text.set(val);     // we can only emit Writables...
    context.write(text, ONE);
  }
}

In the reducer, the "ones" are counted (just like any other MR example that does this), and then emits a .

public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {
  public static final byte[] CF = "cf".getBytes();
  public static final byte[] COUNT = "count".getBytes();

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(CF, COUNT, Bytes.toBytes(i));

    context.write(null, put);
  }
}

HBase MapReduce摘要檔案示例

這與上面的摘要示例非常相似,不同的是,它使用HBase作為MapReduce源,但使用HDFS作為接收器。區別在於作業設定和減速機。對映器保持不變。

onfiguration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
job.setReducerClass(MyReducer.class);    // reducer class
job.setNumReduceTasks(1);    // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}

如上所述,在此示例中,先前的Mapper可以保持不變。至於Reducer,它是一個“通用” Reducer,而不是擴充套件TableMapper和發出Puts。

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    context.write(key, new IntWritable(i));
  }
}

單詞統計案例(Maven)

pom檔案

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <!--定義hadoop版本-->
    <hadoop.version>2.7.5</hadoop.version>
  </properties>

  <dependencies>

    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client/0.98.23-hadoop2 -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.98.23-hadoop2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.1.0</version>
    </dependency>
    <!--hadoop客服端依賴-->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!--hdfs檔案系統依賴-->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!--MapReduce相關的依賴-->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

WCRunner

package icu.shaoyayu.hadoop.hbase.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @author shaoyayu
 * @date 2020/7/15 11:33
 * @E_Mail
 * @Version 1.0.0
 * @readme :
 * Hbase與MapReduce結合使用
 */
public class WCRunner {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //配置環境
        Configuration conf = new CompoundConfiguration();
        conf.set("","hadoopNode02,hadoopNode03,hadoopNode04");
        //需要設定儲存的NameNode節點
        conf.set("fs.defaultFS", "hdfs://hadoopNode01:8020");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WCRunner.class);

        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //最後引數一定寫false
        TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job, null, null, null, null, false);
        FileInputFormat.addInputPath(job, new Path("/usr/wc"));
        // reduce端輸出的key和value的型別
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);

        // job.setOutputFormatClass(cls);
        // job.setInputFormatClass(cls);

        job.waitForCompletion(true);

    }

}

WCMapper

package icu.shaoyayu.hadoop.hbase.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author shaoyayu
 * @date 2020/7/15 11:43
 * @E_Mail
 * @Version 1.0.0
 * @readme :
 */
public class WCMapper extends Mapper<LongWritable,Text, Text, IntWritable>{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] splits = value.toString().split(" ");
        //第二種切割方法
//    new StringTokenizer(value.toString()," ");
        for (String string : splits) {
            context.write(new Text(string), new IntWritable(1));
        }
    }
}

WCReducer

package icu.shaoyayu.hadoop.hbase.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * @author shaoyayu
 * @date 2020/7/15 11:45
 * @E_Mail
 * @Version 1.0.0
 * @readme :
 */
public class WCReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> iter,
                          Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable intWritable : iter) {
            sum+=intWritable.get();
        }
        Put put = new Put(key.toString().getBytes());
        put.add("cf".getBytes(), "cf".getBytes(), String.valueOf(sum).getBytes());
        context.write(null, put);
    }
}

原始碼分析:TableMapReduceUtil

initTableReducerJob()

TableMapReduceUtil.initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job,
Class partitioner, String quorumAddress, String serverClass,
String serverImpl, boolean addDependencyJars)

/ ** 
    *在提交TableReduce作業之前使用此功能。 
    *將適當地設定JobConf。 
    * 
    * @param table輸出表。 
    * @param reducer要使用的reducer類。 
    * @param job當前要調整的作業。確保傳遞的作業*具有所有必要的HBase配置。 
    * @param partitioner要使用的Partitioner。傳遞<code> null </ code>以使用*預設分割槽程式。 
    * @param quorumAddress要寫入的遠端叢集; 
    *輸出到<code> hbase-site.xml </ code>中指定的叢集的預設值為null。 
    *將此字串設定為備用遠端叢集的zookeeper整合
    *當您需要減少寫入非預設叢集的叢集時;例如在叢集之間複製表時,源將由<code> hbase-site.xml </code>指定,
    *並且該引數將具有遠端叢集的集合地址。要傳遞的格式特別。 
    *傳遞<code> hbase.zookeeper.quorum
    * hbase.zookeeper.client.port
    * zookeeper.znode.parent 
    * </ code>,例如<code> server,server2,server3:2181:/ hbase </ code>。 
    * @param serverClass重新定義了hbase.regionserver.class 
    * @param serverImpl重新定義了hbase.regionserver.impl 
    * @param addDependencyJars通過分散式快取(tmpjars)為任何已配置的作業類上載HBase jar和jars。 
    * @throws IOException當確定區域計數失敗時。 
    * /
public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job,
    Class partitioner, String quorumAddress, String serverClass,
    String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    //重點在這個地方設定輸出到HBase裡面去
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKConfig.validateClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }

TableOutputFormat

/ **
    *建立一個新的記錄作者。
    *
    *請注意,基線javadoc給人的印象是隻有一個
    *每個工作{@link RecordWriter},但在HBase中,如果我們給您一個新的
    * RecordWriter每次呼叫此方法。 完成後,您必須關閉返回的RecordWriter。
    *否則將丟失寫入。
    *
    * @param context當前任務上下文。
    * @return新建立的writer例項。
    * @throws IOException建立寫入器時失敗。
    * @throws InterruptedException作業取消時。
    * /
  @Override
  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
  throws IOException, InterruptedException {
    return new TableRecordWriter();
  }

TableRecordWriter

/**
   * 將reducer輸出寫入HBase表。
   */
  protected class TableRecordWriter
  extends RecordWriter<KEY, Mutation> {

    private Connection connection;
    private BufferedMutator mutator;

    /**
     * @throws IOException
     *
     */
    public TableRecordWriter() throws IOException {
      String tableName = conf.get(OUTPUT_TABLE);
      this.connection = ConnectionFactory.createConnection(conf);
      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
      LOG.info("Created table instance for "  + tableName);
    }
    /**
     * Closes the writer, in this case flush table commits.
     *
     * @param context  The context.
     * @throws IOException When closing the writer fails.
     * @see RecordWriter#close(TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      try {
        if (mutator != null) {
          mutator.close();
        }
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    }

/ **
      *將一個鍵/值對寫入表中。
      *
      * @param鍵鍵。
      * @param value值。
      * @throws IOException寫入失敗時。
      * @請參見RecordWriter#write(Object,Object)
      * /
    @Override
    public void write(KEY key, Mutation value)
    throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Pass a Delete or a Put");
      }
      mutator.mutate(value);
    }
  }

initTableMapperJob()

/ **
  *在提交Multi TableMap作業之前使用它。 它將適當設定
  *完成工作。
  *
  * @param scans要讀取的{@link Scan}物件的列表。
  * @param mapper要使用的mapper類。
  * @param outputKeyClass輸出鍵的類。
  * @param outputValueClass輸出值的類。
  * @param job當前要調整的作業。 確保通過的工作正在進行
  *所有必需的HBase配置。
  * @param addDependencyJars上傳HBase jar和任何
  *通過分散式快取(tmpjars)配置作業類。
  * @param initCredentials是否初始化作業的hbase身份驗證憑據
  * @throws IOException設定細節時失敗。
  * /
public static void initTableMapperJob(List<Scan> scans,
    Class<? extends TableMapper> mapper,
    Class<?> outputKeyClass,
    Class<?> outputValueClass, Job job,
    boolean addDependencyJars,
    boolean initCredentials) throws IOException {
    //設定Hbase為輸入物件
  job.setInputFormatClass(MultiTableInputFormat.class);
  if (outputValueClass != null) {
    job.setMapOutputValueClass(outputValueClass);
  }
  if (outputKeyClass != null) {
    job.setMapOutputKeyClass(outputKeyClass);
  }
  job.setMapperClass(mapper);
  Configuration conf = job.getConfiguration();
  HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
  List<String> scanStrings = new ArrayList<>();

  for (Scan scan : scans) {
    scanStrings.add(convertScanToString(scan));
  }
  job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
    scanStrings.toArray(new String[scanStrings.size()]));

  if (addDependencyJars) {
    addDependencyJars(job);
  }

  if (initCredentials) {
    initCredentials(job);
  }
}

MultiTableInputFormat

在父類MultiTableInputFormatBase

/ **
  *計算將用作地圖任務輸入的分割。 的
  *分割數與表中的區域數匹配。
  *
  * @param context當前作業上下文。
  * @return輸入拆分列表。
  * @throws IOException建立拆分列表時失敗。
  * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
  * /
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
  if (scans.isEmpty()) {
    throw new IOException("No scans were provided.");
  }
  Map<TableName, List<Scan>> tableMaps = new HashMap<>();
  for (Scan scan : scans) {
    byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
    if (tableNameBytes == null)
      throw new IOException("A scan object did not have a table name");
    TableName tableName = TableName.valueOf(tableNameBytes);
    List<Scan> scanList = tableMaps.get(tableName);
    if (scanList == null) {
      scanList = new ArrayList<>();
      tableMaps.put(tableName, scanList);
    }
    scanList.add(scan);
  }
  List<InputSplit> splits = new ArrayList<>();
  Iterator iter = tableMaps.entrySet().iterator();
  // Make a single Connection to the Cluster and use it across all tables.
  try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
    while (iter.hasNext()) {
      Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
      TableName tableName = entry.getKey();
      List<Scan> scanList = entry.getValue();
      try (Table table = conn.getTable(tableName);
           RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
            regionLocator, conn.getAdmin());
        Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
        for (Scan scan : scanList) {
          if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
            throw new IOException("Expecting at least one region for table : "
                + tableName.getNameAsString());
          }
          int count = 0;
          byte[] startRow = scan.getStartRow();
          byte[] stopRow = scan.getStopRow();
          for (int i = 0; i < keys.getFirst().length; i++) {
            if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
              continue;
            }
            if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
                Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
                (stopRow.length == 0 || Bytes.compareTo(stopRow,
                    keys.getFirst()[i]) > 0)) {
              byte[] splitStart = startRow.length == 0 ||
                  Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
                  keys.getFirst()[i] : startRow;
              byte[] splitStop = (stopRow.length == 0 ||
                  Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
                  keys.getSecond()[i].length > 0 ?
                  keys.getSecond()[i] : stopRow;
              HRegionLocation hregionLocation = regionLocator.getRegionLocation(
                  keys.getFirst()[i], false);
              String regionHostname = hregionLocation.getHostname();
              HRegionInfo regionInfo = hregionLocation.getRegionInfo();
              String encodedRegionName = regionInfo.getEncodedName();
              long regionSize = sizeCalculator.getRegionSize(
                  regionInfo.getRegionName());
              TableSplit split = new TableSplit(table.getName(),
                  scan, splitStart, splitStop, regionHostname,
                  encodedRegionName, regionSize);
              splits.add(split);
              if (LOG.isDebugEnabled()) {
                LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
              }
            }
          }
        }
      }
    }
  }
  return splits;
}

預設的切片是RowKey的大小