1. 程式人生 > >HBASE--MapReduce

HBASE--MapReduce

instance 自定義 byte ear tin 類型 util 數量 命令

1、查看 HBase 的 MapReduce 任務的執行

$ bin/hbase mapredcp

2、執行環境變量的導入

$ export HBASE_HOME= ~/hadoop_home/hbase-1.2.6
$ export HADOOP_HOME= ~/hadoop_home
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
本句的環境變量可以不用加入 ~/.profile裏面

不過要在哪個shell裏執行哪個才能用jar包

3) 運行官方的 MapReduce 任務
-- 案例一:統計 Student 表中有多少行數據
cd /home/hadoop/hadoop_home/hbase-1.2.6

(文件夾路徑環境變量,只是使用沒有配置,所以進入此路徑下)

$ yarn jar lib/hbase-server-1.2.6.jar rowcounter student

案例二:使用 MapReduce 將本地數據導入到 HBase
(1) 在本地創建一個 tsv 格式的文件:fruit.tsv

1001 Apple Red
1002 Pear Yellow
1003 PineappleYellow

尖叫提示:上面的這個數據不要從文中直接復制,有格式錯誤
(2)

創建 HBase 表

hbase(main):001:0> create ‘fruit‘,‘info
(3) 在 HDFS 中創建 input_fruit 文件夾並上傳 fruit.tsv 文件

$ hdfs dfs -mkdir /input_fruit/
$ hdfs dfs -put fruit.tsv /input_fruit/
(4) 執行 MapReduce 到 HBase 的 fruit 表中
cd /home/hadoop/hadoop_home/hbase-1.2.6

$ yarn jar lib/hbase-server-1.2.6.jar importtsv
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit
hdfs://master:9000/input_fruit
(5) 使用 scan 命令查看導入後的結果

hbase(main):001:0> scan ‘fruit


2.5.2、自定義 HBase-MapReduce1
目標:將 fruit 表中的一部分數據,通過 MR 遷入到 fruit_mr 表中。
分步實現:
1) 構建 ReadFruitMapper 類,用於讀取 fruit 表中的數據
package com.yjsj.hbase_mr;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
  @Override
  protected void map(ImmutableBytesWritable key, Result value, Context context)
  throws IOException, InterruptedException {
    //將 fruit 的 name 和 color 提取出來,相當於將每一行數據讀取出來放入到 Put 對象中。
    Put put = new Put(key.get());
    //遍歷添加 column 行
    for(Cell cell: value.rawCells()){
      //添加/克隆列族:info
      if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
        //添加/克隆列:name
        if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
          //將該列 cell 加入到 put 對象中
          put.add(cell);
        //添加/克隆列:color
        }else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
          //向該列 cell 加入到 put 對象中
          put.add(cell);
        }
      }
    }

    //將從 fruit 讀取到的每行數據寫入到 context 中作為 map 的輸出
    context.write(key, put);
  }
}

2) 構建 WriteFruitMRReducer 類,用於將讀取到的 fruit 表中的數據寫入到 fruit_mr 表中

package com.yjsj.hbase_mr;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put,
NullWritable> {
  @Override
  protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
  throws IOException, InterruptedException {
    //讀出來的每一行數據寫入到 fruit_mr 表中
    for(Put put: values){
      context.write(NullWritable.get(), put);
    }
  }
}

3) 構建 Fruit2FruitMRRunner extends Configured implements Tool 用於組裝運行 Job
任務
class Txt2FruitRunner extends Configured implements Tool {

  //組裝 Job
  public int run(String[] args) throws Exception {

    //得到 Configuration
    Configuration conf = this.getConf();
    //創建 Job 任務
    Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    job.setJarByClass(Fruit2FruitMRRunner.class);
    //配置 Job
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setCaching(500);
    //設置 Mapper,註意導入的是 mapreduce 包下的,不是 mapred 包下的,後者是老
    版本
    TableMapReduceUtil.initTableMapperJob(
      "fruit", //數據源的表名
      scan, //scan 掃描控制器
      ReadFruitMapper.class,//設置 Mapper 類
      ImmutableBytesWritable.class,//設置 Mapper 輸出 key 類型
      Put.class,//設置 Mapper 輸出 value 值類型
      job//設置給哪個 JOB
    );
    //設置 Reducer
    TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class,
    job);
    //設置 Reduce 數量,最少 1 個
    job.setNumReduceTasks(1);
    boolean isSuccess = job.waitForCompletion(true);

    if(!isSuccess){
      throw new IOException("Job running with error");
    }
    return isSuccess ? 0 : 1;
  }

4) 主函數中調用運行該 Job 任務

public static void main( String[] args ) throws Exception{
  Configuration conf = HBaseConfiguration.create();


  conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", "master,node1,node2");
  conf.set("hbase.zookeeper.property.clientPort", "2181");
  conf.set("hbase.master", "master:60000");


  int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
  System.exit(status);
}

