1. 程式人生 > >通過PySpark訪問Hbase並轉成DataFrame

通過PySpark訪問Hbase並轉成DataFrame

介紹PySpark訪問Hbase的兩種方法,一種是通過newAPIHadoopRDD,讀取Hbase為RDD,並轉成DataFrame,另一種是在Hive裡建立Hbase的外部表,然後通過Spark Sql讀取

一、通過newAPIHadoopRDD讀取

#spark連線hbase,讀取RDD資料

spark = SparkSession.builder.master("yarn-client").appName("hbase_test").getOrCreate()

hbaseconf = {"hbase.zookeeper.quorum":'10.18.105.15',"hbase.mapreduce.inputtable"

:"table_name",

                      "hbase.mapreduce.scan.row.start":"***", "hbase.mapreduce.scan.row.stop":"***"}

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

hbase_rdd = spark.sparkContext.newAPIHadoopRDD(\

"org.apache.hadoop.hbase.mapreduce.TableInputFormat",\

"org.apache.hadoop.hbase.io.ImmutableBytesWritable",\

"org.apache.hadoop.hbase.client.Result",\

keyConverter=keyConv, valueConverter=valueConv, conf=hbaseconf)

#從每列的dict中提取列名和取值,組成dict

def call_transfor(y1):

y2 = [json.loads(i) for i in y1]

fdc={}

for i in y2:

colname = i['qualifier']

value = i['value']

fdc[colname] = value

return fdc

#將hbase RDD轉換為DataFrame

def rdd_to_df(hbase_rdd):

#同一個RowKey對應的列之間是用\n分割,進行split,split後每列是個dict

fdc_split = hbase_rdd.map(lambda x:(x[0],x[1].split('\n')))

#提取列名和取值

fdc_cols = fdc_split.map(lambda x:(x[0],call_transfor(x[1])))

colnames = ['row_key'] + fdc_cols.map(lambda x:[i for i in x[1]]).take(1)[0]

fdc_dataframe = fdc_cols.map(lambda x:[x[0]]+[x[1][i] for i in x[1]]).toDF(colnames)

return fdc_dataframe

#資料轉換

fdc_data = rdd_to_df(hbase_rdd)

二、通過Spark訪問Hbase的Hive外部表

前提是你已經可以通過spark-sql讀取hive的資料了,然後進行如下配置。

1.拷貝如下jar包到${spark_home}/jars(spark2.0之前是${spark_home}/lib):

  • hbase-protocol-1.2.0-cdh5.10.2.jar
  • hbase-client-1.2.0-cdh5.10.2.jar
  • hbase-common-1.2.0-cdh5.10.2.jar
  • hbase-server-1.2.0-cdh5.10.2.jar
  • hive-hbase-handler-1.1.0-cdh5.10.2.jar
  • metrics-core-2.2.0.jar

2.將hbase的配置檔案 hbase-site.xml 拷貝到${spark_home}/conf目錄下。

#建立Hbase的Hive外部表

spark.sql('''

create external table hbase_hive_external_table( key string, col map<string,string>) 
STORED BY "org.apache.hadoop.hive.hbase.HBaseStorageHandler" 
WITH SERDEPROPERTIES ("hbase.columns.mapping" = "t:")
TBLPROPERTIES("hbase.table.name" = "hive_hbase_test"

''')

#spark-sql直接讀取

spark.sql("select * from hbase_hive_external_table")

#先落地成Parquet檔案再讀取,速度會快一些

spark.sql("create table temp.hbase_hive_parquet  stored as parquet as select * from hbase_hive_external_table")

spark.read.parquet("/user/hive/warehouse/temp.db/hbase_hive_parquet")