saprk sql 整合 hbase 通過phoenix 關係對映 jdbc
阿新 • • 發佈:2018-11-27
首先說明遇到的坑
要在phoenxi中建表 , 只有在Phoenix中建表才鞥對映到hbase ,因為spark是通過Phoenix的jdbc
插入資料的 不是吧資料放進hbase而是把資料放進Phoenix
所以在hbase中建表是找不到的
我是通過sparksql插入的
sparksql是datafranme
所以使用Phoenix比較有優勢
Phoenix建表
CREATE TABLE record (hospitalid CHAR(80) PRIMARY KEY ,hcount BIGINT,havgcost DOUBLE,havgreimburse DOUBLE,havgreproportion DOUBLE,havgday DOUBLE,havgfinproportion DOUBLE)
程式碼
val insertTable: String = "record"
val DB_PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver"
val DB_PHOENIX_URL = "jdbc:phoenix:master"
spark.sql(" SELECT \nhospitalid \n,SUM( CASE flag WHEN 1 THEN 1 ELSE 0 END ) hcount \n,SUM(allcost)/COUNT(l.recordid) havgcost\n,SUM(recost)/SUM( CASE flag WHEN 1 THEN 1 ELSE 0 END ) havgreimburse \n,SUM(recost/allcost)/COUNT(l.recordid) havgreproportion \n, SUM(datediff(starttime,endtime))/COUNT(l.recordid) havgday\n, SUM(CASE isrecovery WHEN 1 THEN 1 ELSE 0 END)/COUNT(l.recordid) havgfinproportion\n FROM \nrecord l , reimburse r\nWHERE\nl.recordid=r.recordid\nAND\nflag = 1\n\nGROUP BY\n hospitalid ")
.write.format("org.apache.phoenix.spark")//要以什麼格式插入
.mode(SaveMode.Overwrite)//插入模式
.option("driver", DB_PHOENIX_DRIVER)//Phoenix驅動
.option("table", insertTable)//表名
.option("zkUrl", DB_PHOENIX_URL).save()//zookeeper的地址 url
整了半天才研究出來
還有一種方法是通過makerdd這種方法較為困難
需要把datafram轉換為makerdd
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("INSERTHBase").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
//設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定
conf.set("hbase.zookeeper.quorum","master")
//設定zookeeper連線埠,預設2181
conf.set("hbase.zookeeper.property.clientPort", "2181")
val tablename = "test1"
//初始化jobconf,TableOutputFormat必須是org.apache.hadoop.hbase.mapred包下的!
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
val rdd = indataRDD.map(_.split(',')).map{arr=>{
/*一個Put物件就是一行記錄,在構造方法中指定主鍵
* 所有插入的資料必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換
* Put.add方法接收三個引數:列族,列名,資料
*/
val put = new Put(Bytes.toBytes(arr(0).toInt))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
//轉化成RDD[(ImmutableBytesWritable,Put)]型別才能呼叫saveAsHadoopDataset
(new ImmutableBytesWritable, put)
}}
rdd.saveAsHadoopDataset(jobConf)
sc.stop()
}
踩坑不少
附上詩詞 簡直感人
妾發初覆額,折花門前劇。
郎騎竹馬來,繞床弄青梅。
同居長幹裡,兩小無嫌猜,
十四為君婦,羞顏未嘗開。
低頭向暗壁,千喚不一回。
十五始展眉,願同塵與灰。
常存抱柱信,豈上望夫臺。
十六君遠行,瞿塘灩澦堆。
五月不可觸,猿聲天上哀。
門前遲行跡,一一生綠苔。
苔深不能掃,落葉秋風早。
八月蝴蝶來,雙飛西園草。
感此傷妾心,坐愁紅顏老。
早晚下三巴,預將書報家。
相迎不道遠,直至長風沙。