Hortonworks的開源框架SHC的使用(二)
阿新 • • 發佈:2018-12-10
shc測試環境的搭建參考:
spark讀寫HBase之使用hortonworks的開源框架shc(一):原始碼編譯以及測試工程建立讀寫HBase需要兩個核心的元素:
- 使用者描述資料結構的schema字串
- 與schema字串相對應的實體類
1. 定義schema字串
object Catalog { val schema = s"""{ | "table":{"namespace":"default", "name":"test1", "tableCoder":"PrimitiveType"}, | "rowkey":"key", | "columns":{ | "col0":{"cf":"rowkey", "col":"key", "type":"string"}, | "col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, | "col2":{"cf":"cf2", "col":"col2", "type":"double"}, | "col3":{"cf":"cf3", "col":"col3", "type":"float"}, | "col4":{"cf":"cf4", "col":"col4", "type":"int"}, | "col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, | "col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, | "col7":{"cf":"cf7", "col":"col7", "type":"tinyint"}, | "col8":{"cf":"cf8", "col":"col8", "type":"byte"} | } |}""".stripMargin }
schema字串說明:
2. 定義與schema字串對應的實體類
case class HBaseRecord( col0: String, // sql: string col1: Boolean, // sql: boolean col2: Double, // sql: double col3: Float, // sql: float col4: Int, // sql: int col5: Long, // sql: bigint col6: Short, // sql: smallint col7: Byte, // sql: tinyint col8: Array[Byte]) // sql: byte object HBaseRecord { def apply(i: Int): HBaseRecord = { HBaseRecord(i + "", i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, i.toByte, (i + "").getBytes("UTF-8")) } }
3. 寫資料到HBase表完整程式碼
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog object WriteHBase { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("WriteHBase").getOrCreate() val sc = spark.sparkContext val sqlContext = spark.sqlContext import sqlContext.implicits._ // 模擬一批資料 val data = (0 to 9).map(HBaseRecord(_)) // 寫資料 sc.parallelize(data) .toDF .write .options(Map(HBaseTableCatalog.tableCatalog -> Catalog.schema, HBaseTableCatalog.newTable -> "5")) .format("org.apache.spark.sql.execution.datasources.hbase") .save() spark.stop() } }
說明:
Map(HBaseTableCatalog.tableCatalog -> Catalog.schema, HBaseTableCatalog.newTable -> "5")
這個程式碼意味著HBase表是不存在的,也就是我們在schema字串中定義的"test1"這個表不存在,程式幫我們自動建立,5是region的個數,如果你提前建立好了表,那麼這裡的程式碼是這樣的:
Map(HBaseTableCatalog.tableCatalog -> Catalog.schema)
當表存在的時候,資料會追加進去。
執行完以上程式後,檢查一下hbase表中的內容:
hbase(main):003:0> scan 'test1'
ROW COLUMN+CELL
0 column=cf1:col1, timestamp=1534732543615, value=\xFF
0 column=cf2:col2, timestamp=1534732543615, value=\x00\x00\x00\x00\x00\x00\x00\x00
0 column=cf3:col3, timestamp=1534732543615, value=\x00\x00\x00\x00
0 column=cf4:col4, timestamp=1534732543615, value=\x00\x00\x00\x00
0 column=cf5:col5, timestamp=1534732543615, value=\x00\x00\x00\x00\x00\x00\x00\x00
0 column=cf6:col6, timestamp=1534732543615, value=\x00\x00
0 column=cf7:col7, timestamp=1534732543615, value=\x00
0 column=cf8:col8, timestamp=1534732543615, value=0
1 column=cf1:col1, timestamp=1534732543615, value=\x00
1 column=cf2:col2, timestamp=1534732543615, value=?\xF0\x00\x00\x00\x00\x00\x00
......
說明:程式本地執行會報以下錯誤
java.lang.IllegalArgumentException: Pathname /C:/Users/bonc/AppData/Local/Temp/spark-9fa1e56c-ce87-43e8-a936-f947b62e1af5/outputDataset/.spark-staging-5
from C:/Users/bonc/AppData/Local/Temp/spark-9fa1e56c-ce87-43e8-a936-f947b62e1af5/outputDataset/.spark-staging-5 is not a valid DFS filename.
這是因為本地執行把臨時資料夾建立在本地,而刪除臨時資料夾時認為這個資料夾是一個HDFS的路徑,所以報錯,這個錯誤不影響讀寫資料,當在叢集上跑這個程式就不會報錯
4. 從HBase表讀資料完整程式碼
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
object ReadHBase {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("ReadHBase").master("local").getOrCreate()
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val df: DataFrame = sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog -> Catalog.schema))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
df.createOrReplaceTempView("test1")
spark.sql("select * from test1").show()
spark.stop()
}
}
執行結果如下:
+----+-----+----+----+----+----+----+----+----+
|col0| col1|col2|col3|col4|col5|col6|col7|col8|
+----+-----+----+----+----+----+----+----+----+
| 0| true| 0.0| 0.0| 0| 0| 0| 0| 48|
| 1|false| 1.0| 1.0| 1| 1| 1| 1| 49|
| 2| true| 2.0| 2.0| 2| 2| 2| 2| 50|
| 3|false| 3.0| 3.0| 3| 3| 3| 3| 51|
| 4| true| 4.0| 4.0| 4| 4| 4| 4| 52|
| 5|false| 5.0| 5.0| 5| 5| 5| 5| 53|
| 6| true| 6.0| 6.0| 6| 6| 6| 6| 54|
| 7|false| 7.0| 7.0| 7| 7| 7| 7| 55|
| 8| true| 8.0| 8.0| 8| 8| 8| 8| 56|
| 9|false| 9.0| 9.0| 9| 9| 9| 9| 57|
+----+-----+----+----+----+----+----+----+----+