
Performance comparison of Spark reading and writing HBase

I. Writing to HBase from Spark

    The HBase client wraps records in Put objects and supports both single-record and batch inserts. Spark also provides two built-in ways of writing to HBase: saveAsHadoopDataset and saveAsNewAPIHadoopDataset. To compare their performance, the same data set is written with each of these approaches.
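For reference, here is a minimal, self-contained sketch of the raw HBase 1.x client Put path. The table name 'keyword1', column family 'info' and ZooKeeper quorum 'lee' are reused from the examples below; the row key and cell values are placeholders:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes

    object RawPutSketch {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee") // quorum reused from the helper code in Part III
        val connection = ConnectionFactory.createConnection(conf)
        val table = connection.getTable(TableName.valueOf("keyword1"))
        try {
          // One row: placeholder row key, family 'info', qualifier 'keyword'
          val put = new Put(Bytes.toBytes("00000001"))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("keyword"), Bytes.toBytes("spark"))
          table.put(put)                             // single-record insert
          // table.put(java.util.Arrays.asList(put)) // the batch variant takes a java.util.List[Put]
        } finally {
          table.close()
          connection.close()
        }
      }
    }

The HBaseUtils1x helper object listed in Part III wraps this same connection and Put handling.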

The Maven dependencies are as follows:

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
      <version>1.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-protocol -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-protocol</artifactId>
      <version>1.4.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
    <dependency>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
      <version>1.4</version>
    </dependency>
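The complete code in Part III also uses SparkSession and the DataFrame CSV reader, so spark-sql is needed as well (version assumed to match the spark-core artifact above):

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>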

 

1. Single-record Put insert
1.1 Create the table with the HBase shell

create 'keyword1',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


1.2 Code

       

    val start_time1 = new Date().getTime
    keyword.foreachPartition(records => {
      HBaseUtils1x.init()
      records.foreach(f => {
        val keyword = f.getString(0)
        val app_id = f.getString(1)
        val catalog_name = f.getString(2)
        val keyword_catalog_pv = f.getString(3)
        val keyword_catalog_pv_rate = f.getString(4)
        // Row key: first 8 hex chars of the MD5 of keyword + app_id
        val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
        val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
        // One Table.put() call per record
        HBaseUtils1x.insertData(tableName1, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
      })
      HBaseUtils1x.closeConnection()
    })
    val end_time1 = new Date().getTime
    println("HBase single-record insert took: " + (end_time1 - start_time1))



2. Batch Put insert
2.1 Create the table

create 'keyword2',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


2.2 Code

      

    val start_time2 = new Date().getTime
    keyword.foreachPartition(records => {
      HBaseUtils1x.init()
      val puts = ArrayBuffer[Put]()
      records.foreach(f => {
        val keyword = f.getString(0)
        val app_id = f.getString(1)
        val catalog_name = f.getString(2)
        val keyword_catalog_pv = f.getString(3)
        val keyword_catalog_pv_rate = f.getString(4)
        val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
        val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
        try {
          // Collect the Puts for the whole partition and flush them in one batch
          puts.append(HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        } catch {
          case e: Throwable => println(f)
        }
      })
      import collection.JavaConverters._
      HBaseUtils1x.addDataBatchEx(tableName2, puts.asJava)
      HBaseUtils1x.closeConnection()
    })
    val end_time2 = new Date().getTime
    println("HBase batch insert took: " + (end_time2 - start_time2))


3. Writing with saveAsHadoopDataset

    saveAsHadoopDataset writes an RDD to any Hadoop-supported storage system through the old Hadoop API, using a Hadoop JobConf object for that storage system. The JobConf sets an OutputFormat and any required output paths, just as it would be configured for a Hadoop MapReduce job.
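As a rough sketch, the JobConf that saveAsHadoopDataset expects can be built as follows; this mirrors the getJobConf helper shown in Part III, with the ZooKeeper quorum and table name taken from the examples in this post:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.mapred.JobConf

    // Old-API (mapred) output configuration: target table plus TableOutputFormat
    val jobConf = new JobConf(HBaseConfiguration.create())
    jobConf.set("hbase.zookeeper.quorum", "lee")
    jobConf.set("hbase.zookeeper.property.clientPort", "2181")
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "keyword3")
    jobConf.setOutputFormat(classOf[TableOutputFormat])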
3.1 Create the table

create 'keyword3',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


3.2 Code

    val start_time3 = new Date().getTime
    keyword.rdd.map(f => {
      val keyword = f.getString(0)
      val app_id = f.getString(1)
      val catalog_name = f.getString(2)
      val keyword_catalog_pv = f.getString(3)
      val keyword_catalog_pv_rate = f.getString(4)
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
      val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
      // The old API expects (ImmutableBytesWritable, Put) pairs
      (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    }).saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
    val end_time3 = new Date().getTime
    println("saveAsHadoopDataset write took: " + (end_time3 - start_time3))


4. Writing with saveAsNewAPIHadoopDataset

    saveAsNewAPIHadoopDataset writes an RDD to any Hadoop-supported storage system through the new Hadoop API, using a Hadoop Configuration object for that storage system. The configuration sets an OutputFormat and any required output paths, just as it would be configured for a Hadoop MapReduce job.
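Again as a sketch, the configuration for saveAsNewAPIHadoopDataset can be built as follows; this mirrors the getNewJobConf helper shown in Part III:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Mutation
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapred.JobConf

    // New-API (mapreduce) output configuration: target table plus TableOutputFormat
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "lee")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "keyword4")
    conf.setClass("mapreduce.job.outputformat.class",
      classOf[TableOutputFormat[String]],
      classOf[org.apache.hadoop.mapreduce.OutputFormat[String, Mutation]])
    val newApiJobConf = new JobConf(conf)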
4.1 Create the table

create 'keyword4',{NAME=>'info',BLOCKSIZE=>'16384',BLOCKCACHE=>'false'},{NUMREGIONS=>10,SPLITALGO=>'HexStringSplit'}


4.2 Code

   

    val start_time4 = new Date().getTime
    keyword.rdd.map(f => {
      val keyword = f.getString(0)
      val app_id = f.getString(1)
      val catalog_name = f.getString(2)
      val keyword_catalog_pv = f.getString(3)
      val keyword_catalog_pv_rate = f.getString(4)
      val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
      val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
      (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
    }).saveAsNewAPIHadoopDataset(HBaseUtils1x.getNewJobConf(tableName4, spark.sparkContext))
    val end_time4 = new Date().getTime
    println("saveAsNewAPIHadoopDataset write took: " + (end_time4 - start_time4))



5. Performance comparison

As the measured run times show, the saveAsHadoopDataset and saveAsNewAPIHadoopDataset approaches outperform both single-record and batch Put inserts.


II. Reading HBase from Spark

The newAPIHadoopRDD API converts an HBase table into an RDD. It is used as follows:

    val start_time1 = new Date().getTime
    val hbaseRdd = spark.sparkContext.newAPIHadoopRDD(HBaseUtils1x.getNewConf(tableName1),
      classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println(hbaseRdd.count())
    hbaseRdd.foreach {
      case (_, result) => {
        // Get the row key
        val rowKey = Bytes.toString(result.getRow)
        val keyword = Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes()))
        // The values were written as strings, so decode as a string before converting to Double
        val keyword_catalog_pv_rate = Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes())).toDouble
        println(rowKey + "," + keyword + "," + keyword_catalog_pv_rate)
      }
    }
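If only a few columns are needed, the Scan that getNewConf serializes into the configuration can be narrowed before newAPIHadoopRDD is called, so less data is shipped to Spark. A minimal sketch using the same Base64/ProtobufUtil serialization as getNewConf (the column list and caching value are illustrative choices, not from the original post):

    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}

    // Only fetch the two qualifiers that are actually read, and batch more rows per RPC
    val scan = new Scan()
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("keyword"))
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("keyword_catalog_pv_rate"))
    scan.setCaching(500)
    val readConf = HBaseUtils1x.getNewConf(tableName1)
    readConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
    // readConf can now be passed to newAPIHadoopRDD exactly as above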

 

III. Complete code

   

    package com.sparkStudy.utils

    import java.util.Date
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.{Bytes, MD5Hash}
    import org.apache.spark.sql.SparkSession
    import scala.collection.mutable.ArrayBuffer

    /**
      * @Author: JZ.lee
      * @Description: Compare Spark write/read paths for HBase
      * @Date: 18-8-28 4:28 PM
      * @Modified By:
      */
    object SparkRWHBase {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SparkRWHBase")
          .master("local[2]")
          .config("spark.some.config.option", "some-value")
          .getOrCreate()

        // Source data: a headerless, comma-delimited CSV file
        val keyword = spark.read
          .format("csv")
          .option("header", false)
          .option("delimiter", ",")
          .load("file:/opt/data/keyword_catalog_day.csv")

        val tableName1 = "keyword1"
        val tableName2 = "keyword2"
        val tableName3 = "keyword3"
        val tableName4 = "keyword4"
        val cf = "info"
        val columns = Array("keyword", "app_id", "catalog_name", "keyword_catalog_pv", "keyword_catalog_pv_rate")

        // 1. Single-record Put insert
        val start_time1 = new Date().getTime
        keyword.foreachPartition(records => {
          HBaseUtils1x.init()
          records.foreach(f => {
            val keyword = f.getString(0)
            val app_id = f.getString(1)
            val catalog_name = f.getString(2)
            val keyword_catalog_pv = f.getString(3)
            val keyword_catalog_pv_rate = f.getString(4)
            val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
            val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
            HBaseUtils1x.insertData(tableName1, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
          })
          HBaseUtils1x.closeConnection()
        })
        val end_time1 = new Date().getTime
        println("HBase single-record insert took: " + (end_time1 - start_time1))

        // 2. Batch Put insert
        val start_time2 = new Date().getTime
        keyword.foreachPartition(records => {
          HBaseUtils1x.init()
          val puts = ArrayBuffer[Put]()
          records.foreach(f => {
            val keyword = f.getString(0)
            val app_id = f.getString(1)
            val catalog_name = f.getString(2)
            val keyword_catalog_pv = f.getString(3)
            val keyword_catalog_pv_rate = f.getString(4)
            val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
            val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
            try {
              puts.append(HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
            } catch {
              case e: Throwable => println(f)
            }
          })
          import collection.JavaConverters._
          HBaseUtils1x.addDataBatchEx(tableName2, puts.asJava)
          HBaseUtils1x.closeConnection()
        })
        val end_time2 = new Date().getTime
        println("HBase batch insert took: " + (end_time2 - start_time2))

        // 3. saveAsHadoopDataset (old Hadoop API)
        val start_time3 = new Date().getTime
        keyword.rdd.map(f => {
          val keyword = f.getString(0)
          val app_id = f.getString(1)
          val catalog_name = f.getString(2)
          val keyword_catalog_pv = f.getString(3)
          val keyword_catalog_pv_rate = f.getString(4)
          val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
          val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
          (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        }).saveAsHadoopDataset(HBaseUtils1x.getJobConf(tableName3))
        val end_time3 = new Date().getTime
        println("saveAsHadoopDataset write took: " + (end_time3 - start_time3))

        // 4. saveAsNewAPIHadoopDataset (new Hadoop API)
        val start_time4 = new Date().getTime
        keyword.rdd.map(f => {
          val keyword = f.getString(0)
          val app_id = f.getString(1)
          val catalog_name = f.getString(2)
          val keyword_catalog_pv = f.getString(3)
          val keyword_catalog_pv_rate = f.getString(4)
          val rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(keyword + app_id)).substring(0, 8)
          val cols = Array(keyword, app_id, catalog_name, keyword_catalog_pv, keyword_catalog_pv_rate)
          (new ImmutableBytesWritable, HBaseUtils1x.getPutAction(rowKey, cf, columns, cols))
        }).saveAsNewAPIHadoopDataset(HBaseUtils1x.getNewJobConf(tableName4, spark.sparkContext))
        val end_time4 = new Date().getTime
        println("saveAsNewAPIHadoopDataset write took: " + (end_time4 - start_time4))

        // Read the table back as an RDD via newAPIHadoopRDD
        val hbaseRdd = spark.sparkContext.newAPIHadoopRDD(HBaseUtils1x.getNewConf(tableName1),
          classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
        println(hbaseRdd.count())
        hbaseRdd.foreach {
          case (_, result) => {
            // Get the row key
            val rowKey = Bytes.toString(result.getRow)
            val keyword = Bytes.toString(result.getValue(cf.getBytes(), "keyword".getBytes()))
            // Values were written as strings, so decode as a string before converting to Double
            val keyword_catalog_pv_rate = Bytes.toString(result.getValue(cf.getBytes(), "keyword_catalog_pv_rate".getBytes())).toDouble
            println(rowKey + "," + keyword + "," + keyword_catalog_pv_rate)
          }
        }
      }
    }
     
    package com.sparkStudy.utils

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.client.BufferedMutator.ExceptionListener
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext
    import org.slf4j.LoggerFactory

    /**
      * @Author: JZ.Lee
      * @Description: HBase 1.x create/read/update/delete helpers
      * @Date: Created at 11:02 AM 18-8-14
      * @Modified By:
      */
    object HBaseUtils1x {
      private val LOGGER = LoggerFactory.getLogger(this.getClass)
      private var connection: Connection = null
      private var conf: Configuration = null

      def init() = {
        conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee")
        connection = ConnectionFactory.createConnection(conf)
      }

      // JobConf for the old-API (mapred) TableOutputFormat, used by saveAsHadoopDataset
      def getJobConf(tableName: String) = {
        val conf = HBaseConfiguration.create()
        val jobConf = new JobConf(conf)
        jobConf.set("hbase.zookeeper.quorum", "lee")
        jobConf.set("hbase.zookeeper.property.clientPort", "2181")
        jobConf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE, tableName)
        jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
        jobConf
      }

      // Configuration for TableInputFormat, used by newAPIHadoopRDD
      def getNewConf(tableName: String) = {
        conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee")
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)
        val scan = new Scan()
        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN,
          Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
        conf
      }

      // JobConf for the new-API (mapreduce) TableOutputFormat, used by saveAsNewAPIHadoopDataset.
      // The SparkContext parameter only exists to match the call site; it is not used here.
      def getNewJobConf(tableName: String, sc: SparkContext) = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "lee") // same quorum as init()/getJobConf
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.defaults.for.version.skip", "true")
        conf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)
        conf.setClass("mapreduce.job.outputformat.class",
          classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[String]],
          classOf[org.apache.hadoop.mapreduce.OutputFormat[String, Mutation]])
        new JobConf(conf)
      }

      def closeConnection(): Unit = {
        connection.close()
      }

      def getGetAction(rowKey: String): Get = {
        val getAction = new Get(Bytes.toBytes(rowKey))
        getAction.setCacheBlocks(false)
        getAction
      }

      // Build a Put that writes each column/value pair under the given family
      def getPutAction(rowKey: String, familyName: String, column: Array[String], value: Array[String]): Put = {
        val put: Put = new Put(Bytes.toBytes(rowKey))
        for (i <- 0 until column.length) {
          put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column(i)), Bytes.toBytes(value(i)))
        }
        put
      }

      def insertData(tableName: String, put: Put) = {
        val name = TableName.valueOf(tableName)
        val table = connection.getTable(name)
        try {
          table.put(put)
        } finally {
          table.close()
        }
      }

      // Flush a whole list of Puts through a BufferedMutator with a 4 MB write buffer
      def addDataBatchEx(tableName: String, puts: java.util.List[Put]): Unit = {
        val name = TableName.valueOf(tableName)
        val listener = new ExceptionListener {
          override def onException(e: RetriesExhaustedWithDetailsException, bufferedMutator: BufferedMutator): Unit = {
            for (i <- 0 until e.getNumExceptions) {
              LOGGER.info("Failed to write put: " + e.getRow(i))
            }
          }
        }
        val params = new BufferedMutatorParams(name)
          .listener(listener)
          .writeBufferSize(4 * 1024 * 1024)
        try {
          val mutator = connection.getBufferedMutator(params)
          mutator.mutate(puts)
          mutator.close()
        } catch {
          case e: Throwable => e.printStackTrace()
        }
      }
    }


     
 https://blog.csdn.net/baymax_007/article/details/82191188