:加入zookeeper代碼
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master,node1,node2");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.master", "master:60000");

5) 打包運行任務(參考java打包,而且打包需要zip 格式一下)

$ ~/hadoop_home/bin/yarn jar ~/hadoop_home/TestHbase.jar
com.yjsj.hbase.mr1.Fruit2FruitMRRunner

尖叫提示:運行任務前,如果待數據導入的表不存在,則需要提前創建之。
尖叫提示:maven 打包命令:-P local clean package 或-P dev clean package install(將第三方
jar 包一同打包,需要插件:maven-shade-plugin)

2.5.3、自定義HBase-MapReduce2
目標:實現將 HDFS 中的數據寫入到 HBase 表中
分步實現:
1) 構建 ReadFruitFromHDFSMapper 於讀取 HDFS 中的文件數據

package com.yjsj.hbase_mr2;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text,
ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
//從 HDFS 中讀取的數據
String lineValue = value.toString();
//讀取出來的每行數據使用\t 進行分割,存於 String 數組
String[] values = lineValue.split("\t");
//根據數據中值的含義取值
String rowKey = values[0];
String name = values[1];
String color = values[2];
//初始化 rowKey
ImmutableBytesWritable rowKeyWritable = new
ImmutableBytesWritable(Bytes.toBytes(rowKey));
//初始化 put 對象
Put put = new Put(Bytes.toBytes(rowKey));
//參數分別:列族、列、值

put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
context.write(rowKeyWritable, put);
}
}

2) 構建 WriteFruitMRFromTxtReducer 類

package com.yjsj.hbase_mr2;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put,
NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
throws IOException, InterruptedException {
//讀出來的每一行數據寫入到 fruit_hdfs 表中
for(Put put: values){
context.write(NullWritable.get(), put);
}
}
}

3) 創建 Txt2FruitRunner 組裝 Job

class Txt2FruitRunner extends Configured implements Tool {

public int run(String[] args) throws Exception {
//得到 Configuration
Configuration conf = this.getConf();
//創建 Job 任務
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(Txt2FruitRunner.class);
Path inPath = new Path("hdfs://master:9000/input_fruit/fruit.tsv");
FileInputFormat.addInputPath(job, inPath);
//設置 Mapper
job.setMapperClass(ReadFruitFromHDFSMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//設置 Reducer
TableMapReduceUtil.initTableReducerJob("fruit_hdfs", WriteFruitMRFromTxtReducer.class, job);
//設置 Reduce 數量,最少 1 個
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}

4) 調用執行 Job

public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();

conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master,node1,node2");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.master", "master:60000");


int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
System.exit(status);
}

註:
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master,node1,node2");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("hbase.master", "master:60000");

5) 打包運行(參考java打包,而且打包需要zip 格式一下)

$ ~/hadoop_home/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar
com.z.hbase.mr2.Txt2FruitRunner

尖叫提示:運行任務前,如果待數據導入的表不存在,則需要提前創建之。
尖叫提示:maven 打包命令:-P local clean package 或-P dev clean package install(將第三方
jar 包一同打包,需要插件:maven-shade-plugin)

2.6、與 Hive 的集成
2.6.1、HBase 與 Hive 的對比
1) Hive
(1) 數據倉庫
Hive 的本質其實就相當於將 HDFS 中已經存儲的文件在 Mysql 中做了一個雙射關系,以方
便使用 HQL 去管理查詢。
(2) 用於數據分析、清洗
Hive 適用於離線的數據分析和清洗,延遲較高。
(3) 基於 HDFS、MapReduce
Hive 存儲的數據依舊在 DataNode 上,編寫的 HQL 語句終將是轉換為 MapReduce 代碼執行。
2) HBase
(1) 數據庫
是一種面向列存儲的非關系型數據庫。
(2) 用於存儲結構化和非結構話的數據 適用於單表非關系型數據的存儲,不適合做關聯查

(3) 基於 HDFS
數據持久化存儲的體現形式是 Hfile,存放於 DataNode 中,被 ResionServer 以 region 的形式
進行管理。
(4) 延遲較低,接入在線業務使用
面對大量的企業數據,HBase 可以直線單表大量數據的存儲,同時提供了高效的數據訪問速
度。
2.6.2、HBase 與 Hive 集成使用
尖叫提示:HBase 與 Hive 的集成在最新的兩個版本中無法兼容。所以,我們只能含著淚勇
敢的重新編譯:hive-hbase-handler-1.2.2.jar!!好氣!!
環境準備
因為我們後續可能會在操作 Hive 的同時對 HBase 也會產生影響,所以 Hive 需要持有操作
HBase 的 Jar,那麽接下來拷貝 Hive 所依賴的 Jar 包(或者使用軟連接的形式)。



HBASE--MapReduce