
Kafka->SparkStreaming->Hbase (Part 2)

  Based on business requirements, data is extracted from Kafka and inserted into HBase. Many related articles can already be found online; this series introduces an open-source tool from GitHub.

  The previous chapter explained why SparkOnHbase was chosen as the main prototype and modified into the source code we need. The modified source is given here; it matches our business requirements while avoiding unnecessary side effects as far as possible. Execution efficiency will be optimized at a later stage.

Source Code

class HBaseContext(
  @transient sc:        SparkContext,
  @transient config:    Configuration,
  metas:                java.util.HashMap[String, java.util.HashMap[String, java.util.HashMap[String, ColumnInfo]]],
  val tmpHdfsConfgFile: String                                                                                      = null)
extends Serializable with Logging {
  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
  @transient var tmpHdfsConfiguration: Configuration = config
  @transient var appliedCredentials = false
  @transient var metasLocal = metas
  @transient val job = new Job(config)
  TableMapReduceUtil.initCredentials(job)

  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials()))
  val broadcastMetas = sc.broadcast(metas)

  if (tmpHdfsConfgFile != null && config != null) {
    val fs = FileSystem.newInstance(config)
    val tmpPath = new Path(tmpHdfsConfgFile)
    if (!fs.exists(tmpPath)) {
      val outputStream = fs.create(tmpPath)
      config.write(outputStream)
      outputStream.close()
    } else {
      logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
    }
  }

  def mapPartition[T, R: ClassTag](
    rdd: RDD[T],
    mp: (Iterator[T], HConnection) => Iterator[R]): RDD[R] = {
    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](
      broadcastedConf,
      it,
      mp), true)
  }

  def applyCreds[T](configBroadcast: Broadcast[SerializableWritable[Configuration]]) {
    credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
    logInfo("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
    if (appliedCredentials == false && credentials != null) {
      appliedCredentials = true
      logCredInformation(credentials)
      @transient val ugi = UserGroupInformation.getCurrentUser()
      ugi.addCredentials(credentials)
      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
      ugi.addCredentials(credentialsConf.value.value)
    }
  }

  def logCredInformation[T](credentials2: Credentials) {
    logInfo("credentials:" + credentials2)
    for (a <- 0 until credentials2.getAllSecretKeys.size()) {
      logInfo("getAllSecretKeys:" + a + ":" + credentials2.getAllSecretKeys.get(a))
    }
    val it = credentials2.getAllTokens.iterator()
    while (it.hasNext) {
      logInfo("getAllTokens:" + it.next())
    }
  }

  def bulkMutation[T](rdd: RDD[T], fun: (T) => (DataEntity), autoFlush: Boolean) {
    rdd.foreachPartition(
      it => {
        hbaseForeachPartition[T](
          broadcastedConf,
          broadcastMetas,
          it,
          (iter, hConnection, metas) => {
            iter.foreach(item => {
              val entity = fun(item)
              val dbName = entity.dbName
              val tabName = entity.tabName
              if (metas.containsKey(dbName) && metas.get(dbName).containsKey(tabName)) {
                val htable = hConnection.getTable(entity.dbName + ":" + entity.tabName)
                htable.setAutoFlush(autoFlush, true)
                entity.`type` match {
                  case "INSERT" | "insert" => {
                    val insertPuts = Instance.insert(entity, metas)
                    if (null != insertPuts && insertPuts.size() > 0) htable.batch(insertPuts)
                  }
                  case "UPDATE" | "update" => {
                    val updatePuts = Instance.update(entity, metas)
                    if (null != updatePuts && updatePuts.size() > 0) htable.batch(updatePuts)
                  }
                  case "DELETE" | "delete" => {
                    val deleteDels = Instance.delete(entity)
                    if (null != deleteDels && deleteDels.size() > 0) htable.batch(deleteDels)
                  }
                  case all: Any => {
                    logInfo("other operation: " + all)
                  }
                }
                htable.flushCommits()
                htable.close()
              }
            })
          })
      })
  }

  def hbaseRDD[U: ClassTag](tableName: String, scan: Scan, f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
    var job: Job = new Job(getConf(broadcastedConf))
    TableMapReduceUtil.initCredentials(job)
    TableMapReduceUtil.initTableMapperJob(tableName, scan, classOf[IdentityTableMapper], null, null, job)
    sc.newAPIHadoopRDD(
      job.getConfiguration(),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]).map(f)
  }

  def hbaseRDD(tableName: String, scans: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {
    hbaseRDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])](
      tableName,
      scans,
      (r: (ImmutableBytesWritable, Result)) => {
        val it = r._2.list().iterator()
        val list = new ArrayList[(Array[Byte], Array[Byte], Array[Byte])]()
        while (it.hasNext()) {
          val kv = it.next()
          list.add((kv.getFamily(), kv.getQualifier(), kv.getValue()))
        }
        (r._1.copyBytes(), list)
      })
  }

  private def hbaseForeachPartition[T](
    configBroadcast: Broadcast[SerializableWritable[Configuration]],
    metasBroadcast: Broadcast[HashMap[String, HashMap[String, HashMap[String, ColumnInfo]]]],
    it: Iterator[T],
    fun: (Iterator[T], HConnection, HashMap[String, HashMap[String, HashMap[String, ColumnInfo]]]) => Unit) = {
    val config = getConf(configBroadcast)
    val metas = getMetas(metasBroadcast)
    applyCreds(configBroadcast)
    val hConnection = HConnectionManager.createConnection(config)
    fun(it, hConnection, metas)
    hConnection.close()
  }

  /**
   * @desc get METAS from broadcast or driver's configure
   */
  private def getMetas(metasBroadcast: Broadcast[HashMap[String, HashMap[String, HashMap[String, ColumnInfo]]]]): HashMap[String, HashMap[String, HashMap[String, ColumnInfo]]] = {
    if (null != metasLocal) {
      return metasLocal
    } else {
      try {
        metasLocal = metasBroadcast.value
        metasLocal
      } catch {
        case ex: Exception => {
          logInfo("Unable to getConfig from broadcast")
        }
      }
    }
    metasLocal
  }

  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = {
    if (tmpHdfsConfiguration != null) {
      tmpHdfsConfiguration
    } else if (tmpHdfsConfgFile != null) {
      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
      tmpHdfsConfiguration = new Configuration(false)
      tmpHdfsConfiguration.readFields(inputStream)
      inputStream.close()
      tmpHdfsConfiguration
    }
    if (tmpHdfsConfiguration == null) {
      try {
        tmpHdfsConfiguration = configBroadcast.value.value
        tmpHdfsConfiguration
      } catch {
        case ex: Exception => {
          println("Unable to getConfig from broadcast")
        }
      }
    }
    tmpHdfsConfiguration
  }

  private def hbaseMapPartition[K, U](
    configBroadcast: Broadcast[SerializableWritable[Configuration]],
    it: Iterator[K],
    mp: (Iterator[K], HConnection) => Iterator[U]): Iterator[U] = {
    val config = getConf(configBroadcast)
    applyCreds(configBroadcast)
    val hConnection = HConnectionManager.createConnection(config)
    val res = mp(it, hConnection)
    hConnection.close()
    res
  }

  private class GetMapPartition[T, U](
    tableName: String,
    batchSize: Integer,
    makeGet: (T) => Get,
    convertResult: (Result) => U) extends Serializable {

    def run(iterator: Iterator[T], hConnection: HConnection): Iterator[U] = {
      val htable = hConnection.getTable(tableName)
      val gets = new ArrayList[Get]()
      var res = List[U]()
      while (iterator.hasNext) {
        gets.add(makeGet(iterator.next))
        if (gets.size() == batchSize) {
          val results = htable.get(gets)
          res = res ++ results.map(convertResult)
          gets.clear()
        }
      }
      if (gets.size() > 0) {
        val results = htable.get(gets)
        res = res ++ results.map(convertResult)
        gets.clear()
      }
      htable.close()
      res.iterator
    }
  }

  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}
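
  To make the bulkMutation contract concrete, below is a minimal sketch of what the DataEntity produced by the parse function could look like. Only dbName, tabName and type are actually read by the code above; rowKey and columns are assumptions about what Instance.insert/update/delete would need, not the project's real definition.

    // Hypothetical shape of the entity consumed by bulkMutation. dbName, tabName
    // and `type` are the fields referenced in HBaseContext above; rowKey and
    // columns are assumed fields, used here only for illustration.
    case class DataEntity(
      dbName:  String,                          // HBase namespace, joined as "dbName:tabName"
      tabName: String,                          // table name inside that namespace
      `type`:  String,                          // "INSERT" | "UPDATE" | "DELETE"
      rowKey:  String,                          // row key derived from the source record
      columns: java.util.Map[String, String])   // qualifier -> value

    // Purely illustrative instance for an insert into "demo:user".
    val entity = DataEntity("demo", "user", "INSERT", "10001",
      java.util.Collections.singletonMap("name", "alice"))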

  According to our requirements, the HBaseContext source code was refactored and unnecessary code was removed, guaranteeing at the source level that the program fits our application scenario.
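
  As a quick illustration of the read path that was kept (hbaseRDD), the sketch below drives it with a plain Scan; the table name and scan settings are placeholders, and hbaseContext is assumed to be an HBaseContext built as in the streaming code that follows.

    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.util.Bytes

    // Placeholder table and scan tuning; adjust to the real namespace:table.
    val scan = new Scan()
    scan.setCaching(500)        // rows fetched per RPC
    scan.setCacheBlocks(false)  // avoid polluting the block cache during a full scan

    // Each element is (rowKey, java.util.List of (family, qualifier, value)).
    val rows = hbaseContext.hbaseRDD("ns:table", scan)
    rows.take(10).foreach { case (rowKey, cells) =>
      println(Bytes.toString(rowKey) + " -> " + cells.size() + " cells")
    }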

Spark Streaming Code


    /** initialize ZK UTIL */
    @transient val zkUtil = new CuratorUtil()

    /** get initialize parameters */
    val offsetPath = PropertiesUtil.getProperty(ConstantUtil.ZOOKEEPER_SPARK_PATH)
    zkUtil.createZKNodePer(offsetPath, null)

    val topic = PropertiesUtil.getProperty(ConstantUtil.KAFKA_TOPIC_NAME)
    val recTime = Integer.parseInt(PropertiesUtil.getProperty(ConstantUtil.STREAMING_RECTCKE_TIME))
    val ZK_MYSQL_PATH = PropertiesUtil.getProperty(ConstantUtil.ZOOKEEPER_NAMESPACE_MYSQL_TABLES);
    val brokerList = PropertiesUtil.getProperty(ConstantUtil.KAFKA_BROKER_LIST);

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "zookeeper.connect" -> PropertiesUtil.getProperty(ConstantUtil.ZOOKEEPER_SERVER_LIST),
      "group.id" -> PropertiesUtil.getProperty(ConstantUtil.KAFKA_CONSUMER_GROUPID))

    /** initialize HBASE METAS for filter */
    @transient @volatile var metas: java.util.HashMap[String, java.util.HashMap[String, java.util.HashMap[String, ColumnInfo]]] = Instance.paserMetas(zkUtil, ZK_MYSQL_PATH)
    if (metas.size() < 1) {
      println("load hbase tablem metas failed!")
      return ;
    }

    /**  initialize Context */
    // configure
    @transient val sparkConf = new SparkConf()
      .set("spark.streaming.backpressure.enabled", PropertiesUtil.getProperty(ConstantUtil.STREAMING_BACK_ENABLED)) // 設定可以限制
      .set("spark.streaming.kafka.maxRatePerPartition", PropertiesUtil.getProperty(ConstantUtil.STREAMING_KAFKA_MAXRATE)) // 設定具體限制數量:records/SEC
      .set("spark.streaming.stopGracefullyOnShutdown", PropertiesUtil.getProperty(ConstantUtil.STREAMING_SHUTDOWN_GRACEFULLLY)) // 設定Gracefully stop
      .set("serializer.class", "kafka.serializer.StringEncoder")
    @transient val hbaseConf = HBaseConfiguration.create();
    hbaseConf.addResource("/etc/hbase/conf.cloudera.hbase/hbase-site.xml")
    hbaseConf.addResource("/etc/hbase/conf.cloudera.hbase/core-site.xml")
    @transient val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(recTime));

    val fromOffsets = readOffsetData(zkUtil, offsetPath, topic, brokerList, 9092)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))

    stream.foreachRDD(rdd => {

      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map { offset => (offset.partition, offset.fromOffset) }
      writeOffsetData(zkUtil, offsetPath, offsets)

      val hbaseContext = new HBaseContext(sc, hbaseConf, metas)
      hbaseContext.bulkMutation(rdd.map(item => item._2), (KV: String) => {
        Instance.parse(KV)
      }, false)

    })

    /** add gracefully stop control */
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit = {
        try {
          zkUtil.close()
        } catch {
          case e: Exception => {
          }
        }
        ssc.stop(true, true)
      }
    })

    /** spark streaming start and wait termination */
    ssc.start()
    ssc.awaitTermination()

  }

 /**
   * @desc read data from Zookeeper
   */
  def readOffsetData(zkUtil: CuratorUtil, offsetPath: String, topic: String, brokerList: String, kafkaPort: Integer): Map[TopicAndPartition, Long] = {

    val orgData = zkUtil.readDataForPath(offsetPath)
    if (null == orgData) {
      val util = KafkaUtil.getInstance();
      util.init(brokerList, kafkaPort, topic);
      val offsets = util.getLeastOffsets
      val fromOffsets = for (i <- 0 to offsets.size() - 1)
        yield TopicAndPartition.apply(topic, i) -> offsets.get(i).toLong
      return fromOffsets.toMap
    }

    val data = JSON.parseFull(orgData).get.asInstanceOf[Map[String, String]]
    val fromOffsets = data.map(item => {
      TopicAndPartition.apply(topic, item._1.toInt) -> item._2.toLong
    })
    return fromOffsets

  }

  /**
   * @desc write offset data to Zookeeper
   */
  def writeOffsetData(zkUtil: CuratorUtil, offsetPath: String, data: Array[(Int, Long)]): Unit = {

    val map = data.toMap[Int, Long].map(item => {
      item._1.toString() -> item._2.toString()
    })
    zkUtil.setDataForPath(offsetPath, JSONObject(map).toString)

  }
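
  For reference, a self-contained sketch of the offset round trip that writeOffsetData and readOffsetData perform through ZooKeeper, using the same scala.util.parsing.json helpers; the partition and offset values are made up.

    import scala.util.parsing.json.{ JSON, JSONObject }

    // Offsets as collected from HasOffsetRanges: (partition, fromOffset).
    val offsets: Array[(Int, Long)] = Array((0, 120L), (1, 98L))

    // Serialize the way writeOffsetData does: partition -> offset, both as strings.
    val json = JSONObject(offsets.toMap.map { case (p, o) => p.toString -> o.toString }).toString
    // json is e.g. {"0" : "120", "1" : "98"}, which is what lands in the ZK node.

    // Parse it back the way readOffsetData does once the node already exists.
    val parsed = JSON.parseFull(json).get.asInstanceOf[Map[String, String]]
    val fromOffsets = parsed.map { case (p, o) => p.toInt -> o.toLong }
    // fromOffsets: Map(0 -> 120, 1 -> 98), ready to build TopicAndPartition keys.

  Storing both keys and values as strings keeps the JSON flat, which is why readOffsetData can cast the parsed result directly to Map[String, String].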