
Notes on connecting PySpark to HBase

1. Reading data

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("abv").getOrCreate()  # create the SparkSession
print('SparkSession created')
host = 'learn'     # ZooKeeper quorum address
table = 'student'  # HBase table to read
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=conf)
hbase_rdd.cache()  # cache before the actions so the table is scanned only once
count = hbase_rdd.count()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
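Each value produced by HBaseResultToStringConverter is a string holding one JSON object per cell, joined with newlines, so the rows can be unpacked further. A minimal sketch, assuming that cell format (it is what the converter in the Spark examples jar emits):

import json

# Split each row's value into per-cell JSON strings and parse them.
# Each parsed cell is a dict with keys such as 'row', 'columnFamily',
# 'qualifier' and 'value'.
cells = hbase_rdd.flatMap(lambda kv: kv[1].split('\n')).map(json.loads)
for cell in cells.take(5):
    print(cell['row'], cell['qualifier'], cell['value'])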

2. Writing data

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("abv").getOrCreate()  # create the SparkSession
print('SparkSession created')
host = 'learn'     # ZooKeeper quorum address
table = 'student'  # HBase table to write to
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {
    "hbase.zookeeper.quorum": host,
    "hbase.mapred.outputtable": table,
    "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",
}

# each element: 'rowkey,column family,column qualifier,value'
rawData = ['3,info,name,Rongcheng', '4,info,name,Guanhua']
print('Writing data')
# map each line to (rowkey, [rowkey, column family, qualifier, value]);
# splitting out the rowkey keeps this correct for multi-character rowkeys
spark.sparkContext.parallelize(rawData) \
    .map(lambda x: (x.split(',')[0], x.split(','))) \
    .saveAsNewAPIHadoopDataset(conf=conf,
                               keyConverter=keyConv,
                               valueConverter=valueConv)
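To write several columns of the same row, emit one (rowkey, list) pair per cell, since StringListToPutConverter turns each list into a single-column Put. A small sketch under that assumption; the rowkey '5' and the gender qualifier are made up for illustration, and the table must already have the info column family:

# one element per cell: both cells below land in the same row '5'
more_data = [
    '5,info,name,Xueqian',
    '5,info,gender,F',
]
spark.sparkContext.parallelize(more_data) \
    .map(lambda x: (x.split(',')[0], x.split(','))) \
    .saveAsNewAPIHadoopDataset(conf=conf,
                               keyConverter=keyConv,
                               valueConverter=valueConv)

The result can then be checked with scan 'student' in the hbase shell.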

Problems encountered:

Class not found (ClassNotFoundException): the jars whose names start with hbase have to be copied from HBase's lib directory into Spark's jars directory. Alternatively, create a new folder for them and add SPARK_CLASSPATH=&lt;path to that folder&gt; in spark-env.sh. Also download spark-example-1.6.0.jar and put it into the same folder; it provides the org.apache.spark.examples.pythonconverters classes used above.
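For example, the spark-env.sh line could look like this (the folder path is an assumed example; use wherever the jars were actually placed):

# in $SPARK_HOME/conf/spark-env.sh -- the path below is illustrative
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/spark/hbase_lib/*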

Then restart Spark.