1. 程式人生 > >spark on yarn模式下掃描帶有kerberos的hbase

spark on yarn模式下掃描帶有kerberos的hbase

我上一篇寫了關於如何在spark中直接訪問有kerberos的hbase,現在我們需要對hbase進行全表的分散式掃描,在無kerberos的情況下通過sparkcontext的newApiHadoopRDD就可以達到目的,但有了kerberos的限制,這個方法就不行了,也許有人會想到通過我之前提到的ugi doAs方法解決,但是分散式掃描的情況下如果將newApiHadoopRDD放到doAs中,只能對當前節點起到作用,無法為其他節點賦予許可權,使用的時候仍然會出現證書無效的問題。

那麼,如何解決此問題呢 ?首先,儘量保證spark執行在yarn環境中,不要用stdandalone方式,接著就是如何許可權問題了,網上給出的比較多的方式是通過token,不過我沒有嘗試成功,cloudera提供了一種解決此問題的方法,就是過載RDD,自己實現一個newApiHadoopRDD。這樣只要將其中訪問hbase的部分放在ugi的doAs中就可以了。

該如何過載呢?RDD的過載並不困難,首先繼承RDD類,接著要實現getPartitions和compute兩個介面。

getPartitions是spark獲取分片資訊的方法。而compute則是用來獲取具體的資料。

下面來看程式碼:

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{ SparkContext, TaskContext }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.SerializableWritable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.rdd.RDD
import org.apache.spark.Partition
import org.apache.spark.InterruptibleIterator
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.Logging
import org.apache.spark.SparkHadoopMapReduceUtilExtended
import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.InputSplit
import java.text.SimpleDateFormat
import java.util.Date
import java.util.ArrayList
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
import java.security.PrivilegedAction
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable


class HBaseScanRDD(sc: SparkContext,
                   usf: UserSecurityFunction,
                   @transient tableName: String,
                   @transient scan: Scan,
                   configBroadcast: Broadcast[SerializableWritable[Configuration]])
  extends RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])](sc, Nil)
  with SparkHadoopMapReduceUtilExtended
  with Logging {

  ///
  @transient val jobTransient = new Job(configBroadcast.value.value, "ExampleRead");
  if(usf.isSecurityEnable())
  {
    usf.login().doAs(new PrivilegedExceptionAction[Unit] {
      def run: Unit = 
          TableMapReduceUtil.initTableMapperJob(
    tableName, // input HBase table name
    scan, // Scan instance to control CF and attribute selection
    classOf[IdentityTableMapper], // mapper
    null, // mapper output key
    null, // mapper output value
    jobTransient)
    })
  }else{
    TableMapReduceUtil.initTableMapperJob(
    tableName, // input HBase table name
    scan, // Scan instance to control CF and attribute selection
    classOf[IdentityTableMapper], // mapper
    null, // mapper output key
    null, // mapper output value
    jobTransient)
  }
  
  

  @transient val jobConfigurationTrans = jobTransient.getConfiguration()
  jobConfigurationTrans.set(TableInputFormat.INPUT_TABLE, tableName)
  val jobConfigBroadcast = sc.broadcast(new SerializableWritable(jobConfigurationTrans))
  ////

  private val jobTrackerId: String = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    formatter.format(new Date())
  }

  @transient protected val jobId = new JobID(jobTrackerId, id)

  override def getPartitions: Array[Partition] = {

    //addCreds

    val tableInputFormat = new TableInputFormat
    tableInputFormat.setConf(jobConfigBroadcast.value.value)

    val jobContext = newJobContext(jobConfigBroadcast.value.value, jobId)
    var rawSplits : Array[Object] = null
    if(usf.isSecurityEnable())
    {
      rawSplits = usf.login().doAs(new PrivilegedAction[Array[Object]]{
      def run: Array[Object] = tableInputFormat.getSplits(jobContext).toArray
    })
    }else{
      rawSplits = tableInputFormat.getSplits(jobContext).toArray
    }
      
    
    val result = new Array[Partition](rawSplits.size)
    for (i <- 0 until rawSplits.size) {
      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    }

    result
  }

  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {

    //addCreds

    val iter = new Iterator[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] {

      //addCreds

      val split = theSplit.asInstanceOf[NewHadoopPartition]
      logInfo("Input split: " + split.serializableHadoopSplit)
      val conf = jobConfigBroadcast.value.value

      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
      val format = new TableInputFormat
      format.setConf(conf)

      var reader : RecordReader[ImmutableBytesWritable, Result] = null
      if(usf.isSecurityEnable())
      {
      reader = usf.login().doAs(new PrivilegedAction[RecordReader[ImmutableBytesWritable, Result]]{
        def run: RecordReader[ImmutableBytesWritable, Result] = {
         val _reader = format.createRecordReader(
        split.serializableHadoopSplit.value, hadoopAttemptContext)
      _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
      _reader
        }
      })}else{
        reader = format.createRecordReader(
        split.serializableHadoopSplit.value, hadoopAttemptContext)
        reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
      }
       

      // Register an on-task-completion callback to close the input stream.
      context.addOnCompleteCallback(() => close())
      var havePair = false
      var finished = false

      override def hasNext: Boolean = {
        if (!finished && !havePair) {
          finished = !reader.nextKeyValue
          havePair = !finished
        }
        !finished
      }

      override def next(): (Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])]) = {
        if (!hasNext) {
          throw new java.util.NoSuchElementException("End of stream")
        }
        havePair = false

        val it = reader.getCurrentValue.listCells().iterator()

        val list = new ArrayList[(Array[Byte], Array[Byte], Array[Byte])]()

        while (it.hasNext()) {
          val kv = it.next()
          list.add((CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), CellUtil.cloneValue(kv)))
        }
        (reader.getCurrentKey.copyBytes(), list)
      }

      private def close() {
        try {
          reader.close()
        } catch {
          case e: Exception => logWarning("Exception in RecordReader.close()", e)
        }
      }
    }
    new InterruptibleIterator(context, iter)
  }

  def addCreds {
    val creds = SparkHadoopUtil.get.getCurrentUserCredentials()

    val ugi = UserGroupInformation.getCurrentUser();
    ugi.addCredentials(creds)
    // specify that this is a proxy user 
    ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
  }

  private class NewHadoopPartition(
                                           rddId: Int,
                                           val index: Int,
                                           @transient rawSplit: InputSplit with Writable)
    extends Partition {

    val serializableHadoopSplit = new SerializableWritable(rawSplit)

    override def hashCode(): Int = 41 * (41 + rddId) + index
  }
}
SparkHadoopMapReduceUtil是一個特性類,cloudera對其進行了一次額外繼承
package org.apache.spark

