1. 程式人生 > >Hbase資料解析mapreduce過程及遇到的問題

Hbase資料解析mapreduce過程及遇到的問題

本次介紹的是將hbase中存的資料,用mapreduce解析。
一,hbase中的資料結構:
這裡寫圖片描述
二,分析map過程:
因為這裡是對hbase中的表進行解析,所以map必須是繼承TableMapper類來對資料進行解析。
注意:輸入的key value必須是ImmutableBytesWritable和Result。輸出的就可以自己定義了。
ImmutableBytesWritable,指代的是行健,
Result,指代的是值。
在這個map過程中會將tags,用“,”分割得到一個字元陣列,然後同nikname一起寫出,nakname做key,tag做value。
三,分析Reduce過程:
這裡的Reduce過程跟平時的沒有太大的區別,在整合資料時,也是用“,”分割的。
最後注意:
map過程的輸出值,與reduce的輸入值,一定要對應,並且value不能為空,否則reduce過程會進不去。開始我的就是key有,但是value空了,導致reduce過程一直不進去。
這裡還提供了hbase表建立的方法testData()。

四,程式碼:

mport java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class YaoHbaseAndMapReduce02 {

    public static class YaoMap02 extends TableMapper<Text, Text>{
        @Override

        //key是hbase中的行鍵
        //value是hbase中的所行鍵的所有資料
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                        throws IOException, InterruptedException {
            Text v=null;
            String[] kStrs=null;
            List<Cell> cs=value.listCells();
            for(Cell cell:cs){
                if("tags".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                    kStrs=Bytes.toString(CellUtil.cloneValue(cell)).split(",");
                //  System.out.println("yaomap,kStrs=="+kStrs);
                }
                else if("nickname".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                    v=new Text(CellUtil.cloneValue(cell));
                }

            }
            for (String kStr : kStrs) {
                System.out.println("*****"+new Text(kStr.toLowerCase())+"==="+v);
                context.write(new Text(kStr.toLowerCase()), v);
            }
        }

    }

    public static class YaoReduce02 extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder sb=new StringBuilder();
            for(Text text:values){
                System.out.println("sb=="+sb);
                System.out.println(sb.length() > 0);
                sb.append((sb.length() > 0 ? ",":"") + text.toString());

            }
            Text va=new Text(sb.toString());

            context.write(key, va);

        }
    }




    public static void main(String[] args) throws Exception {
        // 測試資料執行一次後需要註釋
        // testData();
        Configuration conf=new Configuration();
        conf=HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.61.128");

        Job job=Job.getInstance(conf,"mapandreduce02");
        job.setJarByClass(YaoHbaseAndMapReduce02.class);//將此類打成jar包

        Scan scan=new Scan();
        //取對業務有用的資料 tags, nickname
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));

        TableMapReduceUtil.initTableMapperJob("blog1", scan, YaoMap02.class, Text.class, Text.class, job);



        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.61.128:9000/hbaseout" + new Date().getTime()));
        job.setReducerClass(YaoReduce02.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);


    }



    public static void testData() {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.61.128");
            Connection con = ConnectionFactory.createConnection(conf);
            Admin admin = con.getAdmin();

            TableName tn = TableName.valueOf("blog1");
            if (admin.tableExists(tn)) {
                admin.disableTable(tn);
                admin.deleteTable(tn);
            }

            HTableDescriptor htd = new HTableDescriptor(tn);
            HColumnDescriptor hcd01 = new HColumnDescriptor("article");
            htd.addFamily(hcd01);
            HColumnDescriptor hcd02 = new HColumnDescriptor("author");
            htd.addFamily(hcd02);
            admin.createTable(htd);

            Table t = con.getTable(tn);
            Put put = new Put(Bytes.toBytes("1"));
            put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("content"),
                    Bytes.toBytes("HBase is the Hadoop database. Use it when you need random, "
                            + "realtime read/write access to your Big Data"));
            put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("HBase,NoSql,Hadoop"));
            put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("Head First Hbase"));
            put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
            put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("sansan"));

            Put put02 = new Put(Bytes.toBytes("10"));
            put02.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop"));
            put02.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("xiaoshi"));

            Put put03 = new Put(Bytes.toBytes("100"));
            put03.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("hbase,nosql"));
            put03.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("superman"));

            List<Put> puts = Arrays.asList(put, put02, put03);
            t.put(puts);
            System.out.println("==========> 測試資料準備完成...");

            if (admin != null) {
                admin.close();
            }
            if (con != null) {
                con.close();
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }



}

相關推薦

Hbase資料解析mapreduce過程遇到的問題

本次介紹的是將hbase中存的資料,用mapreduce解析。 一,hbase中的資料結構: 二,分析map過程: 因為這裡是對hbase中的表進行解析,所以map必須是繼承TableMapper類來對資料進行解析。 注意:輸入的ke

資料MapReduce瞭解MapReduce Job提交到Yarn的工作流程

MapReduce主要用途是進行分散式計算 一、MapReduce理解 巨集觀上的理解: MapReduce僅僅是作為客戶端(Client)把程式碼程式提交到Yarn平臺上,MapReduce jar在Yarn上執行,屬於客戶端提交的過程,hdfs上傳的命令。

JAVA資料庫連線,JDBC深度分析解析連結過程方式

