Spark如何讀取Hbase特定查詢的資料
阿新 • • 發佈:2019-01-31
最近工作需要使用到Spark操作Hbase,上篇文章已經寫了如何使用Spark讀寫Hbase全量表的資料做處理,但這次有所不同,這次的需求是Scan特定的Hbase的資料然後轉換成RDD做後續處理,簡單的使用Google查詢了一下,發現實現方式還是比較簡單的,用的還是Hbase的TableInputFormat相關的API。
基礎軟體版本如下:
Hadoop2.7.2
Hbase1.2.0
Spark2.1.0
Scala2.11.8
直接上程式碼如下:
` val startRowkey="row1"
val endRowkey="row1"
//開始rowkey和結束一樣代表精確查詢某條資料
//組裝scan語句
val scan=new Scan(Bytes.toBytes(startRowkey),Bytes.toBytes(endRowkey))
scan.setCacheBlocks(false)
scan.addFamily(Bytes.toBytes("ks"));
scan.addColumn(Bytes.toBytes("ks"), Bytes.toBytes("data"))
//將scan類轉化成string型別
val scan_str= TableMapReduceUtil.convertScanToString(scan)
conf.set(TableInputFormat.SCAN,scan_str)
//使用new hadoop api,讀取資料,並轉成rdd
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
//列印掃描的資料總量
println("count:"+rdd.count)
上面的少量程式碼,已經完整實現了使用spark查詢hbase特定的資料,然後統計出數量最後輸出,當然上面只是一個簡單的例子,重要的是能把hbase資料轉換成RDD,只要轉成RDD我們後面就能進行非常多的過濾操作。
注意上面的hbase版本比較新,如果是比較舊的hbase,如果自定義下面的方法將scan物件給轉成字串,程式碼如下:
import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64
def convertScanToString(scan: Scan): String = {
val out: ByteArrayOutputStream = new ByteArrayOutputStream
val dos: DataOutputStream = new DataOutputStream(out)
scan.write(dos)
Base64.encodeBytes(out.toByteArray)
}
最後,還有一點,上面的程式碼是直接自己new了一個scan物件進行組裝,當然我們還可以不自己new物件,全部使用TableInputFormat下面的相關的常量,並賦值,最後執行的時候TableInputFormat會自動幫我們組裝scan物件這一點通過看TableInputFormat的原始碼就能明白:
private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
public static final String SCAN = "hbase.mapreduce.scan";
public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
上面程式碼中的常量,都可以conf.set的時候進行賦值,最後任務執行的時候會自動轉換成scan,有興趣的朋友可以自己嘗試。