import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil

trait SparkHadoopMapReduceUtilExtended extends SparkHadoopMapReduceUtil{
  
}
注意繼承要標註org.apache.spark包

這樣就可以直接使用,new HBaseScanRDD(),傳入SparkContext。

如果想用java訪問,還需要在外面封裝一層。

import org.apache.spark.api.java.JavaSparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.spark.broadcast.Broadcast
import org.apache.hadoop.hbase.client.Scan
import org.apache.spark.SerializableWritable
import org.apache.spark.api.java.JavaPairRDD

class JavaHBaseContext(@transient jsc: JavaSparkContext,usf: UserSecurityFunction) {
  
  def hbaseApiRDD(tableName: String,
      scan: Scan,
      configBroadcast: Configuration) = {
      JavaPairRDD.fromRDD(new HBaseScanRDD(JavaSparkContext.toSparkContext(jsc),usf,tableName,
        scan,
        jsc.broadcast(new SerializableWritable(configBroadcast))))
  }
  
}
是不是簡單了很多?只需要new JavaHBaseContext(sc,usf).hbaseApiRDD("XXX",scan,hbaseconfig)就可以得到一個與newApiHadoopRDD一樣的RDD了。

相關推薦

spark on yarn模式掃描帶有kerberos的hbase

我上一篇寫了關於如何在spark中直接訪問有kerberos的hbase,現在我們需要對hbase進行全表的分散式掃描,在無kerberos的情況下通過sparkcontext的newApiHadoopRDD就可以達到目的,但有了kerberos的限制,這個方法就不行了,

spark on yarn模式內存資源管理(筆記2)

warn 計算 nta 堆內存 註意 layout led -o exc 1.spark 2.2內存占用計算公式 https://blog.csdn.net/lingbo229/article/details/80914283 2.spark on yarn內存分配*

大資料之Spark(八)--- Spark閉包處理,Spark的應用的部署模式Spark叢集的模式,啟動Spark On Yarn模式Spark的高可用配置

一、Spark閉包處理 ------------------------------------------------------------ RDD,resilient distributed dataset,彈性(容錯)分散式資料集。 分割槽列表,fun

On Yarn模式 Spring Flo for SpringXD UI部署

配置Spring Flo可在SpringXD的admin-ui頁面提供拖拽式的組合job的定義。 解壓flo-spring-xd-admin-ui-client-1.3.1.RELEASE.zip unzip flo-spring-x

超圖大資料產品spark on yarn模式使用

本文介紹超圖大資料產品spark元件,iServer產品中的分散式分析服務,如何在部署好的spark叢集,hadoop叢集中採用spark on yarn模式提交任務進行空間大資料相關的分析。 一、環境 1. Ubuntu server 16,三個節點的hadoop叢集和spar

