
HBase with MapReduce Example

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * MapReduce over HBase: a word count that reads from one table and writes to another.
 * @author wilson
 */
public class HBaseMr {
    /**
     * HBase configuration
     */
    static Configuration config = null;
    static {
        config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "192.168.10.71,192.168.10.72,192.168.10.73");
        config.set("hbase.zookeeper.property.clientPort", "2181");
    }

    /**
     * Table metadata
     */
    public static final String tableName = "word";   // source table
    public static final String colf = "content";     // column family
    public static final String col = "info";         // column qualifier
    public static final String tableName2 = "stat";  // output table

    /**
     * (Re)create both tables and load the sample rows.
     */
    public static void initTB() {
        HTable table = null;
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(config); // table administration
            // drop the tables if they already exist
            if (admin.tableExists(tableName)) {
                System.out.println("table " + tableName + " already exists, dropping it");
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
            if (admin.tableExists(tableName2)) {
                System.out.println("table " + tableName2 + " already exists, dropping it");
                admin.disableTable(tableName2);
                admin.deleteTable(tableName2);
            }
            // create the tables
            HTableDescriptor desc = new HTableDescriptor(tableName);
            HColumnDescriptor family = new HColumnDescriptor(colf);
            desc.addFamily(family);
            admin.createTable(desc);
            HTableDescriptor desc2 = new HTableDescriptor(tableName2);
            HColumnDescriptor family2 = new HColumnDescriptor(colf);
            desc2.addFamily(family2);
            admin.createTable(desc2);
            // insert the sample data
            table = new HTable(config, tableName);
            table.setAutoFlush(false);
            table.setWriteBufferSize(500);
            List<Put> lp = new ArrayList<Put>();
            Put p1 = new Put(Bytes.toBytes("1"));
            p1.add(colf.getBytes(), col.getBytes(), ("The Apache Hadoop software library is a framework").getBytes());
            lp.add(p1);
            Put p2 = new Put(Bytes.toBytes("2"));
            p2.add(colf.getBytes(), col.getBytes(), ("The common utilities that support the other Hadoop modules").getBytes());
            lp.add(p2);
            Put p3 = new Put(Bytes.toBytes("3"));
            p3.add(colf.getBytes(), col.getBytes(), ("Hadoop by reading the documentation").getBytes());
            lp.add(p3);
            Put p4 = new Put(Bytes.toBytes("4"));
            p4.add(colf.getBytes(), col.getBytes(), ("Hadoop from the release page").getBytes());
            lp.add(p4);
            Put p5 = new Put(Bytes.toBytes("5"));
            p5.add(colf.getBytes(), col.getBytes(), ("Hadoop on the mailing list").getBytes());
            lp.add(p5);
            table.put(lp);
            table.flushCommits();
            lp.clear();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null) {
                    table.close();
                }
                if (admin != null) {
                    admin.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * MyMapper extends TableMapper<Text, IntWritable>
     * Text: type of the output key
     * IntWritable: type of the output value
     */
    public static class MyMapper extends TableMapper<Text, IntWritable> {
        private static IntWritable one = new IntWritable(1);
        private static Text word = new Text();

        @Override
        // input key: the rowkey; input value: the Result holding one row
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // read colf:col of this row (the table has only one column, so this is the whole row)
            String words = Bytes.toString(value.getValue(Bytes.toBytes(colf), Bytes.toBytes(col)));
            // split on spaces
            String itr[] = words.split(" ");
            // emit (word, 1) for every token
            for (int i = 0; i < itr.length; i++) {
                word.set(itr[i]);
                context.write(word, one);
            }
        }
    }

    /**
     * MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>
     * Text: type of the input key
     * IntWritable: type of the input value
     * ImmutableBytesWritable: output type, i.e. the type of the rowkey
     */
    public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum the counts emitted by the mapper
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // build the Put, using the word itself as the rowkey
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes(colf), Bytes.toBytes(col), Bytes.toBytes(String.valueOf(sum)));
            // write to HBase: the output key is the rowkey, the value is the Put
            context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        config.set("fs.default.name", "hdfs://192.168.10.71:9000/"); // default HDFS path
        config.set("hadoop.job.ugi", "hadoop,hadoop");               // user, group
        config.set("mapred.job.tracker", "192.168.10.71:9001");      // JobTracker address
        // create the tables and load the sample data
        initTB();
        // create the job
        Job job = new Job(config, "HBaseMr");
        job.setJarByClass(HBaseMr.class); // main class
        // create the scan, restricted to the one column we need
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes(colf), Bytes.toBytes(col));
        // mapper reading from HBase: table name, scan, mapper class, mapper output key/value types
        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, Text.class, IntWritable.class, job);
        // reducer writing to HBase: output table name, reducer class, job
        TableMapReduceUtil.initTableReducerJob(tableName2, MyReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
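
After the job completes, the word counts can be checked from the HBase shell with scan 'stat', or programmatically. Below is a minimal sketch of such a check, written against the same pre-1.0 client API used in the example above; the StatScanner class name is hypothetical and not part of the original post, while the cluster addresses and table/column names are taken from HBaseMr.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hypothetical helper: scans the "stat" table produced by HBaseMr
 * and prints each word with its count.
 */
public class StatScanner {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "192.168.10.71,192.168.10.72,192.168.10.73");
        config.set("hbase.zookeeper.property.clientPort", "2181");
        HTable table = new HTable(config, "stat");
        try {
            ResultScanner scanner = table.getScanner(new Scan());
            for (Result r : scanner) {
                String word = Bytes.toString(r.getRow());                          // rowkey = word
                String count = Bytes.toString(r.getValue(Bytes.toBytes("content"), // column family
                                                         Bytes.toBytes("info")));  // qualifier
                System.out.println(word + " = " + count);
            }
            scanner.close();
        } finally {
            table.close();
        }
    }
}

Because the reducer stores the count as the string form of the sum, the value is read back with Bytes.toString rather than Bytes.toInt.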