一、概念 Java DataBase Connectivity,是一種用於執行SQL語句的Java API,它由一組用Java語言編寫的類和介面組成。通過這些類和介面, JDBC把SQL語句傳送給不同型別的資料庫進行處理並接收處理結果。 二、JDBC兩大類 1.

動圖+原始碼,演示Java中常用資料結構執行過程原理

最近在整理資料結構方面的知識, 系統化看了下Java中常用資料結構, 突發奇想用動畫來繪製資料流轉過程. 主要基於jdk8, 可能會有些特性與jdk7之前不相同, 例如LinkedList LinkedHashMap中的雙向列表不再是迴環的. HashMap中的單鏈表是尾插, 而不是頭插入等等, 後文不再贅敘

資料技術學習筆記之Hadoop框架基礎3-網站日誌分析MapReduce過程詳解

一、回顧     -》Hadoop啟動方式         -》單個程序             sbin/h

BCP SQL匯出EXCEL常見問題解決方法;資料匯出儲存過程

一、‘xp_cmdshell’的啟用 SQL Server阻止了對元件‘xp_cmdshell’的過程‘sys.xp_cmdshell’的訪問。因為此元件已作為此服務囂安全配置的一部分而被關 閉。系統管理員可以通過使用sp_configure啟用‘xp_cmdshell’。有關啟用‘xp_cmdshell’

HBase shell 命令建立表新增資料操作

建立表,表名hbase_1102,HBase表是由Key-Value組成的,此表中Key為NAME    此表有兩個列族,CF1和CF2,其中CF1和CF2下分別有兩個列name和gender,Chinese和Math  1,建立表hbase_1102有兩個列族C

python爬蟲三大解析資料方法:bs4 爬小說網案例

bs4 python獨有可以將html文件轉成bs物件,可以直接呼叫bs物件的屬性進行解析 安裝 pip install bs4 本地html Beautiful(“open(‘路徑’)”,‘lxml’) 網路html Beautiful

HBase資料讀取流程解析

  和寫流程相比,HBase讀資料是一個更加複雜的操作流程,這主要基於兩個方面的原因:其一是因為整個HBase儲存引擎基於LSM-Like樹實現,因此一次範圍查詢可能會涉及多個分片、多塊快取甚至多個數據儲存檔案;其二是因為HBase中更新操作以及刪除操作實現都很簡單,更新操作並沒有更新

MapReduce 中如何處理HBase中的資料?如何讀取HBase資料給Map?如何將結果儲存到HBase中?

   MapReduce 中如何處理HBase中的資料?如何讀取HBase資料給Map?如何將結果儲存到HBase中? Mapper類:包括一個內部類(Context)和四個方法(setup,map,cleanup,run);     &n

Hadoop技術內幕:深入解析MapReduce架構設計與實現原理 (大資料技術叢書).epub

  【下載地址】 《Hadoop技術內幕:深入解析MapReduce架構設計與實現原理》內容簡介:“Hadoop技術內幕”共兩冊,分別從原始碼的角度對“Common+HDFS”和“MapReduce的架構設計和實現原理”進行了極為詳細的分析。《Hadoop技術內幕:深入解析M

學習筆記:從0開始學習大資料-9. MapReduce讀並寫Hbase資料

上節的MapReduce計算WordCount例子是從hdfs讀輸入檔案,計算結果也寫入hdfs MapReduce分散式計算的輸入輸出可以根據需要從hdfs或hbase讀取或寫入,如 A.讀hdfs-->寫hdfs B.讀hdfs-->寫hbase C.讀hbase--

資料技術學習筆記之Hadoop框架基礎2-MapReduce程式設計執行流程

一、回顧     -》hadoop的功能?         -》海量資料儲存和海量計算問題         -》分散式檔案儲存框架hdfs和

使用MapReduce讀取HBase資料儲存到MySQL

Mapper讀取HBase資料 package MapReduce; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client

h265 Nalu型別判斷 sps 資料解析

  一,Nalu解析                         首先來介紹下h265(HEVC)nal單元頭,與h264的nal層相比,h265的nal unit header有兩個位元組構成,如下圖所示  0                   1

HBase的Compact引數設定資料讀寫流程剖析-OLAP商業環境實戰

1 HBase Compact 功能剖析 Compaction會從一個region的一個store中選擇一些hfile檔案進行合併。合併說來原理很簡單,先從這些待合併的資料檔案中讀出KeyValues,

python爬蟲三大解析資料方法:xpath 爬段子網案例

下載 pip install lxml 瀏覽器外掛 外掛xpath安裝,XPath Helper 瀏覽器快捷鍵control+shift+x 用於測試自己的xpath是否寫對了 注意 etree建立物件時 etree.parse('本地路徑') 或 et

DNS解析過程相關配置檔案

參考文獻:http://linux.vbird.org/linux_server/0350dns.php 文章目錄 1. DNS解析過程 1.1 DNS解析過程中兩種模式 2.DNS伺服器型別 3.DNS伺服器安裝配置

Client訪問HBase資料過程

2013-01-09 周海漢 2013.1.9 http://abloz.com client訪問HBase資料前,先要到Zookeeper查詢hmaster的地址和-ROOT-表的region儲存在哪裡。 [zk: h

hbase資料匯入hdfs中之(使用MapReduce程式設計統計hbase庫中的mingxing表中男女數量)

資料 zhangfenglun,M,20,13522334455,[email protected],23521472 chenfei,M,20,13684634455,[email protected],84545472 liyuchen,M,20,1352233425