Spark BulkLoad to HBase: Single Column and Multiple Columns
Background
Previous post: Spark: writing a DataFrame to HFiles (HBase), extending from one column family with one column to one column family with multiple columns.
Back then I implemented Spark BulkLoad into HBase with spark 1.6.0 and hbase 1.2.0, and extended it past the inconvenience of only being able to handle a single column.
Now the same functionality needs to be implemented with spark 2.3.2 and hbase 2.0.2.
I expected it to be simple, but both frameworks went through major version upgrades and the APIs changed a lot.
The official examples are actually hard to reproduce, and the material online is either outdated or mindless copy-paste, so I spent some time reimplementing the feature,
and recorded the many problems I ran into along the way.
Version Information
| Tool | Version |
|---|---|
| spark | 2.3.2 |
| hbase | 2.0.2 |
Configuration File
hdfs.properties
# ZooKeeper connection info
zk=slave01:2181,slave02:2181,slave03:2181,slave04:2181,slave05:2181
zk.host=slave01,slave02,slave03,slave04,slave05
zk.port=2181
Maven Dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11</scala.version>
    <spark.version>2.3.2</spark.version>
    <hbase.version>2.0.2</hbase.version>
    <hadoop.version>3.1.1</hadoop.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-it</artifactId>
        <version>${hbase.version}</version>
    </dependency>
</dependencies>
Implementation Code
Template Method
package com.aaa.base

import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}

/**
 * @author lillcol
 *         create_time 2019/6/14-14:25
 *         description: Uses the template method pattern to define a unified task
 *         execution flow; all non-streaming tasks must implement this trait.
 */
trait ExportToHbaseTemplate {
  val logger: Logger = LoggerFactory.getLogger(getClass.getSimpleName)
  // Task state
  val PERSIST_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER

  /**
   * Task template
   *
   * @param args
   */
  def runWork(args: Array[String]): Unit = {
    try {
      // initTepmlate(args) // Template initialization
      init(args) // Initialization
      // printfTepmlate // Print template initialization result
      printf // Print initialization result
      workFlow // Data processing flow
    } catch {
      case e: Exception => e.printStackTrace
    } finally {
      // spark.sparkContext.stop()
    }
  }

  /**
   * Initialization
   *
   * @param args
   */
  def init(args: Array[String])

  /**
   * Print the initialization result
   */
  def printf()

  /**
   * Data processing flow
   */
  def workFlow()

  /**
   * Template initialization
   *
   * @param args
   */
  def initTepmlate(args: Array[String]): Unit = {
  }

  /**
   * Print the template initialization result
   */
  def printfTepmlate(): Unit = {
  }
}
Properties File Reader
package com.aaa.util
import java.io.FileInputStream
import java.util.Properties
/**
* Reads a .properties configuration file
*
* @param path
*/
class ReadProperties(path: String) {
/**
* Loads the properties file at the given path
*
* @return a Properties instance
*/
def getProInstance(): Properties = {
val pro = new Properties()
pro.load(new FileInputStream(path))
pro
}
}
/**
* Companion object
*/
object ReadProperties {
def getInstance(path: String): ReadProperties = {
new ReadProperties(path)
}
}
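For a quick sanity check, here is a self-contained sketch of how such a reader is used. The temp file stands in for hdfs.properties, and `loadProps` mirrors what `getProInstance` does; the demo object and method names are mine, not part of the original code.

```scala
import java.io.{File, FileInputStream, FileWriter}
import java.util.Properties

// Mirrors ReadProperties.getProInstance: load a .properties file into
// a java.util.Properties. The temp file stands in for hdfs.properties.
object ReadPropertiesDemo {
  def loadProps(path: String): Properties = {
    val pro = new Properties()
    pro.load(new FileInputStream(path))
    pro
  }

  def main(args: Array[String]): Unit = {
    // Write a small stand-in properties file
    val f = File.createTempFile("hdfs", ".properties")
    val w = new FileWriter(f)
    w.write("zk=slave01:2181,slave02:2181\nzk.port=2181\n")
    w.close()

    val pro = loadProps(f.getPath)
    println(pro.getProperty("zk")) // the quorum string
    assert(pro.getProperty("zk.port") == "2181")
  }
}
```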
Main Implementation
package com.aaa.test
import com.aaa.base.ExportToHbaseTemplate
import com.aaa.util.ReadProperties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, RegionLocator, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object TestHbase extends ExportToHbaseTemplate {
val proPath = "/root/lillcol/hdfs.properties" //Path to the properties file
var cf: String = "info" //Column family
var defKey: String = "UID" //Default key
val proInstance = ReadProperties.getInstance(proPath).getProInstance
var partition: String = "20190918"
var conf: Configuration = _
var SourceDataFrame: DataFrame = _
var outPutTable: String = "outPutTable"
var savePath: String = s"/tmp/hbase/$outPutTable" //Temporary HFile output path
val spark: SparkSession = SparkSession
.builder()
// .master("local")
.appName("ExportToHBase")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
import spark.implicits._
def main(args: Array[String]): Unit = {
runWork(args)
}
/**
* Initialization
*
* @param args
*/
override def init(args: Array[String]): Unit = {
conf = HBaseConfiguration.create() //HBase configuration
conf.set("hbase.zookeeper.quorum", proInstance.getProperty("zk")) //HBase ZooKeeper quorum
conf.set("hbase.mapreduce.hfileoutputformat.table.name", outPutTable) //HBase output table
conf.set("hbase.unsafe.stream.capability.enforce", "false") //HBase root dir setting (sometimes errors out; see the error-handling section)
conf.set("zookeeper.znode.parent", "/hbase")
conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "400")
}
/**
* Data processing flow
*/
override def workFlow(): Unit = {
getDataset()
val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = getHFileRDD
saveHfile(hfileRDD)
loadHFileToHbase()
}
/**
* Load the source table
*/
def getDataset() = {
SourceDataFrame = spark.read.parquet("/warehouse/data/lillcol/test.parquet")
}
/**
* Converts the Dataset into HBase's data format.
* Note:
* the default API can only handle one column family with one column.
* That is extended here:
* var kvlist: Seq[KeyValue] = List()
* together with rdd.flatMapValues(_.iterator) adapts to the column names;
* the result is one column family with multiple columns.
*
* @return
*/
def getHFileRDD(): RDD[(ImmutableBytesWritable, KeyValue)] = {
//key: a global variable cannot be used inside map, so create a local one.
//Note: skipping this causes strange exceptions, e.g. class initialization failure or Spark not initialized; I haven't found the root cause yet, to be tracked later.
val key = defKey
//Column family
val clounmFamily: String = cf
//Get the column names (the first one is the key)
val columnsName: Array[String] = SourceDataFrame.columns.sorted
val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = SourceDataFrame
.repartition(200, $"$key") //For large data volumes, you can partition by key
.rdd
.map(row => {
var kvlist: Seq[KeyValue] = List() //Holds the multiple columns
var kv: KeyValue = null
val cf: Array[Byte] = clounmFamily.getBytes //Column family
val rowKey = Bytes.toBytes(row.getAs[Int](key) + "")
val immutableRowKey: ImmutableBytesWritable = new ImmutableBytesWritable(rowKey)
for (i <- 0 to (columnsName.length - 1)) {
//Convert the RDD into the format HFile requires:
//we defined above that the HFile key is ImmutableBytesWritable,
//so our RDD must also be keyed by an ImmutableBytesWritable instance.
var value: Array[Byte] = null
try {
//String data is mapped to String
value = Bytes.toBytes(row.getAs[String](columnsName(i)))
} catch {
case e: ClassCastException =>
//A type-cast exception means the value is numeric; map it via BigInt
value = Bytes.toBytes(row.getAs[BigInt](columnsName(i)) + "")
case e: Exception =>
e.printStackTrace()
}
//Wrap into a KeyValue
kv = new KeyValue(rowKey, cf, Bytes.toBytes(columnsName(i)), value)
//Append the new kv to the end of kvlist (order matters; the whole list must stay sorted)
kvlist = kvlist :+ kv
}
(immutableRowKey, kvlist)
})
val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = result1
.flatMapValues(_.iterator)
hfileRDD
}
/**
* Saves the generated HFiles.
* Note: bulk load requires the generated HFiles to land on disk first;
* they are then loaded into HBase via the LoadIncrementalHFiles class.
* Details on the sortBy call here:
* 0. HBase queries by rowkey, and rowkeys are ordered; in a sense the rowkey
*    is an index, which is one reason HBase queries are efficient. This means
*    that when inserting data, each row must be inserted where its rowkey belongs.
* 1. Inserting via Put goes through the WAL, and HBase picks the right position
*    for each RowKey on insert, so that path guarantees RowKey order by itself.
* 2. Bulk load has no WAL; much like Hive's LOAD, it simply moves the underlying
*    HFiles into the designated HBase path, so without touching the HFiles they
*    must already be sorted themselves.
* Previously sorting by rowkey alone was enough, but on 2.0.2 the column must be
* sorted too, hence sortBy(x => (x._1, x._2.getKeyString), true).
*
* @param hfileRDD
*/
def saveHfile(hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)]) = {
//Delete any pre-existing files, otherwise a file-already-exists exception is thrown
delete_hdfspath(savePath)
//Save the generated HFiles to the target directory
hfileRDD
.sortBy(x => (x._1, x._2.getKeyString), true) //Must keep the whole dataset ordered
.saveAsNewAPIHadoopFile(savePath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
conf)
}
/**
* Load the HFiles into HBase
*/
def loadHFileToHbase() = {
//Start loading the HFiles into HBase; from here on these are all HBase API operations
val load: LoadIncrementalHFiles = new LoadIncrementalHFiles(conf)
//Create the HBase connection; with the default configuration this effectively reads the HBase master address
val conn: Connection = ConnectionFactory.createConnection(conf)
//Get the table by name
val table: Table = conn.getTable(TableName.valueOf(outPutTable))
//Get the region distribution of the HBase table
val regionLocator: RegionLocator = conn.getRegionLocator(TableName.valueOf(outPutTable))
//Create a Hadoop MapReduce job
val job: Job = Job.getInstance(conf)
//Set the job name
job.setJobName(s"$outPutTable LoadIncrementalHFiles")
//Most important: set the output key class; since we generate HFiles, the out key must be ImmutableBytesWritable
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
//The output value class is KeyValue
job.setMapOutputValueClass(classOf[KeyValue])
//Configure HFileOutputFormat2
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
//Start the bulk load
load.doBulkLoad(new Path(savePath), conn.getAdmin, table, regionLocator)
spark.stop()
}
/**
* Print the initialization result
*/
override def printf(): Unit = {
}
/**
* Deletes a path on HDFS
*
* @param url the path to delete
*/
def delete_hdfspath(url: String) {
val hdfs: FileSystem = FileSystem.get(new Configuration)
val path: Path = new Path(url)
if (hdfs.exists(path)) {
val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
hdfs.delete(path, true)
}
}
}
Packaging and Run Command
Run command:
spark-submit \
--master yarn-client \
--driver-memory 2G \
--executor-memory 4G \
--executor-cores 4 \
--num-executors 4 \
--conf spark.yarn.executor.memoryOverhead=8192 \
--class com.aaa.test.TestHbase \
/home/apps/lillcol/TestHbase.jar \
Note: the HBase table "outPutTable" must already exist. Besides the hbase shell, you can also inspect HBase data by mapping it to a Hive table;
see: Spark: two ways to bulk-import a DataFrame into HBase (HFile, Hive)
---
Exceptions and Errors
Illegal Cyclic Reference
scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference
Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1502)
at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1500)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.reflect.internal.Symbols$Symbol.lock(Symbols.scala:546)
at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1500)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33)
at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95)
at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.<init>(JavaMirrors.scala:163)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.complete(JavaMirrors.scala:744)
at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33)
at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95)
at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.<init>(JavaMirrors.scala:163)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683)
at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:142)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:133)
at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19)
at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:168)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.typeParams(SynchronizedSymbols.scala:132)
at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.typeParams(SynchronizedSymbols.scala:168)
at scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:1926)
at scala.reflect.internal.Types$NoArgsTypeRef.isHigherKinded(Types.scala:1925)
at scala.reflect.internal.transform.UnCurry$class.scala$reflect$internal$transform$UnCurry$$expandAlias(UnCurry.scala:22)
at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:26)
at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:24)
at scala.collection.immutable.List.loop$1(List.scala:173)
at scala.collection.immutable.List.mapConserve(List.scala:189)
at scala.reflect.internal.tpe.TypeMaps$TypeMap.mapOver(TypeMaps.scala:115)
at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:46)
at scala.reflect.internal.transform.Transforms$class.transformedType(Transforms.scala:43)
at scala.reflect.internal.SymbolTable.transformedType(SymbolTable.scala:16)
at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:225)
at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:218)
at org.apache.spark.sql.catalyst.ScalaReflection$class.getClassNameFromType(ScalaReflection.scala:853)
at org.apache.spark.sql.catalyst.ScalaReflection$.getClassNameFromType(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:78)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor$1.apply(ScalaReflection.scala:65)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:64)
at org.apache.spark.sql.catalyst.ScalaReflection$.dataTypeFor(ScalaReflection.scala:62)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:63)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
at com.aaa.TestHbase$.main(TestHbase.scala:40)
at com.aaa.TestHbase.main(TestHbase.scala)
This error means an illegal cyclic reference.
I still haven't figured out what exactly I referenced cyclically, but I roughly pinned down the situations in which the exception appears.
The code block where the exception occurs:
val result1 : RDD[(ImmutableBytesWritable, Seq[KeyValue])] = TM_ODR_BANK_STAT_D
.map(row => {
var kvlist: Seq[KeyValue] = List()
var kv: KeyValue = null
val cf: Array[Byte] = clounmFamily.getBytes //Column family
val rowKey = Bytes.toBytes(row.getAs[Int]("ID"))
val immutableRowKey = new ImmutableBytesWritable(rowKey)
for (i <- 1 to (columnsName.length - 1)) {
// Convert the RDD into the format HFile requires;
// we defined above that the HFile key is ImmutableBytesWritable,
// so our RDD must also be keyed by an ImmutableBytesWritable instance.
kv = new KeyValue(rowKey, cf, Bytes.toBytes(columnsName(i)), Bytes.toBytes(row.get(i) + ""))
// Append the new kv to the end of kvlist (order matters; the whole list must stay sorted)
kvlist = kvlist :+ kv
}
//(rowKey, kvlist.length)-----1
//(rowKey, kvlist)-----2
//(immutableRowKey, kvlist.length)-----3
//(immutableRowKey, kvlist)-----4
})
As the code above shows:
if the final return value is option 2, 3, or 4, this illegal cyclic reference error is thrown.
What these have in common is that they are all objects (although in Scala everything can be an object, I still haven't figured this out);
returning option 1 works fine, but that is not the answer we want.
Plenty of posts online blame the Scala version, the JDK version, broadcast variables, and so on; none of that solved it, so I had to tinker on my own.
Inspecting the data types revealed that TM_ODR_BANK_STAT_D is a DataFrame/Dataset[Row],
and after the map operation it is still a DataFrame/Dataset[Row], yet there was no compile-time error.
Possibly the map operation on a DataFrame/Dataset[Row] has limitations I'm not aware of,
so I simply converted the DataFrame/Dataset[Row] to an RDD to try.
Hmm... it actually worked. Partly luck; I still don't know the underlying reason. Maybe I'm just the chosen one.
For the conversion itself, see my post: Spark 讀寫資料、抽象轉換 拾遺
The modified code (not yet optimized):
val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = TM_ODR_BANK_STAT_D
.rdd //Convert to an RDD
.map(row => {
var kvlist: Seq[KeyValue] = List()
var kv: KeyValue = null
val cf: Array[Byte] = clounmFamily.getBytes //Column family
val rowKey = Bytes.toBytes(row.getAs[Int]("ID"))
val immutableRowKey = new ImmutableBytesWritable(rowKey)
for (i <- 1 to (columnsName.length - 1)) {
kv = new KeyValue(rowKey, cf, Bytes.toBytes(columnsName(i)), Bytes.toBytes(row.get(i) + ""))
kvlist = kvlist :+ kv
}
(immutableRowKey, kvlist)
})
Key Ordering
Added a key not lexically larger than previous
Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = \x00\x00\xE4h/cf:CNSM_CNT_TAG/1568255140650/Put/vlen=3/seqid=0, lastCell = \x00\x00\xE4h/cf:FIRST_ACTV_DT/1568255140650/Put/vlen=6/seqid=0
at org.apache.hadoop.hbase.io.hfile.HFileWriterImpl.checkKey(HFileWriterImpl.java:245)
at org.apache.hadoop.hbase.io.hfile.HFileWriterImpl.append(HFileWriterImpl.java:731)
at org.apache.hadoop.hbase.regionserver.StoreFileWriter.append(StoreFileWriter.java:234)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:344)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:231)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:356)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139)
HBase queries by rowkey, and rowkeys are ordered; in a sense the rowkey is an index, which is one reason HBase queries are efficient.
At first the code only sorted by key; that tested fine on the earlier versions, but broke on 2.0.2.
The error here means the current column CNSM_CNT_TAG
sorts lower than the previous column FIRST_ACTV_DT,
so presumably within a single key the columns must be ordered as well;
sorting by both key and column solved the problem.
(In the earlier post it presumably worked because the columns happened to be sorted up front.)
Solution:
hfileRDD
.sortBy(x => (x._1, x._2.getKeyString), true) //Must keep the whole dataset ordered
.saveAsNewAPIHadoopFile(savePath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
conf)
HBase Root Directory Does Not Exist
java.util.concurrent.ExecutionException: org.apache.phoenix.shaded.org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/hbaseid
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.hadoop.hbase.client.ConnectionImplementation.retrieveClusterId(ConnectionImplementation.java:549)
at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:287)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
at com.aaa.TestHbase$.main(TestHbase.scala:99)
at com.aaa.TestHbase.main(TestHbase.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The default is /hbase.
If it has been changed, it must be specified explicitly, otherwise the path cannot be found.
There are two ways to set it:
- Edit the configuration file
hbase-site.xml
<configuration>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>
</configuration>
- Set the parameters in code
Use this approach when running from code:
conf.set("hbase.unsafe.stream.capability.enforce", "false") //HBase root dir setting
conf.set("zookeeper.znode.parent", "/hbase") //Set to the actual value
More Than the Default 32 HFiles in One Family
Exception in thread "main" java.io.IOException: Trying to load more than 32 hfiles to one family of one region
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:288)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.run(LoadIncrementalHFiles.java:842)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.main(LoadIncrementalHFiles.java:847)
There are two solutions:
- Edit the configuration file
hbase-site.xml
<property>
<name>hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily</name>
<value>400</value>
</property>
- Set the parameter in code
Use this approach when running from code:
conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "400")
Out of Memory
19/09/17 15:25:17 ERROR YarnScheduler:
Lost executor 8 on slave2: Container killed by YARN for exceeding memory limits.
11.0 GB of 11 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
The executor ran out of memory. This has to be tuned against your own code;
simply adding more total memory does not always help, and there is no universal fix, but the following approaches are worth trying.
- Increase spark.yarn.executor.memoryOverhead, e.g. consider 4096; this value is usually a power of 2.
- Increase the RDD/DataFrame partition count, like my repartition(200); this assumes the data is evenly distributed, otherwise data skew may appear.
- Decrease spark.executor.cores, e.g. from 8 to 4.
- Increase spark.executor.memory, e.g. from 8g to 12g.
- How spark.yarn.executor.memoryOverhead is computed:
E = max(MEMORY_OVERHEAD_MIN, MEMORY_OVERHEAD_FACTOR * executorMemory)
MEMORY_OVERHEAD_FACTOR defaults to 0.1;
executorMemory is the configured executor-memory;
MEMORY_OVERHEAD_MIN defaults to 384m;
MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN generally cannot be changed directly; they are hard-coded in the Spark source.
- How the memory an executor requests is computed:
E = (executor-memory + spark.yarn.executor.memoryOverhead)
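As a rough illustration of the overhead formula, here is a small sketch using the default constants quoted above; the object and method names (MemoryOverheadDemo, overheadMB) are mine, and the rounding is a simplification of what Spark actually does internally.

```scala
// Sketch of E = max(MEMORY_OVERHEAD_MIN, MEMORY_OVERHEAD_FACTOR * executorMemory)
// with the default constants quoted above (384 MB and 0.1).
object MemoryOverheadDemo {
  val MEMORY_OVERHEAD_MIN = 384    // MB, default
  val MEMORY_OVERHEAD_FACTOR = 0.1 // default

  def overheadMB(executorMemoryMB: Int): Int =
    math.max(MEMORY_OVERHEAD_MIN, (MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt)

  def main(args: Array[String]): Unit = {
    println(overheadMB(4096)) // executor-memory 4g -> 409 MB (10% wins)
    println(overheadMB(2048)) // executor-memory 2g -> 384 MB (the floor wins)
  }
}
```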
This is an original article; please credit the source when reposting!!