1. 程式人生 > >Spark如何讀取Hbase特定查詢的資料

Spark如何讀取Hbase特定查詢的資料

最近工作需要使用到Spark操作Hbase,上篇文章已經寫了如何使用Spark讀寫Hbase全量表的資料做處理,但這次有所不同,這次的需求是Scan特定的Hbase的資料然後轉換成RDD做後續處理,簡單的使用Google查詢了一下,發現實現方式還是比較簡單的,用的還是Hbase的TableInputFormat相關的API。

基礎軟體版本如下:

Hadoop2.7.2
Hbase1.2.0
Spark2.1.0
Scala2.11.8

直接上程式碼如下:

`           val startRowkey="row1" 
            val endRowkey="row1"
            //開始rowkey和結束一樣代表精確查詢某條資料
//組裝scan語句 val scan=new Scan(Bytes.toBytes(startRowkey),Bytes.toBytes(endRowkey)) scan.setCacheBlocks(false) scan.addFamily(Bytes.toBytes("ks")); scan.addColumn(Bytes.toBytes("ks"), Bytes.toBytes("data")) //將scan類轉化成string型別
val scan_str= TableMapReduceUtil.convertScanToString(scan) conf.set(TableInputFormat.SCAN,scan_str) //使用new hadoop api,讀取資料,並轉成rdd val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) //列印掃描的資料總量
println("count:"+rdd.count)

上面的少量程式碼,已經完整實現了使用spark查詢hbase特定的資料,然後統計出數量最後輸出,當然上面只是一個簡單的例子,重要的是能把hbase資料轉換成RDD,只要轉成RDD我們後面就能進行非常多的過濾操作。

注意上面的hbase版本比較新,如果是比較舊的hbase,如果自定義下面的方法將scan物件給轉成字串,程式碼如下:

import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

def convertScanToString(scan: Scan): String = {
  val out: ByteArrayOutputStream = new ByteArrayOutputStream
  val dos: DataOutputStream = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

最後,還有一點,上面的程式碼是直接自己new了一個scan物件進行組裝,當然我們還可以不自己new物件,全部使用TableInputFormat下面的相關的常量,並賦值,最後執行的時候TableInputFormat會自動幫我們組裝scan物件這一點通過看TableInputFormat的原始碼就能明白:

private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
    public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
    private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
    public static final String SCAN = "hbase.mapreduce.scan";
    public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
    public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
    public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
    public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
    public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
    public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
    public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
    public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
    public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
    public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
    public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
    public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";

上面程式碼中的常量,都可以conf.set的時候進行賦值,最後任務執行的時候會自動轉換成scan,有興趣的朋友可以自己嘗試。