MapReduce將HDFS文字資料匯入HBase中
HBase本身提供了很多種資料匯入的方式,通常有兩種常用方式:
- 使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將資料匯入HBase
- 另一種方式就是使用HBase原生Client API
本文就是示範如何通過MapReduce作業從一個檔案讀取資料並寫入到HBase中。
首先啟動Hadoop與HBase,然後建立一個空表,用於後面匯入資料:
hbase(main):006:0> create 'mytable','cf'
0 row(s) in 10.8310 seconds
=> Hbase::Table - mytable
hbase(main):007 :0> list
TABLE
mytable
1 row(s) in 0.1220 seconds
=> ["mytable"]
hbase(main):008:0> scan 'mytable'
ROW COLUMN+CELL
0 row(s) in 0.2130 seconds
一、示例程式
下面的示例程式通過TableOutputFormat
將HDFS上具有一定格式的文字資料匯入到HBase中。
首先建立MapReduce作業,目錄結構如下:
Hdfs2HBase/
├── classes
└── src
├── Hdfs2HBase.java
├── Hdfs2HBaseMapper.java
└── Hdfs2HBaseReducer.java
Hdfs2HBaseMapper.java
package com.lisong.hdfs2hbase;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class Hdfs2HBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text line, Context context) throws IOException,InterruptedException {
String lineStr = line.toString();
int index = lineStr.indexOf(":");
String rowkey = lineStr.substring(0, index);
String left = lineStr.substring(index+1);
context.write(new Text(rowkey), new Text(left));
}
}
Hdfs2HBaseReducer.java
package com.lisong.hdfs2hbase;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class Hdfs2HBaseReducer extends Reducer<Text, Text, ImmutableBytesWritable, Put> {
public void reduce(Text rowkey, Iterable<Text> value, Context context) throws IOException,InterruptedException {
String k = rowkey.toString();
for(Text val : value) {
Put put = new Put(k.getBytes());
String[] strs = val.toString().split(":");
String family = strs[0];
String qualifier = strs[1];
String v = strs[2];
put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
context.write(new ImmutableBytesWritable(k.getBytes()), put);
}
}
}
Hdfs2HBase.java
package com.lisong.hdfs2hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Hdfs2HBase {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage: wordcount <infile> <table>");
System.exit(2);
}
Job job = new Job(conf, "hdfs2hbase");
job.setJarByClass(Hdfs2HBase.class);
job.setMapperClass(Hdfs2HBaseMapper.class);
job.setReducerClass(Hdfs2HBaseReducer.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);
System.exit(job.waitForCompletion(true)?0:1);
}
}
配置javac
編譯依賴環境:
$HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar
$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar
這裡要操作HBase,故除了上面三個jar包,還需要$HBASE_HOME/lib
目錄下的jar包。為了方便,我們在/etc/profile
的CLASSPATH
裡包含所有的依賴包:
TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$HBASE_JARS
編譯
$ javac -d classes/ src/*.java
打包
$ jar -cvf hdfs2hbase.jar classes
執行
建立一個data.txt
檔案,內容如下(列族是建表時建立的列族cf
):
r1:cf:c1:value1
r2:cf:c2:value2
r3:cf:c3:value3
將檔案複製到hdfs上:
$ hadoop/bin/hadoop fs -put data.txt /hbase
執行MapReduce作業:
$ hadoop/bin/hadoop jar Hdfs2HBase/hdfs2hbase.jar com.lisong.hdfs2hbase.Hdfs2HBase /hbase/data.txt mytable
報錯NoClassDefFoundError
找不到類定義:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/io/ImmutableBytesWritable
at com.lisong.hdfs2hbase.Hdfs2HBase.main(Hdfs2HBase.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
原因是我沒有把HBase的jar包加到hadoop-env.sh
中。
TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
HADOOP_CLASSPATH=$HBASE_JARS
再次執行發現又報了Unable to initialize MapOutputCollector
的錯誤:
15/08/10 08:55:44 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1008)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:401)
...
at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapred.LocalJobRunner: map task executor complete.
15/08/10 08:55:44 WARN mapred.LocalJobRunner: job_local2138114942_0001
java.lang.Exception: java.io.IOException: Unable to initialize any output collector
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Unable to initialize any output collector
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapreduce.Job: Job job_local2138114942_0001 failed with state FAILED due to: NA
15/08/10 08:55:45 INFO mapreduce.Job: Counters: 0
原因是我沒有指明Map輸出的Key/Value型別,在Hdfs2HBase.java
中新增以下兩句:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
如果沒有專門定義Mapper輸出型別的話,job.setOutputKeyClass
和job.setOutputValueClass
設定的是Mapper和Reducer兩個的輸出型別。
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
而Hdfs2HBaseMapper輸出型別是Text/Text,所以這裡需要單獨指定。
修改Hdfs2HBase.java
package com.lisong.hdfs2hbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Hdfs2HBase {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage: wordcount <infile> <table>");
System.exit(2);
}
Job job = new Job(conf, "hdfs2hbase");
job.setJarByClass(Hdfs2HBase.class);
job.setMapperClass(Hdfs2HBaseMapper.class);
job.setReducerClass(Hdfs2HBaseReducer.class);
job.setMapOutputKeyClass(Text.class); // +
job.setMapOutputValueClass(Text.class); // +
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);
System.exit(job.waitForCompletion(true)?0:1);
}
}
再次編譯、打包,然後執行成功!
查詢HBase表,驗證資料是否已匯入:
hbase(main):001:0> scan 'mytable'
ROW COLUMN+CELL
r1 column=cf:c1, timestamp=1439223857492, value=value1
r2 column=cf:c2, timestamp=1439223857492, value=value2
r3 column=cf:c3, timestamp=1439223857492, value=value3
3 row(s) in 1.3820 seconds
可以看到,資料匯入成功!
由於需要頻繁的與儲存資料的RegionServer通訊,佔用資源較大,一次性入庫大量資料時,TableOutputFormat效率並不好。
二、拓展-TableReducer
我們可以將Hdfs2HBaseReducer.java
程式碼改成下面這樣,作用是一樣的:
package com.lisong.hdfs2hbase;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class Hdfs2HBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
public void reduce(Text rowkey, Iterable<Text> value, Context context) throws IOException,InterruptedException {
String k = rowkey.toString();
for(Text val : value) {
Put put = new Put(k.getBytes());
String[] strs = val.toString().split(":");
String family = strs[0];
String qualifier = strs[1];
String v = strs[2];
put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
context.write(new ImmutableBytesWritable(k.getBytes()), put);
}
}
}
這裡直接繼承了TableReducer
,TableReducer
是部分特例化的Reducer
,它只有三個型別引數:輸入Key/Value是對應Mapper的輸出,輸出Key可以是任意的型別,但是輸出Value必須是一個Put
或Delete
例項。
編譯打包執行,結果與前面的一樣!
相關推薦
MapReduce將HDFS文字資料匯入HBase中
HBase本身提供了很多種資料匯入的方式,通常有兩種常用方式: 使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將資料匯入HBase 另一種方式就是使用HBase原生Client API 本文就是示範如何通過M
將sqlserver的資料匯入hbase中
將sqlserver的資料匯入hbase中 1.解壓sqoop-sqlserver-1.0.tar.gz,並改名(可以不改) tar -zxvf sqoop- sql
如何將不同型別資料匯入Elaticsearch中?
題記 Elaticsearch的原理明白了以後,手頭有很多不同型別的資料,如: 1)單條資料,如程式中自己構造的JSON格式資料; 2)符合Elasticsearch索引規範的批量資料; 3)日誌檔案,格式*.log; 4)結構化資料,儲存在mysql
33.如何將不同型別資料匯入Elaticsearch中(ES同步小結)
題記Elaticsearch的原理明白了以後,手頭有很多不同型別的資料,如: 1)單條資料,如程式中自己構造的JSON格式資料; 2)符合Elasticsearch索引規範的批量資料; 3)日誌檔案,格式*.log; 4)結構化資料,儲存在mysql、oracle等關係型資料
文字資料匯入HBASE
在將有定界符文字檔案匯入HBASE庫中,需要將後面的定界符去掉,否則將匯入失敗。如下所示:[[email protected] bin]$ cat /tmp/emp.txt1,A,201304,2,B,201305,3,C,201306,4,D,201307,這個
文字檔案匯入HBase中
文字檔案匯入到Hbase中 建立表sudo su - su - hadoop ./hbase shellcreate 'table1',{NAME => 'DF', VERSIONS => 5} www.2cto.com 配置環境 1.修改hadoop環
使用mapreduce 將hdfs中的資料匯入到到hbase 中
package hbase; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase
使用命令將文字資料匯入到資料庫中
1.下載 oracle 客戶端 和 plsql Oracle 的下載地址: 2. 建立 load.ctl 檔案 在任意資料夾下建立 load.ctl 檔案,用編輯器開啟 load.ctl 檔
通過sqoop將MySQL資料庫中的資料匯入Hbase
從接觸到大資料到成功的實現一個功能期間走了不少彎路也踩了不少坑,這裡作為我的學習筆記也可以作為小白們的前車之鑑,少走彎路,有不正確之處,望指出 環境準備: hadoop、hbase、sqoop、mys
hive over hbase方式將文字庫資料匯入hbase
1,建立hbase表Corpus >> create 'Corpus','CF' 2,建立hive->hbase外表logic_Corpus,並對應hbase中的Corpus表 >> CREATE EXTERNAL TABLE logic_Co
利用sqoop將hive資料匯入Oracle中(踩的坑)
教程很多,這裡只說踩過的坑 1.下載sqoop時,還得下一個bin的包,拿到sqoop-1.4.6.jar 的包,放到hadoop的lib目錄下 2.匯入oracle,執行程式碼時,使用者名稱和表名必須大寫!且資料庫建表時表名必須大寫! 示例程式碼: sqoop expo
mysql匯入資料load data infile用法(將txt檔案中的資料匯入表中)
我們常常匯入資料!mysql有一個高效匯入方法,那就是load data infile 下面來看案例說明 基本語法: load data [low_priority] [local] infile 'file_name txt' [replace | ignor
flume將kafka中topic資料匯入hive中
一、首先更加資料的表結構在hive中進行表的建立。 create table AREA1(unid string,area_punid string,area_no string,area_name s
oracle通過load data 將資料匯入表中通過儲存過程進行批量處理
說明:雖然沒圖,但文字表述很清楚,自己做過的專案留著備用(這只是初版,比較繁瑣,但很明確) 準備工作做完之後,後期可直接使用。如後期excel資料有變更,只需改動對應的部分即可,不涉及改動的可直接使用。 實際操作步驟 依照excel資料模版格式準備好建表語句,將中間過渡
使用PLSQL將文字資料匯入表
開啟PLSQL,點選“Tools”-->“Text Importer...”,點選下圖的按鈕開啟所要匯入的檔案 匯入檔案之後,在下方進行一些列的設定之後,點選上圖中的“Data to Oracle” 點選“Data to Oracle”之後的操作如下圖所示 &
利用sqoop將hive資料匯入Oracle中
首先: 如oracle則執行sqoop list-databases --connect jdbc:oracle:thin:@//192.168.27.235:1521/ORCL --username DATACENTER -P 來測試是否能正確連線資料庫 如mysql則執行sq
Hive 實戰練習(一)—按照日期將每天的資料匯入Hive表中
需求: 每天會產生很多的日誌檔案資料,有這麼一種需求:需要將每天產生的日誌資料在晚上12點鐘過後定時執行操作,匯入到Hive表中供第二天資料分析使用。要求建立分割槽表,並按照日期分割槽。資料檔案命名是以當天日期命名的,如2015-01-09.txt一、建立分割
將模板word中的特定欄位替換(將資料匯入word中)
一、 將模板word中的特定欄位替換(將資料匯入word中) 所用jar包 一、 將模板word中的特定欄位替換(將資料匯入word中) 所用jar包 開發程式碼 /** * @Title createContract * @description 生成合
將Excel的資料匯入SqlServer的表中
記錄一下最近從Excel匯入大量資料到SqlServer表中的步驟。 在將Excel資料準備好以後。 1、右鍵SQL Server中需要匯入資料的庫名,選擇【任務】—【匯入資料】如圖: 2、彈
用sqoop將mysql的資料匯入到hive表中,原理分析
Sqoop 將 Mysql 的資料匯入到 Hive 中 準備Mysql 資料 如圖所示,準備一張表,資料隨便造一些,當然我這裡的資料很簡單。 編寫命令 編寫引數檔案 個人習慣問題,我喜歡把引數寫到檔案裡,然後再命令列引用。 vim mysql-info, #