1. 程式人生 > >Hortonworks的開源框架SHC的使用(二)

Hortonworks的開源框架SHC的使用(二)

shc測試環境的搭建參考:

spark讀寫HBase之使用hortonworks的開源框架shc(一):原始碼編譯以及測試工程建立

讀寫HBase需要兩個核心的元素:

  • 使用者描述資料結構的schema字串
  • 與schema字串相對應的實體類

1. 定義schema字串

object Catalog {

  val schema = s"""{
                  |   "table":{"namespace":"default", "name":"test1", "tableCoder":"PrimitiveType"},
                  |   "rowkey":"key",
                  |   "columns":{
                  |       "col0":{"cf":"rowkey", "col":"key", "type":"string"},
                  |       "col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
                  |       "col2":{"cf":"cf2", "col":"col2", "type":"double"},
                  |       "col3":{"cf":"cf3", "col":"col3", "type":"float"},
                  |       "col4":{"cf":"cf4", "col":"col4", "type":"int"},
                  |       "col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
                  |       "col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
                  |       "col7":{"cf":"cf7", "col":"col7", "type":"tinyint"},
                  |       "col8":{"cf":"cf8", "col":"col8", "type":"byte"}
                  |   }
                  |}""".stripMargin

}

schema字串說明:

2. 定義與schema字串對應的實體類

case class HBaseRecord(
      col0: String,  // sql: string
      col1: Boolean, // sql: boolean
      col2: Double, // sql: double
      col3: Float, // sql: float
      col4: Int, // sql: int
      col5: Long, // sql: bigint
      col6: Short, // sql: smallint
      col7: Byte, // sql: tinyint
      col8: Array[Byte]) // sql: byte

  object HBaseRecord {
    def apply(i: Int): HBaseRecord = {
      HBaseRecord(i + "",
        i % 2 == 0,
        i.toDouble,
        i.toFloat,
        i,
        i.toLong,
        i.toShort,
        i.toByte,
        (i + "").getBytes("UTF-8"))
    }
  }

3. 寫資料到HBase表完整程式碼

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object WriteHBase {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("WriteHBase").getOrCreate()
    val sc = spark.sparkContext
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._

    // 模擬一批資料
    val data = (0 to 9).map(HBaseRecord(_))

    // 寫資料
    sc.parallelize(data)
      .toDF
      .write
      .options(Map(HBaseTableCatalog.tableCatalog -> Catalog.schema, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
    spark.stop()
  }
}

說明:

Map(HBaseTableCatalog.tableCatalog -> Catalog.schema, HBaseTableCatalog.newTable -> "5")

這個程式碼意味著HBase表是不存在的,也就是我們在schema字串中定義的"test1"這個表不存在,程式幫我們自動建立,5是region的個數,如果你提前建立好了表,那麼這裡的程式碼是這樣的:

Map(HBaseTableCatalog.tableCatalog -> Catalog.schema)

當表存在的時候,資料會追加進去。

執行完以上程式後,檢查一下hbase表中的內容:

hbase(main):003:0> scan 'test1'
ROW                                         COLUMN+CELL                                                                                                                 
 0                                          column=cf1:col1, timestamp=1534732543615, value=\xFF                                                                        
 0                                          column=cf2:col2, timestamp=1534732543615, value=\x00\x00\x00\x00\x00\x00\x00\x00                                            
 0                                          column=cf3:col3, timestamp=1534732543615, value=\x00\x00\x00\x00                                                            
 0                                          column=cf4:col4, timestamp=1534732543615, value=\x00\x00\x00\x00                                                            
 0                                          column=cf5:col5, timestamp=1534732543615, value=\x00\x00\x00\x00\x00\x00\x00\x00                                            
 0                                          column=cf6:col6, timestamp=1534732543615, value=\x00\x00                                                                    
 0                                          column=cf7:col7, timestamp=1534732543615, value=\x00                                                                        
 0                                          column=cf8:col8, timestamp=1534732543615, value=0                                                                           
 1                                          column=cf1:col1, timestamp=1534732543615, value=\x00                                                                        
 1                                          column=cf2:col2, timestamp=1534732543615, value=?\xF0\x00\x00\x00\x00\x00\x00
 ......

說明:程式本地執行會報以下錯誤

java.lang.IllegalArgumentException: Pathname /C:/Users/bonc/AppData/Local/Temp/spark-9fa1e56c-ce87-43e8-a936-f947b62e1af5/outputDataset/.spark-staging-5 
from C:/Users/bonc/AppData/Local/Temp/spark-9fa1e56c-ce87-43e8-a936-f947b62e1af5/outputDataset/.spark-staging-5 is not a valid DFS filename.

這是因為本地執行把臨時資料夾建立在本地,而刪除臨時資料夾時認為這個資料夾是一個HDFS的路徑,所以報錯,這個錯誤不影響讀寫資料,當在叢集上跑這個程式就不會報錯

4. 從HBase表讀資料完整程式碼

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object ReadHBase {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("ReadHBase").master("local").getOrCreate()
    val sc = spark.sparkContext
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._

    val df: DataFrame = sqlContext
      .read
      .options(Map(HBaseTableCatalog.tableCatalog -> Catalog.schema))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()

    df.createOrReplaceTempView("test1")

    spark.sql("select * from test1").show()

    spark.stop()
  }
}

執行結果如下:

+----+-----+----+----+----+----+----+----+----+
|col0| col1|col2|col3|col4|col5|col6|col7|col8|
+----+-----+----+----+----+----+----+----+----+
|   0| true| 0.0| 0.0|   0|   0|   0|   0|  48|
|   1|false| 1.0| 1.0|   1|   1|   1|   1|  49|
|   2| true| 2.0| 2.0|   2|   2|   2|   2|  50|
|   3|false| 3.0| 3.0|   3|   3|   3|   3|  51|
|   4| true| 4.0| 4.0|   4|   4|   4|   4|  52|
|   5|false| 5.0| 5.0|   5|   5|   5|   5|  53|
|   6| true| 6.0| 6.0|   6|   6|   6|   6|  54|
|   7|false| 7.0| 7.0|   7|   7|   7|   7|  55|
|   8| true| 8.0| 8.0|   8|   8|   8|   8|  56|
|   9|false| 9.0| 9.0|   9|   9|   9|   9|  57|
+----+-----+----+----+----+----+----+----+----+

轉自:CoderJed