MapReduce生成HFile檔案,再使用BulkLoad匯入HBase中(完全分散式執行)
宣告: 若要轉載, 請標明出處.
前提: 在對於大量的資料匯入到HBase中, 如果一條一條進行插入, 則太耗時了, 所以可以先採用MapReduce生成HFile檔案, 然後使用BulkLoad匯入HBase中.
引用:
一、這種方式有很多的優點:
1. 如果我們一次性入庫hbase巨量資料,處理速度慢不說,還特別佔用Region資源, 一個比較高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat類。
2. 它是利用hbase的資料資訊按照特定格式儲存在hdfs內這一原理,直接生成這種hdfs記憶體儲的資料格式檔案,然後上傳至合適位置,即完成巨量資料快速入庫的辦法。配合mapreduce完成,高效便捷,而且不佔用region資源,增添負載。
二、這種方式也有很大的限制:
1. 僅適合初次資料匯入,即表內資料為空,或者每次入庫表內都無資料的情況。
2. HBase叢集與Hadoop叢集為同一叢集,即HBase所基於的HDFS為生成HFile的MR的叢集.
本文程式碼採用Eclipse編輯器(Linux環境下)
一. 網上的大部分程式碼都是或多或少有問題, 比如他們或者不是執行在叢集上,或者執行時有問題, 後面會對產生哪些問題進行說明, 先不說這麼多了,先上程式碼吧.
二. 原始碼(注: 作者親測執行在叢集上成功,叢集基於Ubuntu12.04, Hadoop-1.2.1與HBase-0.98,使用自帶的ZooKeeper)
1. MapReduce生產HFile檔案
首先, 需要匯入的資料的表格(BigClientEnergyInfo表)有四個列族, 每個列族下面有一些列, 這些資訊都使用常量配置類CONSTANT_HADOOP與CONSTANT_HBASE進行說明,如下:
package cn.hey.loaddata2hbase; /** * * @author HeYong * @version 1 * @time 2014-05-09 * */ public class CONSTANT_HADOOP { //大客戶表BigClientEnergyInfo的HFile生成Job名字 public static final String BigClientEnergyInfo_JobName = "BigClientEnergyInfo_HFileGenerator_Job"; //大客戶表BigClientEnergyInfo的輸入原始文字資訊的HDFS路徑 public static final String BigClientEnergyInfo_inDir = "hdfs://node1:49000/user/hadoop/input/BigClientEnergyInfo/"; //大客戶表BigClientEnergyInfo的HFile檔案的輸出HDFS路徑 public static final String BigClientEnergyInfo_HFile_outDir = "hdfs://node1:49000/user/hadoop/output/BigClientEnergyInfo/"; //說明: 因為在建立HBase表的時候,預設只有一個Region,只有等到這個Region的大小超過一定的閾值之後,才會進行split //所以為了利用完全分散式加快生成HFile和匯入HBase中以及資料負載均衡,所以需要在建立表的時候預先進行分割槽, //而進行分割槽時要利用startKey與endKey進行rowKey區間劃分(因為匯入HBase中,需要rowKey整體有序),所以在匯入之前,自己先寫一個MapReduce的Job求最小與最大的rowKey //即startKey與endKey //獲取最大rowKey與最小rowKey的Job名字 public static final String GetMaxAndMinRowKey_JobName = "GetMaxAndMinRowKey_Job"; //大客戶表BigClientEnergyInfo的輸入原始文字資訊的HDFS路徑 public static final String GetMaxAndMinRowKey_inDir = "hdfs://node1:49000/user/hadoop/input/BigClientEnergyInfo/"; //最大rowKey與最小rowKey的輸出HDFS路徑 public static final String GetMaxAndMinRowKey_outDir = "hdfs://node1:49000/user/hadoop/output/GetMaxAndMinRowKey/"; }
package cn.hey.loaddata2hbase;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hbase.client.HTable;
/**
*
* @author HeYong
* @version 1
* @time 2014-05-09
*
*/
public class CONSTANT_HBASE {
public static final long timeStamp = System.currentTimeMillis();
//表集合
public static List<HTable> htables = new LinkedList<HTable>();
public static final String[] TableNames = {"BigClientEnergyInfo"};
/**
* 大客戶表資訊
*/
//列族資訊
public static final String[] TB0_FamilyNames = {"DateTime","MeterEnergy","ObjInfo","ClientInfo"};
//第1個列族中的列
public static final String[] TB0_FN0ColNames ={"DATETIME"};
//第2個列族中的列
public static final String[] TB0_FN1ColNames ={"DT","OBJ_ID","E0","E1","E2","E3","E4","E5"};
//第3個列族中的列
public static final String[] TB0_FN2ColNames ={"STAT_TYPE","CITY_NO","OBJ_ID","OBJ_NAME","LAYER","LAYER_ID","OBJ_TYPE","TYPE_VALUE",
"TYPE_VALUE_GROUP","SORT","SYS_ID","STATION_NO","FLAG"};
//第4個列族中的列
public static final String[] TB0_FN3ColNames ={"CITY_NO","CONSUMERID","CONSUMERNAME","CUSTOMERTYPE","USERSTATUS","USERADDR","ZONEID","INDUSTRYTYPE",
"LINKMAN","LINKPHONE","USETYPE","LINEID"};
//列族資訊集合
public static final String[][] TB0_FNColNames={TB0_FN0ColNames,TB0_FN1ColNames,TB0_FN2ColNames,TB0_FN3ColNames};
//每個列族的列索引
public static final int[] FNColIndex={1,2,10,23};
}
接著, 使用建立一個生成四個列族的HFile的MapRed Job,每個列族一個Job, 原始碼如下(類BigClientEnergyInfoHFileGenerator):
其中有三點需要特別注意:
(1)
//特別注意: 一定要設定,不然會報cannot read partitioner file錯誤
conf.set("fs.default.name","node1:49000");
(2) //特別注意: 一定要設定,不然不會執行在叢集上
conf.set("mapred.job.tracker","node1:49001");
(3) //特別注意: 對相關Class檔案以及依賴的jar包(如HBase的jar,)進行打包,這是執行在叢集上必須要做的一步,不然叢集找不到相關的Mapper等類檔案
File jarpath;
try {
jarpath = JarTools.makeJar("bin");
conf.set("mapred.jar", jarpath.toString());
} catch (Exception e) {
logger.error("進行jar打包出錯!");
e.printStackTrace();
return;
}
特別注意: 因為我這裡是對工程下的bin目錄裡面的內容進行打包,所以需要把依賴的jar包先放入bin資料夾中, 再Bulid Path->Add to Build Path, 不然會出現在執行時, 依賴的包中的類找不到, 如HBase包中的ImmutableBytesWritable類等. 當然你也可以放在別的目錄下,然後進行打包, 反正需要將相關Class檔案與依賴的jar包進行打包. 這裡自己寫了一個JarTools類進行對指定資料夾下面的內容進行打包package cn.hey.loaddata2hbase;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import cn.hey.file.FileOperation;
import cn.hey.hbase.HbaseOperation;
import cn.hey.utils.JarTools;
/**
*
* @author HeYong
* @version 1
* @time 2014-05-09
*
*/
public class BigClientEnergyInfoHFileGenerator {
public static Logger logger = LogManager.getLogger(BigClientEnergyInfoHFileGenerator.class);
/**
*
* @param args 第一個元素表示第幾個表,第二個元素表示該表的列族個數
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
* @throws Exception
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, Exception{
if(args.length<2){
logger.error("引數個數不對!");
return;
}
int tableIndex = Integer.parseInt(args[0]);
int familyNum = Integer.parseInt(args[1]);
int index = 0;
long beginTime=0,endTime=0;
while(index<familyNum){
beginTime = System.currentTimeMillis();
GeneratorJob(tableIndex,index);
endTime = System.currentTimeMillis();
FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime1.txt",(((endTime-beginTime)/(1.0*60*1000)))+"\n");
++index;
}
FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime1.txt","-----------------------------");
}
public static class HFileGenerateMapper extends
Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
private static int familyIndex = 0;
private static Configuration conf = null;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
conf = context.getConfiguration();
familyIndex = conf.getInt("familyIndex",0);
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
value.toString().split(",")[0].getBytes());
List<KeyValue> list = null;
list = createKeyValue(value.toString());
Iterator<KeyValue> it = list.iterator();
while (it.hasNext()) {
KeyValue kv = new KeyValue();
kv = it.next();
if (kv != null) {
context.write(rowkey, kv);
}
}
}
private List<KeyValue> createKeyValue(String str) {
List<KeyValue> list = new ArrayList<KeyValue>(CONSTANT_HBASE.TB0_FNColNames[familyIndex].length);
String[] values = str.toString().split(",");
String[] qualifiersName = CONSTANT_HBASE.TB0_FNColNames[familyIndex];
for (int i = 0; i < qualifiersName.length; i++) {
String rowkey = values[0];
String family = CONSTANT_HBASE.TB0_FamilyNames[familyIndex];
String qualifier = qualifiersName[i];
String value_str = values[i+CONSTANT_HBASE.FNColIndex[familyIndex]];
KeyValue kv = new KeyValue(Bytes.toBytes(rowkey),
Bytes.toBytes(family), Bytes.toBytes(qualifier),
CONSTANT_HBASE.timeStamp, Bytes.toBytes(value_str));
list.add(kv);
}
return list;
}
}
//測試Mapper,用來進行測試的, 後面沒有用到
public static class HFileMapper extends Mapper<LongWritable, Text,ImmutableBytesWritable,KeyValue> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(" ", -1);
byte[] rkey = Bytes.toBytes(values[0]); //rowkey
byte[] family = Bytes.toBytes("info"); //列族
byte[] column = Bytes.toBytes("name"); //列
byte[] val = Bytes.toBytes(values[1]); //值
//Put tmpPut=new Put(subject);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(rkey);
KeyValue kvProtocol = new KeyValue(rkey , family, column, val);
context.write(rowKey, kvProtocol );
}
}
/**
*
* @param tableIndex 表示第幾個表(從0開始),具體參見CONSTANT_HBASE類
* @param familyIndex 表示該表的第幾個列族(從0開始),具體參見CONSTANT_HBASE類
* @throws IOException
*/
public static void GeneratorJob(int tableIndex,int familyIndex) throws IOException{
Configuration conf = HbaseOperation.HBASE_CONFIG;
//特別注意: 一定要設定,不然會爆cannot read partitioner file錯誤
conf.set("fs.default.name","node1:49000");
//特別注意: 一定要設定,不然不會執行在叢集上
conf.set("mapred.job.tracker","node1:49001");
//特別注意: 對相關Class以及依賴的jar包(如HBase的jar)進行打包,這是執行在叢集上必須要做的一步,不然叢集找不到相關的Mapper等類檔案
File jarpath;
try {
jarpath = JarTools.makeJar("bin");
conf.set("mapred.jar", jarpath.toString());
} catch (Exception e) {
logger.error("進行jar打包出錯!");
e.printStackTrace();
return;
}
//設定job
Job job = new Job(conf, CONSTANT_HADOOP.BigClientEnergyInfo_JobName);
job.setJarByClass(BigClientEnergyInfoHFileGenerator.class);
//設定Map任務輸出Key-Value型別,一定要為該型別,Value可以改為HBase的Put型別
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
//設定Mapper與Reducer類
job.setMapperClass(HFileGenerateMapper.class);
job.setReducerClass(KeyValueSortReducer.class);
// 不需要設定,系統會根據相關資訊呼叫 HFileOutputFormat
// job.setOutputFormatClass(HFileOutputFormat.class);
// 不需要設定, 系統會根據表的Region數建立多少Reducer
// job.setNumReduceTasks(4);
// job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);
HTable table = new HTable(conf, CONSTANT_HBASE.TableNames[tableIndex]);
HFileOutputFormat.configureIncrementalLoad(job, table);
//設定資料輸入輸出目錄
String str_inPath = CONSTANT_HADOOP.BigClientEnergyInfo_inDir;
String str_outPath = CONSTANT_HADOOP.BigClientEnergyInfo_HFile_outDir+CONSTANT_HBASE.TB0_FamilyNames[familyIndex];
//建立HDFS物件
FileSystem fs = FileSystem.get(URI.create(str_inPath),conf);
// 如果輸出路徑存在就先刪掉,因為不允許輸出路徑事先存在
Path outPath = new Path(str_outPath);
if (fs.exists(outPath))
fs.delete(outPath, true);
FileInputFormat.addInputPath(job, new Path(str_inPath));
FileOutputFormat.setOutputPath(job, new Path(str_outPath));
try {
job.waitForCompletion(true);
} catch (InterruptedException e) {
logger.info(CONSTANT_HADOOP.BigClientEnergyInfo_JobName+" 任務執行出錯!");
e.printStackTrace();
} catch (ClassNotFoundException e) {
logger.info(CONSTANT_HADOOP.BigClientEnergyInfo_JobName+" 任務執行出錯!");
e.printStackTrace();
}
}
}
生成HFile程式說明:
①. 最終輸出結果,無論是map還是reduce,輸出部分key和value的型別必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
②. 最終輸出部分,Value型別是KeyValue 或Put,對應的Sorter分別是KeyValueSortReducer或PutSortReducer。
③. MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只適合一次對單列族組織成HFile檔案。好像最新的版本可以多個列族.
④. MR例子中HFileOutputFormat.configureIncrementalLoad(job, table);自動對job進行配置。TotalOrderPartitioner是需要先對key進行整體排序,然後劃分到每個reduce中,保證每一個reducer中的的key最小最大值區間範圍,是不會有交集的。因為入庫到HBase的時候,作為一個整體的Region,key是絕對有序的。
⑤. MR例子中最後生成HFile儲存在HDFS上,輸出路徑下的子目錄是各個列族。如果對HFile進行入庫HBase,相當於move HFile到HBase的Region中,HFile子目錄的列族內容沒有了。
然後, 使用BulkLoad工具將HFile檔案匯入HBase中, 原始碼如下(類BigClientEnergyInfoHFileLoader):
package cn.hey.loaddata2hbase;
import java.io.File;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import cn.hey.file.FileOperation;
import cn.hey.hbase.HbaseOperation;
/**
*
* @author HeYong
* @version 1
* @time 2014-05-09
*
*/
public class BigClientEnergyInfoHFileLoader {
public static Logger logger = LogManager.getLogger(HFileLoader.class);
public static void main(String[] args) throws Exception {
if(args.length<2){
logger.error("引數個數不對!");
return;
}
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
HbaseOperation.HBASE_CONFIG);
int tableIndex = Integer.parseInt(args[0]);
int familyNum = Integer.parseInt(args[1]);
int i = 0;
long beginTime=0,endTime=0;
while(i<familyNum){
beginTime = System.currentTimeMillis();
String str_outPath = CONSTANT_HADOOP.str_outPath+CONSTANT_HBASE.TB0_FamilyNames[i];
loader.doBulkLoad(new Path(str_outPath),CONSTANT_HBASE.htables.get(tableIndex));
endTime = System.currentTimeMillis();
//將用時相關寫入檔案
FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime2.txt",(((endTime-beginTime)/(1.0*60*1000)))+"\n");
++i;
}
FileOperation.append2File(System.getProperty("user.dir")+File.separator+"file"+File.separator+"runTime2.txt","------------------------");
}
}
最後,使用一個Driver類, 先建立HTable,然後呼叫上面的兩個類,原始碼如下(類BigClientEnergyInfoLoadDriver):
說明: 因為在建立HBase表的時候,預設只有一個Region,只有等到這個Region的大小超過一定的閾值之後,才會進行split, 所以為了利用完全分散式加快生成HFile和匯入HBase中以及資料負載均衡,所以需要在建立表的時候預先建立分割槽,可以查閱相關資料(關於HBase調優的資料), 而進行分割槽時要利用startKey與endKey進行rowKey區間劃分(因為匯入HBase中,需要rowKey整體有序),所以在匯入之前,自己先寫一個MapReduce的Job求最小與最大的rowKey, 即startKey與endKey.
package cn.hey.loaddata2hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import cn.hey.hbase.HbaseOperation;
import cn.hey.hdfs.HDFSOperation;
/**
*
* @author HeYong
* @version 1
* @time 2014-05-09
*
*/
public class BigClientEnergyInfoLoadDriver {
protected static Logger logger = LogManager.getLogger(BigClientEnergyInfoLoadDriver.class);
/**
* @param args
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//首先刪除在CONSTANT_HBASE類中的第0個表,即BigClientEnergyInfo表
dropHTable(0);
/**
* 說明: 因為在建立HBase表的時候,預設只有一個Region,只有等到這個Region的大小超過一定的閾值之後,才會進行split,
* 所以為了利用完全分散式加快生成HFile和匯入HBase中以及資料負載均衡,所以需要在建立表的時候預先建立分割槽,可以查閱相關資料(關於HBase調優的資料),
* 而進行分割槽時要利用startKey與endKey進行rowKey區間劃分(因為匯入HBase中,需要rowKey整體有序),所以在匯入之前,自己先寫一個MapReduce的Job求最小與最大的rowKey,
* 即startKey與endKey.
*
*/
//呼叫GetMaxAndMinRowKeyDriver.獲取startKey與endKey
GetMaxAndMinRowKeyDriver.main(null);
//讀取startKey與endKey,readHDFSFile方法即讀取指定HDFS檔案中的內容,每一行作為一個字串
List<String> strList = HDFSOperation.readHDFSFile(CONSTANT_HADOOP.GetMaxAndMinRowKey_outDir+"part-r-00000");
if(strList==null||strList.size()<2){
logger.info("startKey與endKey讀取失敗!");
return;
}
String startKey = strList.get(0);
String endKey = strList.get(1);
if(startKey==null||"".equals(startKey)||endKey==null||"".equals(endKey)){
logger.info("startKey或endKey為空!");
return;
}
args = new String[2];
//第0個表,表的索引,即表BigClientEnergyInfo
args[0]="0";
//該表所擁有的列族的數目
args[1]= ""+CONSTANT_HBASE.TB0_FamilyNames.length;
//建立第0個表,即大客戶表
boolean flag = false;
try {
//建立表時預先建立的Region個數
int numPreRegions = 7;
flag = createHTable(0,startKey,endKey,numPreRegions);
} catch (IOException e1) {
e1.printStackTrace();
}
if(flag){
//產生該表的HFile檔案
try {
BigClientEnergyInfoHFileGenerator.main(args);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
//將HFile匯入HBase中
try {
HFileLoader.main(args);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* @param index 第幾個表
* @param startKey 建立預先分割槽的startKey
* @param endKey 建立預先分割槽的endKey
* @param numRegions 建立預先分割槽個數
* @return 是否建立成功
* @throws IOException
*/
public static boolean createHTable(int index,String startKey,String endKey,int numRegions) throws IOException{
if(index<0||index>=CONSTANT_HBASE.TableNames.length){
logger.error("表下標越界!");
return false;
}
if(startKey==null||"".equals(startKey)){
logger.error("startKey不能為空!");
return false;
}
if(endKey==null||"".equals(endKey)){
logger.error("endKey不能為空!");
return false;
}
if(numRegions<0){
logger.error("分割槽個數<0!");
return false;
}
List<String> list = new ArrayList<String>();
String tableName = CONSTANT_HBASE.TableNames[index];
for(String familyName:CONSTANT_HBASE.TB0_FamilyNames){
list.add(familyName);
}
if(HbaseOperation.createTable(tableName, list,startKey,endKey,numRegions)){
logger.info("建立HTable :"+tableName+"成功");
}
HTable table = new HTable(HbaseOperation.HBASE_CONFIG,tableName);
CONSTANT_HBASE.htables.add(table);
return true;
}
public static void dropHTable(int index){
String tableName = CONSTANT_HBASE.TableNames[index];
HbaseOperation.dropTable(tableName);
}
}
注: HbaseOperation.createTable方法, 即建立表, HbaseOperation.dropTable方法,即刪除表, 原始碼如下:
/**
* 建立表
*
* @param tableName
* @param family 列族集名稱
* @param String startKey,String endKey,int numRegions 預先分割槽相關資訊
*/
public static boolean createTable(String tableName,List<String> family,String startKey,String endKey,int numRegions) {
try {
hBaseAdmin = new HBaseAdmin(HBASE_CONFIG);
//如果表已存在,則返回
if (hBaseAdmin.tableExists(tableName)) {
//hBaseAdmin.disableTable(tableName);
//hBaseAdmin.deleteTable(tableName);
logger.info("表: "+tableName+"已經存在!");
return false;
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
for(String name:family){
tableDescriptor.addFamily(new HColumnDescriptor(name));
}
hBaseAdmin.createTable(tableDescriptor,Bytes.toBytes(startKey),Bytes.toBytes(endKey),numRegions);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/**
* 刪除一張表
*
* @param tableName 表名
*/
public static void dropTable(String tableName) {
if(tableName==null||"".equals(tableName)){
logger.error("表名不能為空!");
return;
}
try {
hBaseAdmin = new HBaseAdmin(HBASE_CONFIG);
hBaseAdmin.disableTable(tableName);
hBaseAdmin.deleteTable(tableName);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
特別注意: 對HBase進行操作時, 在獲取HBase conf時, 即public static Configuration HBASE_CONFIG = HBaseConfiguration.create();的時候, 一定要進行如下設定:
static {
//設定HMaster
HBASE_CONFIG.set("hbase.zookeeper.master","node1:60000");
//設定Zookeeper叢集
HBASE_CONFIG.set("hbase.zookeeper.quorum", "node2,node3,node4,node5,node6,node7,node8");
}
不然會出現RegionServer的Zookeeper連線不上HMaster, 千萬要注意.
到這裡就基本大功告成了. 可以通過node1:50030檢視job的執行情況, 通過node1:60010檢視HBase的相關情況.
下一篇將講述中間遇到的問題以及解決辦法.