pyspark連線hbase學習
阿新 • • 發佈:2019-01-06
1、讀取資料
from pyspark.sql import SparkSession from pyspark import SparkContext,SparkConf spark=SparkSession.builder.appName("abv").getOrCreate() #建立spark物件 print('spark物件已建立') host = 'learn' table = 'student' conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" hbase_rdd = spark.sparkContext.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf) count = hbase_rdd.count() hbase_rdd.cache() output = hbase_rdd.collect() for (k, v) in output: print (k, v)
2、寫入資料
from pyspark.sql import SparkSession from pyspark import SparkContext,SparkConf spark=SparkSession.builder.appName("abv").getOrCreate() #建立spark物件 print('spark物件已建立') host = 'learn' table = 'student' keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" conf = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua'] #( rowkey , [ row key , column family , column name , value ] ) print('準備寫入資料') spark.sparkContext.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
遇到的問題:
找不類,需要把hbase的lib中hbase開頭的jar複製到spark的jars包中,可以建立一個新的資料夾,再在spark-env.sh中新增SPARK_CLASSPATH=放入hbase的包的檔案路徑。並且下載spark-example-1.6.0.jar放到之前建立的資料夾中。
重啟spark