spark讀寫hbase
1 配置
1.1 開發環境:
- HBase:hbase-1.0.0-cdh5.4.5.tar.gz
- Hadoop:hadoop-2.6.0-cdh5.4.5.tar.gz
- ZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gz
- Spark:spark-2.1.0-bin-hadoop2.6
1.2 Spark的配置
- Jar包:需要HBase的Jar如下(經過測試,正常執行,但是是否存在冗餘的Jar並未證實,若發現多餘的jar可自行進行刪除)
jars
- spark-env.sh
新增以下配置:export SPARK_CLASSPATH=/home/hadoop/data/lib1/*
注:如果使用spark-shell的yarn模式進行測試的話,那麼最好每個NodeManager節點都有配置jars和hbase-site.xml - spark-default.sh
spark.yarn.historyServer.address=slave11:18080 spark.history.ui.port=18080 spark.eventLog.enabled=true spark.eventLog.dir=hdfs:///tmp/spark/events spark.history.fs.logDirectory=hdfs:///tmp/spark/events spark.driver.memory=1g spark.serializer=org.apache.spark.serializer.KryoSerializer
1.3 資料
1)格式:
[email protected][email protected]@[email protected]@1.55
[email protected][email protected]@[email protected]@0.8
[email protected][email protected]@[email protected]@0.8
[email protected][email protected]@[email protected]@0.8
[email protected][email protected]@[email protected]@0.8
[email protected][email protected]@[email protected]@1.65
[email protected][email protected]@[email protected]@1.55
2 程式碼演示
2.1 準備動作
1)既然是與HBase相關,那麼首先需要使用hbase shell來建立一個表
建立表格:create ‘data’,’v’,create ‘data1’,’v’
2)使用spark-shell進行操作,命令如下:
bin/spark-shell --master yarn --deploy-mode client --num-executors 5 --executor-memory 1g --executor-cores 2
程式碼演示環境
3)import 各種類
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64,Bytes}
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.commons.codec.digest.DigestUtils
2.2 程式碼實戰
建立conf和table
val conf= HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,"data1")
val table = new HTable(conf,"data1")
2.2.1 資料寫入
格式:
val put = new Put(Bytes.toBytes("rowKey"))
put.add("cf","q","value")
使用for來插入5條資料
for(i <- 1 to 5){ var put= new Put(Bytes.toBytes("row"+i));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes("value"+i));table.put(put)}
到hbase shell中檢視結果
hbase_data1表中的資料
2.2.2 資料讀取
val hbaseRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
1)take
hbaseRdd take 1
take_result
2)scan
var scan = new Scan();
scan.addFamily(Bytes.toBytes(“v”));
var proto = ProtobufUtil.toScan(scan)
var scanToString = Base64.encodeBytes(proto.toByteArray());
conf.set(TableInputFormat.SCAN,scanToString)
val datas = hbaseRdd.map( x=>x._2).map{result => (result.getRow,result.getValue(Bytes.toBytes("v"),Bytes.toBytes("value")))}.map(row => (new String(row._1),new String(row._2))).collect.foreach(r => (println(r._1+":"+r._2)))
scan_result
2.3 批量插入
2.3.1 普通插入
1)程式碼
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log")
val data = rdd.map(_.split("@")).map{x=>(x(0)+x(1),x(2))}
val result = data.foreachPartition{x => {val conf= HBaseConfiguration.create();conf.set(TableInputFormat.INPUT_TABLE,"data");conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");conf.set("hbase.zookeeper.property.clientPort","2181");conf.addResource("/home/hadoop/data/lib/hbase-site.xml");val table = new HTable(conf,"data");table.setAutoFlush(false,false);table.setWriteBufferSize(3*1024*1024); x.foreach{y => {
var put= new Put(Bytes.toBytes(y._1));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)};table.flushCommits}}}
2)執行時間如下:7.6 min
執行時間
2.3.2 Bulkload
1) 程式碼:
val conf = HBaseConfiguration.create();
val tableName = "data1"
val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}
rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)
2) 執行時間:7s
執行時間_BulkLoad
3)執行結果:
到hbase shell 中檢視 list “data1”
結果查詢
通過對比我們可以發現bulkload批量匯入所用時間遠遠少於普通匯入,速度提升了60多倍,當然我沒有使用更大的資料量測試,但是我相信匯入速度的提升是非常顯著的,強烈建議使用BulkLoad批量匯入資料到HBase中。
相關推薦
Spark讀寫Hbase的二種方式對比
作者:Syn良子 出處:http://www.cnblogs.com/cssdongl 轉載請註明出處 一.傳統方式 這種方式就是常用的TableInputFormat和TableOutputFormat來讀寫hbase,如下程式碼所示 簡單解釋下,用sc.newA
spark讀寫hbase效能對比
一、spark寫入hbase hbase client以put方式封裝資料,並支援逐條或批量插入。spark中內建saveAsHadoopDataset和saveAsNewAPIHadoopDataset兩種方式寫入hbase。為此,將同樣的資料插入其中對比效能。依賴如下:
spark讀寫hbase
1 配置 1.1 開發環境: HBase:hbase-1.0.0-cdh5.4.5.tar.gzHadoop:hadoop-2.6.0-cdh5.4.5.tar.gzZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gzSpark:spark-2.1.0-bin-hadoop2.
如何使用scala+spark讀寫hbase?
最近工作有點忙,所以文章更新頻率低了點,希望大家可以諒解,好了,言歸正傳,下面進入今天的主題: 如何使用scala+spark讀寫Hbase 軟體版本如下: scala2.11.8 spark2.1.0 hbase1.2.0 公司有一些實時資料處理的專案,儲存
spark常見操作系列(3)--spark讀寫hbase(2)
接著上一篇, 問題(2): scan有 scan.setCaching(10000) scan.setCacheBlocks(true) 等設定.setCaching ,個人感覺不夠用.hbase 預設是在記憶體裡面放一塊資料用來讀取,所以讀取效率比較高,可是,
Spark讀寫Hbase資料
環境 spark: 2.4.3 hbase: 1.1.5 步驟 啟動hadoop-3.1.2,hbase2.2.0 把HBas
spark通過phoenix讀寫hbase(Java版)-轉:https://blog.csdn.net/xiongbingcool/article/details/81458602
pom.xml <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId
大資料-05-Spark之讀寫HBase資料
準備工作一:建立一個HBase表 這裡依然是以student表為例進行演示。這裡假設你已經成功安裝了HBase資料庫,如果你還沒有安裝,可以參考大資料-04-Hbase入門,進行安裝,安裝好以後,不要建立資料庫和表,只要跟著本節後面的內容操作即可。 因為hbase依賴於hadoop,因此啟動和停止都是需要按
spark通過phoenix讀寫hbase(Java版)
pom.xml <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId>or
Spark讀寫OSS並使用OSS-Select來加速查詢
Spark讀寫OSS 基於這篇文章搭建的CDH6以及配置,我們來使Spark能夠讀寫OSS(其他版本的Spark都是類似的做法,不再贅述)。 由於預設Spark並沒有將OSS的支援包放到它的CLASSPATH裡面,所以我們需要執行如下命令下面的步驟需要在所有的CDH節點執行 進入到$CDH_HOME/
Apache Spark 讀寫Apache ignite 程式碼實戰
package com.zhw.bigdata.ignite; import org.apache.ignite.spark.IgniteDataFrameSettings; import org.apache.spark.sql.Dataset; import org.apache.spar
Spark讀寫Elasticsearch
Spark讀寫Elasticsearch 版本說明 Spark:2.3.1 Elasticsearch: elasticsearch-6.4.0 1 Scala環境下Spark讀寫Elasticsearch 1.1 依賴包 1.1.1 Spark依賴
java操作spark讀寫mongodb
首先要引入mongodb-spark-connector的maven依賴,具體的可見這個api網址:https://docs.mongodb.com/spark-connector/current/java-api/,然後基本上就可以按照api上面的內容來進行spark操作
spark讀寫csv檔案
如果是spark1.6.0請新增maven: <dependency> <groupId>com.databricks</groupId> <artifactId>s
Hive整合HBase 通過Hive讀/寫HBase中的表
寫在前面一: 本文將Hive與HBase整合在一起,使Hive可以讀取HBase中的資料,讓Hadoop生態系統中最為常用的兩大框架互相結合,相得益彰。 寫在前面二: 使用軟體說明 約定所有軟體的存放目錄: /home/yujianxin 一、Hive整合HBase原理
spark讀寫資料到kafka
叢集環境:CDH5.8.0 / spark1.6.0 / scala2.10.4在使用時,我們需要新增相應的依賴包: <dependency> <groupId>org.apache.spark</groupId>
spark 讀寫 parquet
SQLConf // This is used to set the default data source val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default") .doc("The def
【spark 讀寫資料】資料來源的讀寫操作
通用的 Load/Save 函式 在最簡單的方式下,預設的資料來源(parquet 除非另外配置通過spark.sql.sources.default)將會用於所有的操作。 Parquet 是一個列式儲存格式的檔案,被許多其他資料處理系統所支援。Spark
使用C++(通過Thrift)訪問/操作/讀寫Hbase
#include "HbaseOperate.h" #include "log4cxx/log4cxx.h" #include "log4cxx/propertyconfigurator.h" static log4cxx::LoggerPtr logger(log4cxx::Logger::ge
IDEA中 Spark 讀Hbase 報錯處理:
ado htable client ets rim ogg expec zookeep ati SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 17/11/19 14:25:57 E