spark on yarn模式下掃描帶有kerberos的hbase
我上一篇寫了關於如何在spark中直接訪問有kerberos的hbase,現在我們需要對hbase進行全表的分散式掃描,在無kerberos的情況下通過sparkcontext的newApiHadoopRDD就可以達到目的,但有了kerberos的限制,這個方法就不行了,也許有人會想到通過我之前提到的ugi doAs方法解決,但是分散式掃描的情況下如果將newApiHadoopRDD放到doAs中,只能對當前節點起到作用,無法為其他節點賦予許可權,使用的時候仍然會出現證書無效的問題。
那麼,如何解決此問題呢 ?首先,儘量保證spark執行在yarn環境中,不要用stdandalone方式,接著就是如何許可權問題了,網上給出的比較多的方式是通過token,不過我沒有嘗試成功,cloudera提供了一種解決此問題的方法,就是過載RDD,自己實現一個newApiHadoopRDD。這樣只要將其中訪問hbase的部分放在ugi的doAs中就可以了。
該如何過載呢?RDD的過載並不困難,首先繼承RDD類,接著要實現getPartitions和compute兩個介面。
getPartitions是spark獲取分片資訊的方法。而compute則是用來獲取具體的資料。
下面來看程式碼:
SparkHadoopMapReduceUtil是一個特性類,cloudera對其進行了一次額外繼承import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.{ SparkContext, TaskContext } import org.apache.spark.broadcast.Broadcast import org.apache.spark.SerializableWritable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials import org.apache.spark.rdd.RDD import org.apache.spark.Partition import org.apache.spark.InterruptibleIterator import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.mapreduce.Job import org.apache.spark.Logging import org.apache.spark.SparkHadoopMapReduceUtilExtended import org.apache.hadoop.mapreduce.JobID import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce.InputSplit import java.text.SimpleDateFormat import java.util.Date import java.util.ArrayList import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper import org.apache.hadoop.hbase.CellUtil import org.apache.hadoop.security.UserGroupInformation import java.security.PrivilegedExceptionAction import java.security.PrivilegedAction import org.apache.hadoop.mapreduce.RecordReader import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable class HBaseScanRDD(sc: SparkContext, usf: UserSecurityFunction, @transient tableName: String, @transient scan: Scan, configBroadcast: Broadcast[SerializableWritable[Configuration]]) extends RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])](sc, Nil) with SparkHadoopMapReduceUtilExtended with Logging { /// @transient val jobTransient = new Job(configBroadcast.value.value, "ExampleRead"); if(usf.isSecurityEnable()) { usf.login().doAs(new PrivilegedExceptionAction[Unit] { def run: Unit = TableMapReduceUtil.initTableMapperJob( tableName, // input HBase table name scan, // Scan instance to control CF and attribute selection classOf[IdentityTableMapper], // mapper null, // mapper output key null, // mapper output value jobTransient) }) }else{ TableMapReduceUtil.initTableMapperJob( tableName, // input HBase table name scan, // Scan instance to control CF and attribute selection classOf[IdentityTableMapper], // mapper null, // mapper output key null, // mapper output value jobTransient) } @transient val jobConfigurationTrans = jobTransient.getConfiguration() jobConfigurationTrans.set(TableInputFormat.INPUT_TABLE, tableName) val jobConfigBroadcast = sc.broadcast(new SerializableWritable(jobConfigurationTrans)) //// private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") formatter.format(new Date()) } @transient protected val jobId = new JobID(jobTrackerId, id) override def getPartitions: Array[Partition] = { //addCreds val tableInputFormat = new TableInputFormat tableInputFormat.setConf(jobConfigBroadcast.value.value) val jobContext = newJobContext(jobConfigBroadcast.value.value, jobId) var rawSplits : Array[Object] = null if(usf.isSecurityEnable()) { rawSplits = usf.login().doAs(new PrivilegedAction[Array[Object]]{ def run: Array[Object] = tableInputFormat.getSplits(jobContext).toArray }) }else{ rawSplits = tableInputFormat.getSplits(jobContext).toArray } val result = new Array[Partition](rawSplits.size) for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } result } override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = { //addCreds val iter = new Iterator[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] { //addCreds val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) val conf = jobConfigBroadcast.value.value val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = new TableInputFormat format.setConf(conf) var reader : RecordReader[ImmutableBytesWritable, Result] = null if(usf.isSecurityEnable()) { reader = usf.login().doAs(new PrivilegedAction[RecordReader[ImmutableBytesWritable, Result]]{ def run: RecordReader[ImmutableBytesWritable, Result] = { val _reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) _reader } })}else{ reader = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) } // Register an on-task-completion callback to close the input stream. context.addOnCompleteCallback(() => close()) var havePair = false var finished = false override def hasNext: Boolean = { if (!finished && !havePair) { finished = !reader.nextKeyValue havePair = !finished } !finished } override def next(): (Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])]) = { if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } havePair = false val it = reader.getCurrentValue.listCells().iterator() val list = new ArrayList[(Array[Byte], Array[Byte], Array[Byte])]() while (it.hasNext()) { val kv = it.next() list.add((CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), CellUtil.cloneValue(kv))) } (reader.getCurrentKey.copyBytes(), list) } private def close() { try { reader.close() } catch { case e: Exception => logWarning("Exception in RecordReader.close()", e) } } } new InterruptibleIterator(context, iter) } def addCreds { val creds = SparkHadoopUtil.get.getCurrentUserCredentials() val ugi = UserGroupInformation.getCurrentUser(); ugi.addCredentials(creds) // specify that this is a proxy user ugi.setAuthenticationMethod(AuthenticationMethod.PROXY) } private class NewHadoopPartition( rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) extends Partition { val serializableHadoopSplit = new SerializableWritable(rawSplit) override def hashCode(): Int = 41 * (41 + rddId) + index } }
package org.apache.spark
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
trait SparkHadoopMapReduceUtilExtended extends SparkHadoopMapReduceUtil{
}
注意繼承要標註org.apache.spark包這樣就可以直接使用,new HBaseScanRDD(),傳入SparkContext。
如果想用java訪問,還需要在外面封裝一層。
import org.apache.spark.api.java.JavaSparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast
import org.apache.hadoop.hbase.client.Scan
import org.apache.spark.SerializableWritable
import org.apache.spark.api.java.JavaPairRDD
class JavaHBaseContext(@transient jsc: JavaSparkContext,usf: UserSecurityFunction) {
def hbaseApiRDD(tableName: String,
scan: Scan,
configBroadcast: Configuration) = {
JavaPairRDD.fromRDD(new HBaseScanRDD(JavaSparkContext.toSparkContext(jsc),usf,tableName,
scan,
jsc.broadcast(new SerializableWritable(configBroadcast))))
}
}
是不是簡單了很多?只需要new JavaHBaseContext(sc,usf).hbaseApiRDD("XXX",scan,hbaseconfig)就可以得到一個與newApiHadoopRDD一樣的RDD了。
相關推薦
spark on yarn模式下掃描帶有kerberos的hbase
我上一篇寫了關於如何在spark中直接訪問有kerberos的hbase,現在我們需要對hbase進行全表的分散式掃描,在無kerberos的情況下通過sparkcontext的newApiHadoopRDD就可以達到目的,但有了kerberos的限制,這個方法就不行了,
spark on yarn模式下內存資源管理(筆記2)
warn 計算 nta 堆內存 註意 layout led -o exc 1.spark 2.2內存占用計算公式 https://blog.csdn.net/lingbo229/article/details/80914283 2.spark on yarn內存分配*
大資料之Spark(八)--- Spark閉包處理,Spark的應用的部署模式,Spark叢集的模式,啟動Spark On Yarn模式,Spark的高可用配置
一、Spark閉包處理 ------------------------------------------------------------ RDD,resilient distributed dataset,彈性(容錯)分散式資料集。 分割槽列表,fun
On Yarn模式下 Spring Flo for SpringXD UI部署
配置Spring Flo可在SpringXD的admin-ui頁面提供拖拽式的組合job的定義。 解壓flo-spring-xd-admin-ui-client-1.3.1.RELEASE.zip unzip flo-spring-x
超圖大資料產品spark on yarn模式使用
本文介紹超圖大資料產品spark元件,iServer產品中的分散式分析服務,如何在部署好的spark叢集,hadoop叢集中採用spark on yarn模式提交任務進行空間大資料相關的分析。 一、環境 1. Ubuntu server 16,三個節點的hadoop叢集和spar
Spark on yarn模式的引數設定即調優
1 啟動方式 執行命令./spark-shell --master yarn預設執行的是client模式。 執行./spark-shell --master yarn-client或者./spark-shell --master yarn --deploy-m
Spark on YARN模式的安裝(spark-1.6.1-bin-hadoop2.6.tgz + hadoop-2.6.0.tar.gz)(master、slave1和slave2)(博主推薦)
說白了 Spark on YARN模式的安裝,它是非常的簡單,只需要下載編譯好Spark安裝包,在一臺帶有Hadoop YARN客戶端的的機器上執行即可。 Spark on YARN分為兩種: YARN cluster(YARN standalone,0.9版本以前)和 YA
spark on yarn模式裡需要有時手工釋放linux記憶體
歡迎您的加入! 微信公眾號平臺: 大資料躺過的坑 微信公眾號平臺: 人工智慧躺過的坑 大資料和人工智慧躺過的坑(總群): 161156071 更多QQ技術分群,詳情請見:http://www.cnblogs.com/zls
Spark on yarn的兩種模式 yarn-cluster 和 yarn-client
然而 技術 負責 blog 作業 mage 申請 .com contain 從深層次的含義講,yarn-cluster和yarn-client模式的區別其實就是Application Master進程的區別,yarn-cluster模式下,driver運行在AM(Appli
kerberos體系下的應用(yarn,spark on yarn)
kerberos 介紹 閱讀本文之前建議先預讀下面這篇部落格kerberos認證原理---講的非常細緻,易懂 Kerberos實際上一個基於Ticket的認證方式。Client想要獲取Server端的資源,先得通過Server的認證;而認證的先決條件是Client向Server
Spark的分散式執行模式 Local,Standalone, Spark on Mesos, Spark on Yarn, Kubernetes
Spark的分散式執行模式 Local,Standalone, Spark on Mesos, Spark on Yarn, Kubernetes Local模式 Standalone模式 Spark on Mesos模式 Spark on Yarn
一 spark on yarn cluster模式提交作業,一直處於ACCEPTED狀態,改了Client模式後就正常了
1. 提交spark作業到yarn,採用client模式的時候作業可以執行,但是採用cluster模式的時候作業會一直初一accept狀態。 背景:這個測試環境的資源比較小,提交作業後一直處於accept狀態,所以把作業的配置也設定的小。 submit 語句: spark
Spark on YARN cluster & client 模式作業執行全過程分析
原文連結列表如下,致謝: https://www.iteblog.com/archives/1223.html https://www.iteblog.com/archives/1189.html https://www.iteblog.com/archives/1191.html
Spark on YARN client模式作業執行全過程分析
在前篇文章中我介紹了Spark on YARN叢集模式(yarn-cluster)作業從提交到執行整個過程的情況(詳情見《Spark on YARN叢集模式作業執行全過程分析》),我們知道Spark on yarn有兩種模式:yarn-cluster和yarn-client。這兩種模式作業雖然都是
Spark on Yarn Client和Cluster模式詳解
Spark在YARN中有yarn-cluster和yarn-client兩種執行模式: I. Yarn Cluster Spark Driver首先作為一個ApplicationMaster在YARN叢集中啟動,客戶端提交給ResourceManager的每一個job都
自己的HADOOP平臺(三):Mysql+hive遠端模式+Spark on Yarn
Spark和hive配置較為簡單,為了方便Spark對資料的使用與測試,因此在搭建Spark on Yarn模式的同時,也把Mysql + Hive一起搭建完成,並且配置Hive對Spark的支援,讓Spark也能像Hive一樣操作資料。 前期準備
Spark on Yarn遇到的幾個問題
添加 shuffle tasks pil 生產 當前 lis file 被拒 1 概述 Spark的on Yarn模式。其資源分配是交給Yarn的ResourceManager來進行管理的。可是眼下的Spark版本號,Application日誌的查看,僅僅
spark on yarn詳解
.sh 提交 cut com blog sta clu ... client模式 1、參考文檔: spark-1.3.0:http://spark.apache.org/docs/1.3.0/running-on-yarn.html spark-1.6.0:http://s
Spark記錄-Spark on Yarn框架
ive 變量 進程 app shuf backend 性能 操作 spi 一、客戶端進行操作 1、根據yarnConf來初始化yarnClient,並啟動yarnClient2、創建客戶端Application,並獲取Application的ID,進一步判斷集群中的資源是
【Spark】篇---Spark中yarn模式兩種提交任務方式
方式 div -s and clas client 命令 yarn 模式 一、前述 Spark可以和Yarn整合,將Application提交到Yarn上運行,和StandAlone提交模式一樣,Yarn也有兩種提交任務的方式。 二、具體 1、yarn