Spark on yarn模式的引數設定即調優

1 啟動方式 執行命令./spark-shell --master yarn預設執行的是client模式。 執行./spark-shell --master yarn-client或者./spark-shell --master yarn --deploy-m

Spark on YARN模式的安裝(spark-1.6.1-bin-hadoop2.6.tgz + hadoop-2.6.0.tar.gz)(master、slave1和slave2)(博主推薦)

說白了   Spark on YARN模式的安裝,它是非常的簡單,只需要下載編譯好Spark安裝包,在一臺帶有Hadoop YARN客戶端的的機器上執行即可。    Spark on YARN分為兩種: YARN cluster(YARN standalone,0.9版本以前)和 YA

spark on yarn模式裡需要有時手工釋放linux記憶體

歡迎您的加入! 微信公眾號平臺: 大資料躺過的坑 微信公眾號平臺: 人工智慧躺過的坑 大資料和人工智慧躺過的坑(總群): 161156071 更多QQ技術分群,詳情請見:http://www.cnblogs.com/zls

Spark on yarn的兩種模式 yarn-cluster 和 yarn-client

然而 技術 負責 blog 作業 mage 申請 .com contain 從深層次的含義講,yarn-cluster和yarn-client模式的區別其實就是Application Master進程的區別,yarn-cluster模式下,driver運行在AM(Appli

kerberos體系的應用(yarn,spark on yarn)

kerberos 介紹 閱讀本文之前建議先預讀下面這篇部落格kerberos認證原理---講的非常細緻,易懂 Kerberos實際上一個基於Ticket的認證方式。Client想要獲取Server端的資源,先得通過Server的認證;而認證的先決條件是Client向Server

Spark的分散式執行模式 Local,Standalone, Spark on Mesos, Spark on Yarn, Kubernetes

Spark的分散式執行模式 Local,Standalone, Spark on Mesos, Spark on Yarn, Kubernetes Local模式 Standalone模式 Spark on Mesos模式 Spark on Yarn

spark on yarn cluster模式提交作業,一直處於ACCEPTED狀態,改了Client模式後就正常了

1. 提交spark作業到yarn,採用client模式的時候作業可以執行,但是採用cluster模式的時候作業會一直初一accept狀態。 背景:這個測試環境的資源比較小,提交作業後一直處於accept狀態,所以把作業的配置也設定的小。 submit 語句: spark

Spark on YARN cluster & client 模式作業執行全過程分析

原文連結列表如下,致謝: https://www.iteblog.com/archives/1223.html https://www.iteblog.com/archives/1189.html https://www.iteblog.com/archives/1191.html

Spark on YARN client模式作業執行全過程分析

      在前篇文章中我介紹了Spark on YARN叢集模式(yarn-cluster)作業從提交到執行整個過程的情況(詳情見《Spark on YARN叢集模式作業執行全過程分析》),我們知道Spark on yarn有兩種模式:yarn-cluster和yarn-client。這兩種模式作業雖然都是

Spark on Yarn Client和Cluster模式詳解

Spark在YARN中有yarn-cluster和yarn-client兩種執行模式: I. Yarn Cluster Spark Driver首先作為一個ApplicationMaster在YARN叢集中啟動,客戶端提交給ResourceManager的每一個job都

自己的HADOOP平臺(三):Mysql+hive遠端模式+Spark on Yarn

Spark和hive配置較為簡單,為了方便Spark對資料的使用與測試,因此在搭建Spark on Yarn模式的同時,也把Mysql + Hive一起搭建完成,並且配置Hive對Spark的支援,讓Spark也能像Hive一樣操作資料。 前期準備

Spark on Yarn遇到的幾個問題

添加 shuffle tasks pil 生產 當前 lis file 被拒 1 概述 Spark的on Yarn模式。其資源分配是交給Yarn的ResourceManager來進行管理的。可是眼下的Spark版本號,Application日誌的查看,僅僅

spark on yarn詳解

.sh 提交 cut com blog sta clu ... client模式 1、參考文檔: spark-1.3.0:http://spark.apache.org/docs/1.3.0/running-on-yarn.html spark-1.6.0:http://s

Spark記錄-Spark on Yarn框架

ive 變量 進程 app shuf backend 性能 操作 spi 一、客戶端進行操作 1、根據yarnConf來初始化yarnClient,並啟動yarnClient2、創建客戶端Application,並獲取Application的ID,進一步判斷集群中的資源是

Spark】篇---Sparkyarn模式兩種提交任務方式

方式 div -s and clas client 命令 yarn 模式 一、前述 Spark可以和Yarn整合,將Application提交到Yarn上運行,和StandAlone提交模式一樣,Yarn也有兩種提交任務的方式。 二、具體 1、yarn