
Using HBase with Kerberos authentication in a Spark environment

When a MapReduce job in Hadoop writes to a Kerberos-secured HDFS, YARN performs the authentication internally, so no extra steps are needed and reads and writes work directly.

Using a Kerberos-secured HBase from Spark is a problem that is both troublesome and simple. The troublesome part: Chinese-language articles on the topic are scarce and only cover scattered bits of knowledge, and the official documentation is incomplete as well, so you are likely to hit pitfalls of your own. The simple part: the amount of code is small and not hard to understand. In this article, Kerberos authentication uses the keytab method.

1. Writing to HBase with Kerberos authentication

After the Maven build, the configuration files are packed into the jar that the driver distributes to each executor's cache. Because the .properties files are loaded through the class loader (reflection), the program can read them from inside the jar. The keytab, however, is a standalone file rather than a properties resource, so it can only be read from the file system; a conventional Kerberos login that points at a path inside the jar therefore fails with a file-not-found error.

There are two ways to solve this: 1. On the SparkContext, call sc.addFile("...") to distribute the file to every executor, and call SparkFiles.get() at the point where the HBase I/O code runs to obtain the executor-local copy. 2. Pass --files xx.keytab to spark-submit so the file is shipped to the executors, then use the reflection trick that locates the jar's directory to assemble the keytab path and read the keytab file. A sketch of option 1 follows.
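The sketch below illustrates option 1 only; the keytab name, principal, and sample RDD are placeholders rather than values from the project in section 3. With option 2 the keytab would instead be shipped via spark-submit --files /path/to/xx.keytab and its executor-local path derived from the classpath root, which is what KerberorsJavaUtil below does via getResource("/").

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object KeytabShippingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hbase-kerberos-sketch"))
    //option 1: ship the keytab from the driver to every executor
    sc.addFile("/local/path/on/driver/user.keytab")

    sc.parallelize(1 to 10, 2).foreachPartition { _ =>
      //resolve the executor-local copy of the keytab
      val keytabPath = SparkFiles.get("user.keytab")
      val hconf = HBaseConfiguration.create()
      hconf.set("hadoop.security.authentication", "kerberos")
      UserGroupInformation.setConfiguration(hconf)
      //log in on this executor; an HBase connection can be created afterwards
      UserGroupInformation.loginUserFromKeytab("someUser@EXAMPLE.COM", keytabPath)
    }
    sc.stop()
  }
}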

Finally, attach the authenticated user to the connection; that connection can then be used to write to HBase.
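Condensed to its core, and assuming the Kerberos login has already happened in the current JVM, the write-side connection setup looks like the sketch below (the full version is HBaseIO.saveToGraphTable in section 3):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{HConnection, HConnectionManager}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.security.UserGroupInformation

//wrap the already-logged-in UGI user and hand it to the connection
def writableConnection(hconf: Configuration): HConnection = {
  val loginedUser = User.create(UserGroupInformation.getLoginUser())
  HConnectionManager.createConnection(hconf, loginedUser)
}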

2. Reading from HBase with Kerberos authentication

Obtain an authenticated user just as in the write path, and create the connection that reads HBase inside the TableInputFormat used by

sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

That is, subclass TableInputFormat and override the code that creates its connection, then plug the resulting MyTableInputFormat into this operator.
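Sketched out, a read then only swaps the input-format class; the ZooKeeper addresses and table name below are placeholders, and the full version is HBaseIO.getGraphTableRDD in section 3.

import org.apache.hadoop.hbase.HBaseConfiguration
//given an existing SparkContext sc
val hconf = HBaseConfiguration.create()
hconf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
hconf.set("hbase.zookeeper.property.clientPort", "2181")
hconf.set(MyTableInputFormat.INPUT_TABLE, "myTable")
val rdd = sc.newAPIHadoopRDD(
  hconf,
  classOf[MyTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])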

3. The code involved in the authentication process

//This class performs the HBase I/O; it calls the authentication helpers in KerberorsJavaUtil.

import java.util.Properties

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.HConnectionManager
import org.apache.hadoop.hbase.client.HTableInterface
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkFiles
import org.apache.spark.rdd.RDD

import cn.ctyun.UIDSS.hgraph.HGraphUtil
import cn.ctyun.UIDSS.utils.Hash
import cn.ctyun.UIDSS.utils.KerberorsJavaUtil
import cn.ctyun.UIDSS.utils.Logging

object HBaseIO extends Logging {

  def getGraphTableRDD(sc: SparkContext, props: Properties): RDD[(ImmutableBytesWritable, Result)] = {
    val hconf = HBaseConfiguration.create()

    //set zookeeper quorum
    hconf.set("hbase.zookeeper.quorum", props.getProperty("hbaseZkIp"));
    //set zookeeper port
    hconf.set("hbase.zookeeper.property.clientPort", props.getProperty("hbaseZkPort"));    
    hconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    hconf.set("hbase.zookeeper.property.maxClientCnxns", props.getProperty("hbase_zookeeper_property_maxClientCnxns"));
    hconf.set("hbase.client.retries.number", props.getProperty("hbase_client_retries_number"));    
    hconf.addResource("core-site.xml")
    hconf.addResource("hbase-site.xml")
    hconf.addResource("hdfs-site.xml")

    //set which table to scan
    //===use MyTableInputFormat, a TableInputFormat override that adds Kerberos authentication===
    hconf.set(MyTableInputFormat.INPUT_TABLE, props.getProperty("hbaseTableName"))

    //println(getNowDate() + " ****** Start reading from HBase   ******")
    val rdd = sc.newAPIHadoopRDD(hconf, classOf[MyTableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]).cache()
    //println(getNowDate() + " ****** Finished reading from HBase   ******")

    //iterate over the results (debug output)
    //    rdd.foreach {
    //      case (_, result) =>
    //        val key = Bytes.toString(result.getRow.drop(2))
    //        //println("Row key:" + key)
    //        for (c <- result.rawCells()) {
    //          val dst = Bytes.toString(c.getQualifier)
    //          var value = 0
    //          try {
    //            value = Bytes.toInt(c.getValue)
    //          } catch {
    //            case _: Throwable =>
    //          }
    //          //println("        column is: " + dst + " ;  value is: " + value)
    //        }
    //    }
    rdd
  }

  def saveToGraphTable(sc: SparkContext, props: Properties, rddToSave: RDD[((String, String), String)]): Int = {
    info("------Writing data to Graph table start--------")
    var rddToSavePartition = rddToSave
    
    val partNumHBaseO = props.getProperty("rddPartNumHBaseO").toInt
//    if (partNumHBaseO > 0) {
//      rddToSavePartition = rddToSave.repartition(partNumHBaseO)
//      val cnt= rddToSavePartition.count().toString() 
//      info(" ******  Writing " + cnt + " rows to HBase ******")
//      println(" ******  Writing " + cnt + " rows to HBase ******")
//    } 
    
    //write out in parallel, one HBase connection per partition
    info("------foreachPartition write data start--------")
    rddToSavePartition.foreachPartition {
      //all rows within a single partition
      case (rows) =>
        //println("        column is: " + this.getClass.getClassLoader().getResource(""))
        val hconf = HBaseConfiguration.create()
        info("---------each partition create HBaseConfiguration-----------")
        //set zookeeper quorum
        hconf.set("hbase.zookeeper.quorum", props.getProperty("hbaseZkIp"))
        //set zookeeper port
        hconf.set("hbase.zookeeper.property.clientPort", props.getProperty("hbaseZkPort"))           
        hconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        hconf.set("hbase.zookeeper.property.maxClientCnxns", props.getProperty("hbase_zookeeper_property_maxClientCnxns"))
        hconf.set("hbase.client.retries.number", props.getProperty("hbase_client_retries_number"))
        hconf.set("hbase.client.pause", "1000")
        hconf.set("zookeeper.recovery.retry", "3")
        
        hconf.addResource("core-site.xml")
        hconf.addResource("hbase-site.xml")
        hconf.addResource("hdfs-site.xml")   
        //=========get HBase authenticated user==========
        val loginedUser = KerberorsJavaUtil.getAuthenticatedUser(hconf,props,props.getProperty("keytabFile"))
        val connection = HConnectionManager.createConnection(hconf,loginedUser)
        info("------HBase connection is created--------")
        val htable: HTableInterface = connection.getTable(TableName.valueOf(props.getProperty("hbaseTableName")))

        //batch-write settings
        val flushInBatch = props.getProperty("flushInBatch")
        val sWaitForHBase = props.getProperty("waitForHBase")
        val batchSize = props.getProperty("batchSize")
        
        var waitForHBase = 0
        if (flushInBatch != null && "1".compareToIgnoreCase(flushInBatch) == 0) {
          htable.setAutoFlushTo(false);
          htable.setWriteBufferSize(1024 * 1024 * batchSize.toInt);
          if (sWaitForHBase != null && sWaitForHBase.toInt > 0) {
            waitForHBase = sWaitForHBase.toInt
          }
        }

        //println(getNowDate() + " ****** Start writing to HBase   ******")

        var rowCount = 0 
        
//        for (row <- rows.toArray) (
        info("------HBase write data start--------")
        for (row <- rows) (
          {
            //row: ((row key, column qualifier), value)
            var src: String =  Hash.getHashString(row._1._1) + row._1._1 
            var dst: String = row._1._2
            var prop: Int = row._2.toInt
            //println("Row is: " + src + " ;column is: " + dst + " ; value is: " + prop)

            val put = new Put(Bytes.toBytes(src))
            put.add(HGraphUtil.COLUMN_FAMILY, Bytes.toBytes(dst), Bytes.toBytes(prop))
            put.setWriteToWAL(false)
            htable.put(put)
            
            rowCount = rowCount +1

            //throttle the write rate
            if ((rowCount % 1000)==0 && waitForHBase >0) { Thread.sleep(waitForHBase)}
          })
        //println(getNowDate() + " ****** Finished writing to HBase   ******")  
        try{
          info("=======prepare to flushCommits======")
          htable.flushCommits()
          info("=======flushCommits finished======")
        }catch {
          case e: Exception =>
          info("=======flushCommits failed=======")
        }
        htable.close();          
        //println(getNowDate() + " ****** Flushed  to HBase   ******")
        info("------HBase write data finished--------")
    }
    1
  }
}
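For completeness, a hedged sketch of how the driver side might invoke these helpers; the properties file name matches the one MyTableInputFormat loads below, everything else is illustrative:

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}

object UidGraphDriverSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("uid-graph"))
    //the same properties file that MyTableInputFormat loads on the executors
    val props = new Properties()
    props.load(getClass.getClassLoader.getResourceAsStream("user-id-server.properties"))

    //read: each split authenticates inside MyTableInputFormat
    val graphRdd = HBaseIO.getGraphTableRDD(sc, props)
    println("rows read: " + graphRdd.count())

    //write: each partition authenticates before opening its own connection
    val toSave = sc.parallelize(Seq((("someRowKey", "someColumn"), "1")))
    HBaseIO.saveToGraphTable(sc, props, toSave)

    sc.stop()
  }
}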
//This class is the Kerberos utility; it performs the Kerberos authentication for the user.

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

public class KerberorsJavaUtil {
	private static final Logger LOG = Logger.getLogger(KerberorsJavaUtil.class);
	
	public static void getHBaseAuthentication(Configuration hconf,Properties props,String keytabFile){
		//get the path of the keytab file that was added via spark-submit "--files"
		String keyFilePath = KerberorsJavaUtil.class.getResource("/").getPath();
		LOG.info("=====file path====="+keyFilePath);
	    if(keyFilePath.startsWith("file")){
	    	keyFilePath = keyFilePath.substring(5);
	    }
	    //"loginUserFromKeytab" expects a keyFilePath of the form "AAA/XXX/./keyFile"
	    keyFilePath = keyFilePath+"./"+keytabFile;
		LOG.info("------Start Get HBaseAuthentication-----");
		System.setProperty("java.security.krb5.conf",props.getProperty("krb5ConfDir"));
		hconf.set("hbase.security.authentication","kerberos");  
		hconf.set("hadoop.security.authentication","Kerberos");
		//Kerberos principal of the HBase master (masterPrin in the properties file)
		hconf.set("hbase.master.kerberos.principal",props.getProperty("masterPrin")); 
		//Kerberos principal of the region servers (regionPrin in the properties file)
		hconf.set("hbase.regionserver.kerberos.principal",props.getProperty("regionPrin"));  
		UserGroupInformation.setConfiguration(hconf);  
	    try {
	    	//Kerberos login: specify the principal to authenticate and the keytab file path
	    	LOG.info("------dev_yx.keytab path is---"+keyFilePath);
	    	UserGroupInformation.loginUserFromKeytab(props.getProperty("userName"),keyFilePath);
	    	LOG.info("------Get HBaseAuthentication Succeeded-----");
	    } catch (Exception e) {  
	        LOG.error("Get HBaseAuthentication Failed",e);  
	    }
	   
	}
	
	public static User getAuthenticatedUser(Configuration hconf,Properties props,String keytabFile){
		getHBaseAuthentication(hconf,props,keytabFile);		
		User loginedUser = null;
	    try {
	    	LOG.info("=====wrap the logged-in user information in an HBase User====");
			loginedUser = User.create(UserGroupInformation.getLoginUser());
		} catch (IOException e) {
			LOG.error("===failed to wrap the logged-in user information in an HBase User===",e);
		}	    
	    return loginedUser;
	}
	
}
//This class extends TableInputFormatBase and, using the Kerberos utility, overrides its connection creation so the authenticated user is used and HBase can be read.


import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.StringUtils;

import cn.ctyun.UIDSS.utils.KerberorsJavaUtil;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MyTableInputFormat extends TableInputFormatBase
implements Configurable {

  @SuppressWarnings("hiding")
  private static final Log LOG = LogFactory.getLog(MyTableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /**
   * If specified, use start keys of this table to split.
   * This is useful when you are preparing data for bulkload.
   */
  private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
  /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** Scan start row */
  public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
  /** Scan stop row */
  public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
  /** Column Family to Scan */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space delimited list of columns and column families to scan. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to filter columns with a specific timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp used to filter columns with a specific range of versions. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of version to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows for caching that will be passed to scanners. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
  /** Set the maximum number of values to return for each call to next(). */
  public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
  /** Specify if we have to shuffle the map tasks. */
  public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";

  /** The configuration. */
  private Configuration conf = null;
  
  /** The kerberos authenticated user*/
  private User user;  
  
  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }
  
  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
   *
   * @param configuration  The configuration to set.
   * @see Configurable#setConf(
   *   Configuration)
   */
  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
    justification="Intentional")
  public void setConf(Configuration configuration) {
    this.conf = configuration;
    //=========get Kerberos authentication before creating the HBase connection========== 
    Properties props = new Properties();
    try {
		props.load(this.getClass().getClassLoader().getResourceAsStream("user-id-server.properties"));
	} catch (IOException e1) {
		LOG.error("load properties file failed",e1);
	}
    user = KerberorsJavaUtil.getAuthenticatedUser(conf, props, props.getProperty("keytabFile"));
    //=============================================================================   
    
    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = new Scan();

        if (conf.get(SCAN_ROW_START) != null) {
          scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
        }

        if (conf.get(SCAN_ROW_STOP) != null) {
          scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
        }

        if (conf.get(SCAN_COLUMNS) != null) {
          addColumns(scan, conf.get(SCAN_COLUMNS));
        }

        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
        }

        if (conf.get(SCAN_TIMESTAMP) != null) {
          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
        }

        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
          scan.setTimeRange(
              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
        }

        if (conf.get(SCAN_MAXVERSIONS) != null) {
          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
        }

        if (conf.get(SCAN_CACHEDROWS) != null) {
          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
        }

        if (conf.get(SCAN_BATCHSIZE) != null) {
          scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
        }

        // false by default, full table scans generate too much BC churn
        scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
      } catch (Exception e) {
          LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

  @Override
  protected void initialize(JobContext context) throws IOException {
    // Do we have to worry about mis-matches between the Configuration from setConf and the one
    // in this context?
    TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
    try {
     //====================add authenticated user ===================
      initializeTable(ConnectionFactory.createConnection(new Configuration(conf),user), tableName);
    } catch (Exception e) {
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  /**
   * Parses a combined family and qualifier and adds either both or just the
   * family in case there is no qualifier. This assumes the older colon
   * divided notation, e.g. "family:qualifier".
   *
   * @param scan The Scan to update.
   * @param familyAndQualifier family and qualifier
   * @throws IllegalArgumentException When familyAndQualifier is invalid.
   */
  private static void addColumn(Scan scan, byte[] familyAndQualifier) {
    byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
    if (fq.length == 1) {
      scan.addFamily(fq[0]);
    } else if (fq.length == 2) {
      scan.addColumn(fq[0], fq[1]);
    } else {
      throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
    }
  }

  /**
   * Adds an array of columns specified using old format, family:qualifier.
   * <p>
   * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])}for any families in the
   * input.
   *
   * @param scan The Scan to update.
   * @param columns array of columns, formatted as <code>family:qualifier</code>
   * @see Scan#addColumn(byte[], byte[])
   */
  public static void addColumns(Scan scan, byte [][] columns) {
    for (byte[] column : columns) {
      addColumn(scan, column);
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks. The
   * number of splits matches the number of regions in a table. Splits are shuffled if
   * required.
   * @param context  The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = super.getSplits(context);
    if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) {
      Collections.shuffle(splits);
    }
    return splits;
  }

  /**
   * Convenience method to parse a string representation of an array of column specifiers.
   *
   * @param scan The Scan to update.
   * @param columns  The columns to parse.
   */
  private static void addColumns(Scan scan, String columns) {
    String[] cols = columns.split(" ");
    for (String col : cols) {
      addColumn(scan, Bytes.toBytes(col));
    }
  }

  @Override
  protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
    if (conf.get(SPLIT_TABLE) != null) {
      TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
      //====================add authenticated user ===================
      try (Connection conn = ConnectionFactory.createConnection(getConf(),user)) {
        try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
          return rl.getStartEndKeys();
        }
      }
    }

    return super.getStartEndKeys();
  }

  /**
   * Sets split table in map-reduce job.
   */
  public static void configureSplitTable(Job job, TableName tableName) {
    job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
  }
}
/**
 * This class is copied verbatim from the HBase source. It is needed because the input-format code above calls a package-scoped method (TableMapReduceUtil.convertStringToScan), and placing this copy in our own package makes that method accessible.

 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.ctyun.UIDSS.hbase;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import com.google.protobuf.InvalidProtocolBufferException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.hbase.mapreduce.JarFinder;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.PutCombiner;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableMapReduceUtil {
  private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class);

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }


  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(TableName table,
      Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass,
      Job job) throws IOException {
    initTableMapperJob(table.getNameAsString(),
        scan,
        mapper,
        outputKeyClass,
        outputValueClass,
        job,
        true);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
   public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass,
              job, true);
  }

   /**
    * Use this before submitting a TableMap job. It will appropriately set up
    * the job.
    *
    * @param table  The table name to read from.
    * @param scan  The scan instance with the columns, time range etc.
    * @param mapper  The mapper class to use.
    * @param outputKeyClass  The class of the output key.
    * @param outputValueClass  The class of the output value.
    * @param job  The current job to adjust.  Make sure the passed job is
    * carrying all necessary HBase configuration.
    * @param addDependencyJars upload HBase jars and jars for any of the configured
    *           job classes via the distributed cache (tmpjars).
    * @throws IOException When setting up the details fails.
    */
   public static void initTableMapperJob(String table, Scan scan,
       Class<? extends TableMapper> mapper,
       Class<?> outputKeyClass,
       Class<?> outputValueClass, Job job,
       boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
   throws IOException {
     initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job,
         addDependencyJars, true, inputFormatClass);
   }


  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @param inputFormatClass the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, boolean initCredentials,
      Class<? extends InputFormat> inputFormatClass)
  throws IOException {
    job.setInputFormatClass(inputFormatClass);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    if (Put.class.equals(outputValueClass)) {
      job.setCombinerClass(PutCombiner.class);
    }
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    conf.set(TableInputFormat.INPUT_TABLE, table);
    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    if (addDependencyJars) {
      addDependencyJars(job);
    }
    if (initCredentials) {
      initCredentials(job);
    }
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @param inputFormatClass The class of the input format
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass)
  throws IOException {
      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
              outputValueClass, job, addDependencyJars, inputFormatClass);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table Binary representation of the table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(byte[] table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
              outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
      initTableMapperJob(table, scan, mapper, outputKeyClass,
              outputValueClass, job, addDependencyJars, TableInputFormat.class);
  }

  /**
   * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on
   * direct memory will likely cause the map tasks to OOM when opening the region. This
   * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user
   * wants to override this behavior in their job.
   */
  public static void resetCacheConfig(Configuration conf) {
    conf.setFloat(
      HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
    conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f);
    conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY);
  }

  /**
   * Sets up the job for reading from one or more table snapshots, with one or more scans
   * per snapshot.
   * It bypasses hbase servers and read directly from snapshot files.
   *
   * @param snapshotScans     map of snapshot name to scans on that snapshot.
   * @param mapper            The mapper class to use.
   * @param outputKeyClass    The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job               The current job to adjust.  Make sure the passed job is
   *                          carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *                          job classes via the distributed cache (tmpjars).
   */
  public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans,
      Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass,
      Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException {
    MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir);

    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Sets up the job for reading from a table snapshot. It bypasses hbase servers
   * and read directly from snapshot files.
   *
   * @param snapshotName The name of the snapshot (of a table) to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   *
   * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
   * After the job is finished, restore directory can be deleted.
   * @throws IOException When setting up the details fails.
   * @see TableSnapshotInputFormat
   */
  public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars, Path tmpRestoreDir)
  throws IOException {
    TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
    initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
        outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
    resetCacheConfig(job.getConfiguration());
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *          all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
        true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *          all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars) throws IOException {
    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
      addDependencyJars, true);
  }

  /**
   * Use this before submitting a Multi TableMap job. It will appropriately set
   * up the job.
   *
   * @param scans The list of {@link Scan} objects to read from.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is carrying
   *          all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the
   *          configured job classes via the distributed cache (tmpjars).
   * @param initCredentials whether to initialize hbase auth credentials for the job
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(List<Scan> scans,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job,
      boolean addDependencyJars,
      boolean initCredentials) throws IOException {
    job.setInputFormatClass(MultiTableInputFormat.class);
    if (outputValueClass != null) {
      job.setMapOutputValueClass(outputValueClass);
    }
    if (outputKeyClass != null) {
      job.setMapOutputKeyClass(outputKeyClass);
    }
    job.setMapperClass(mapper);
    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    List<String> scanStrings = new ArrayList<String>();

    for (Scan scan : scans) {
      scanStrings.add(convertScanToString(scan));
    }
    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
      scanStrings.toArray(new String[scanStrings.size()]));

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    if (initCredentials) {
      initCredentials(job);
    }
  }

  public static void initCredentials(Job job) throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHadoopSecurityEnabled()) {
      // propagate delegation related props from launcher job to MR job
      if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
        job.getConfiguration().set("mapreduce.job.credentials.binary",
                                   System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
      }
    }

    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        // init credentials for remote cluster
        String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
        User user = userProvider.getCurrent();
        if (quorumAddress != null) {
          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
          Connection peerConn = ConnectionFactory.createConnection(peerConf);
          try {
            TokenUtil.addTokenForJob(peerConn, user, job);
          } finally {
            peerConn.close();
          }
        }

        Connection conn = ConnectionFactory.createConnection(job.getConfiguration());
        try {
          TokenUtil.addTokenForJob(conn, user, job);
        } finally {
          conn.close();
        }
      } catch (InterruptedException ie) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * The quorumAddress is the key to the ZK ensemble, which contains:
   * hbase.zookeeper.quorum, hbase.zookeeper.client.port and
   * zookeeper.znode.parent
   *
   * @param job The job that requires the permission.
   * @param quorumAddress string that contains the 3 required configuratins
   * @throws IOException When the authentication token cannot be obtained.
   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
   */
  @Deprecated
  public static void initCredentialsForCluster(Job job, String quorumAddress)
      throws IOException {
    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
        quorumAddress);
    initCredentialsForCluster(job, peerConf);
  }

  /**
   * Obtain an authentication token, for the specified cluster, on behalf of the current user
   * and add it to the credentials for the given map reduce job.
   *
   * @param job The job that requires the permission.
   * @param conf The configuration to use in connecting to the peer cluster
   * @throws IOException When the authentication token cannot be obtained.
   */
  public static void initCredentialsForCluster(Job job, Configuration conf)
      throws IOException {
    UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
    if (userProvider.isHBaseSecurityEnabled()) {
      try {
        Connection peerConn = ConnectionFactory.createConnection(conf);
        try {
          TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
        } finally {
          peerConn.close();
        }
      } catch (InterruptedException e) {
        LOG.info("Interrupted obtaining user authentication token");
        Thread.interrupted();
      }
    }
  }

  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan  The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  static String convertScanToString(Scan scan) throws IOException {
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return Base64.encodeBytes(proto.toByteArray());
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64  The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  static Scan convertStringToScan(String base64) throws IOException {
    byte [] decoded = Base64.decode(base64);
    ClientProtos.Scan scan;
    try {
      scan = ClientProtos.Scan.parseFrom(decoded);
    } catch (InvalidProtocolBufferException ipbe) {
      throw new IOException(ipbe);
    }

    return ProtobufUtil.toScan(scan);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job,
    Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you would have the reduce write a cluster that is other than the
   * default; e.g. copying tables between clusters, the source would be
   * designated by <code>hbase-site.xml</code> and this param would have the
   * ensemble address of the remote cluster.  The format to pass is particular.
   * Pass <code> <hbase.zookeeper.quorum>:<
   *             hbase.zookeeper.client.port>:<zookeeper.znode.parent>
   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job,
    Class partitioner, String quorumAddress, String serverClass,
    String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you would have the reduce write a cluster that is other than the
   * default; e.g. copying tables between clusters, the source would be
   * designated by <code>hbase-site.xml</code> and this param would have the
   * ensemble address of the remote cluster.  The format to pass is particular.
   * Pass <code> <hbase.zookeeper.quorum>:<
   *             hbase.zookeeper.client.port>:<zookeeper.znode.parent>
   * </code> such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job,
    Class partitioner, String quorumAddress, String serverClass,
    String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName());
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKConfig.validateClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }

    initCredentials(job);
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job)
  throws IOException {
    int regions =
      MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table));
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job)
  throws IOException {
    job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(),
       TableName.valueOf(table)));
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
   * Add HBase and its dependencies (only) to the job configuration.
   * <p>
   * This is intended as a low-level API, facilitating code reuse between this
   * class and its mapred counterpart. It also of use to external tools that
   * need to build a MapReduce job that interacts with HBase but want
   * fine-grained control over the jars shipped to the cluster.
   * </p>
   * @param conf The Configuration object to extend with dependencies.
   * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil
   * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a>
   */
  public static void addHBaseDependencyJars(Configuration conf) throws IOException {

    // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar
    // dependencies, MR jobs that write encoded hfiles will fail.
    // We used reflection here so to prevent a circular module dependency.
    // TODO - if we extract the MR into a module, make it depend on hbase-prefix-tree.
    Class prefixTreeCodecClass = null;
    try {
      prefixTreeCodecClass =
          Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
    } catch (ClassNotFoundException e) {
      // this will show up in unit tests but should not show in real deployments
      LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." +
          "  Continuing without it.");
    }

    addDependencyJars(conf,
      // explicitly pull a class from each module
      HConstants.class,                      // hbase-common
      ClientProtos.class, // hbase-protocol
      Put.class,                      // hbase-client
      org.apache.hadoop.hbase.CompatibilityFactory.class,            // hbase-hadoop-compat
      TableMapper.class,           // hbase-server
      prefixTreeCodecClass, //  hbase-prefix-tree (if null will be skipped)
      // pull necessary dependencies
      org.apache.zookeeper.ZooKeeper.class,
      io.netty.channel.Channel.class,
      com.google.protobuf.Message.class,
      com.google.common.collect.Lists.class,
      org.apache.htrace.Trace.class,
      com.yammer.metrics.core.MetricsRegistry.class);
  }

  /**
   * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}.
   * Also exposed to shell scripts via `bin/hbase mapredcp`.
   */
  public static String buildDependencyClasspath(Configuration conf) {
    if (conf == null) {
      throw new IllegalArgumentException("Must provide a configuration object.");
    }
    Set<String> paths = new HashSet<String>(conf.getStringCollection("tmpjars"));
    if (paths.size() == 0) {
      throw new IllegalArgumentException("Configuration contains no tmpjars.");
    }
    StringBuilder sb = new StringBuilder();
    for (String s : paths) {
      // entries can take the form 'file:/path/to/file.jar'.
      int idx = s.indexOf(":");
      if (idx != -1) s = s.substring(idx + 1);
      if (sb.length() > 0) sb.append(File.pathSeparator);
      sb.append(s);
    }
    return sb.toString();
  }

  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    addHBaseDependencyJars(job.getConfiguration());
    try {
      addDependencyJars(job.getConfiguration(),
          // when making changes here, consider also mapred.TableMapReduceUtil
          // pull job classes
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   */
  public static void addDependencyJars(Configuration conf,
      Class<?>... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);
    Set<String> jars = new HashSet<String>();
    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // add jars as we find them to a map of contents jar name so that we can avoid
    // creating new jars for classes that have already been packaged.
    Map<String, String> packagedClasses = new HashMap<String, String>();

    // Add jars containing the specified classes
    for (Class<?> clazz : classes) {
      if (clazz == null) continue;

      Path path = findOrCreateJar(clazz, localFs, packagedClasses);
      if (path == null) {
        LOG.warn("Could not find jar for class " + clazz +
                 " in order to ship it to the cluster.");
        continue;
      }
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class "
                 + clazz);
        continue;
      }
      jars.add(path.toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()])));
  }

  /**
   * Finds the Jar for a class or creates it if it doesn't exist. If the class is in
   * a directory in the classpath, it creates a Jar on the fly with the
   * contents of the directory and returns the path to that Jar. If a Jar is
   * created, it is created in the system temporary directory. Otherwise,
   * returns an existing jar that contains a class of the same name. Maintains
   * a mapping from jar contents to the tmp jar created.
   * @param my_class the class to find.
   * @param fs the FileSystem with which to qualify the returned path.
   * @param packagedClasses a map of class name to path.
   * @return a jar file that contains the class.
   * @throws IOException
   */
  private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
      Map<String, String> packagedClasses)
  throws IOException {
    // attempt to locate an existing jar for the class.
    String jar = findContainingJar(my_class, packagedClasses);
    if (null == jar || jar.isEmpty()) {
      jar = getJar(my_class);
      updateMap(jar, packagedClasses);
    }

    if (null == jar || jar.isEmpty()) {
      return null;
    }

    LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
    return new Path(jar).makeQualified(fs);
  }

  /**
   * Add entries to <code>packagedClasses</code> corresponding to class files
   * contained in <code>jar</code>.
   * @param jar The jar who's content to list.
   * @param packagedClasses map[class -> jar]
   */
  private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException {
    if (null == jar || jar.isEmpty()) {
      return;
    }
    ZipFile zip = null;
    try {
      zip = new ZipFile(jar);
      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
        ZipEntry entry = iter.nextElement();
        if (entry.getName().endsWith("class")) {
          packagedClasses.put(entry.getName(), jar);
        }
      }
    } finally {
      if (null != zip) zip.close();
    }
  }

  /**
   * Find a jar that contains a class of the same name, if any. It will return
   * a jar file, even if that is not the first thing on the class path that
   * has a class with the same name. Looks first on the classpath and then in
   * the <code>packagedClasses</code> map.
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   * @throws IOException
   */
  private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
      throws IOException {
    ClassLoader loader = my_class.getClassLoader();

    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";

    if (loader != null) {
      // first search the classpath
      for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
        URL url = itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    }

    // now look in any jars we've packaged using JarFinder. Returns null when
    // no jar is found.
    return packagedClasses.get(class_file);
  }

  /**
   * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job
   * configuration contexts (HBASE-8140) and also for testing