1. 程式人生 > >spark操作讀取hbase例項

spark操作讀取hbase例項

博主專案實踐中,經常需要用spark從hbase中讀取資料。其中,spark的版本為1.6,hbase的版本為0.98。現在記錄一下如何在spark中操作讀取hbase中的資料。
對於這種操作型的需求,沒有什麼比直接上程式碼更簡單明瞭的了。so,show me the code!

object Demo extends Logging{

  val CF_FOR_FAMILY_USER = Bytes.toBytes("U");
  val CF_FOR_FAMILY_DEVICE = Bytes.toBytes("D")
  val QF_FOR_MODEL = Bytes.toBytes("model"
) val HBASE_CLUSTER = "hbase://xxx/" val TABLE_NAME = "xxx"; val HBASE_TABLE = HBASE_CLUSTER + TABLE_NAME def genData(sc:SparkContext) = { //20161229的資料,rowkey的設計為9999-yyyyMMdd val filter_of_1229 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("79838770")) //得到qf為w:00-23的資料
val filter_of_qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator("w")) val all_filters = new util.ArrayList[Filter]() all_filters.add(filter_of_1229) all_filters.add(filter_of_qf) //hbase多個過濾器 val filterList = new FilterList(all_filters) val scan = new
Scan().addFamily(CF_FOR_FAMILY_USER) scan.setFilter(filterList) scan.setCaching(1000) scan.setCacheBlocks(false) val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE,HBASE_TABLE ) conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray())) sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result]) //後面是針對hbase查詢結果的具體業務邏輯 .map() ... def main(args: Array[String]): Unit = { val Array(output_path) = args val sparkConf = new SparkConf().setAppName("demo") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val sc = new SparkContext(sparkConf) genUuidWifi(sc).saveAsTextFile(output_path) sc.stop() } }

需要注意的一個小點就是如果hbase裡有多個過濾器,注意需要使用FilterList。