
Spark: Importing Hive Data into HBase

Introduction:
The previous post, MapReduce Hive 匯入資料到 HBase, showed how to use MapReduce to load Hive tables stored as textfile or rcfile into HBase. That approach has two drawbacks. First, it runs slowly when the data volume is very large. Second, Hive tables come in several storage formats besides those two; ORC and Parquet are also widely used and offer high compression ratios and query performance, so whenever a table's storage format changes, the MapReduce code may have to be modified and repackaged. Spark avoids both problems: the import runs fast and does not depend on the Hive storage format. Two birds with one stone, so I wrote the Spark program below.

I will not repeat the project setup; it is the same Maven project as in the previous post, except that after creating it you need to add Scala support: right-click the project, choose Add Framework Support, and tick Scala in the dialog that appears.

The code is as follows.

1. common package

object Constants {

  // --------------------------------------------- develop.properties key ------------------------------------------
  val ZOOKEEPER_CLIENT_PORT: String = "hbase.zookeeper.property.clientPort"
  val ZOOKEEPER_QUORUM: String = "hbase.zookeeper.quorum"
  val HBASE_MASTER: String = "hbase.master"
  val ZOOKEEPER_ZNODE_PARENT: String = "zookeeper.znode.parent"
  val HIVE_URL: String = "hive.url"
  val HIVE_DRIVER_CLASS: String = "hive.driver.class"
  val HIVE_USER: String = "hive.user"
  val HIVE_PASSWORD: String = "hive.password"
  val HIVE_DATA_INPUT_PATH: String = "hive.data.input.path"

  // --------------------------------------------- constant key ------------------------------------------
  val DESC_TABLE: String = "DESC FORMATTED "
  val SELECT: String = "select "
  val FROM: String = " from "
  val WHERE: String = " where "
  val PARTITION_PREFIX: String = "dt = "
  val HBASE_COLUMN_FAMILY: String = "info"
  val FIELDS_TERMINATED: Char = '\001'
  val COL_NAME: String = "col_name"
  val APP_NAME_PREFIX: String = "hive2hbase-"
  val SHARP: String = "#"
  val POINT: String = "."
}

2. config package

import java.util.ResourceBundle

object ConfigureContext {

  // Load develop.properties from the classpath and return its entries as a Map
  def loadConfig(): Map[String, String] = {
    val bundle = ResourceBundle.getBundle("develop")
    var configMap: Map[String, String] = Map()
    val keys = bundle.getKeys
    while (keys.hasMoreElements) {
      val key = keys.nextElement()
      configMap += ((key, bundle.getString(key)))
    }
    configMap
  }

  def main(args: Array[String]): Unit = {
    val map = loadConfig()
    map.keys.foreach { i =>
      println("key = " + i + ", value = " + map(i))
    }
  }
}
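
For reference, loadConfig expects a develop.properties file on the classpath whose keys match the names defined in Constants. A minimal sketch of the resolved file (every host, port, and path below is a placeholder, not a value from the original project):

hbase.zookeeper.property.clientPort=2181
hbase.zookeeper.quorum=zk01,zk02,zk03
hbase.master=hmaster-host:16000
zookeeper.znode.parent=/hbase
hive.url=jdbc:hive2://hive-host:10000/
hive.driver.class=org.apache.hive.jdbc.HiveDriver
hive.user=hive
hive.password=hive
hive.data.input.path=/user/hive/warehouse

Note that getColumnList appends the database name directly to hive.url, so the URL should end with a trailing slash.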

3. datasync package

import java.sql.{Connection, DriverManager, ResultSet}
import java.text.SimpleDateFormat
import java.util
import java.util.Calendar

import com.sqyc.bigdata.common.Constants
import com.sqyc.bigdata.config.ConfigureContext
import com.sqyc.bigdata.utils.MD5Utils
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, HBaseAdmin, Put}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
import scala.util.control._

object Hive2HBase {

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      throw new IllegalArgumentException("Please pass the parameters: hiveDatabase hiveTable hiveIgnoreFields [hivePartition]")
    }

    val formatter = new SimpleDateFormat("yyyy-MM-dd")
    val date = Calendar.getInstance()
    val configMap: Map[String, String] = ConfigureContext.loadConfig()

    val hiveDatabase: String = args(0)
    val hiveTable: String = args(1)
    val hiveIgnoreFields: String = args(2)
    var hivePartition: String = null
    if (args.length == 4) {
      hivePartition = args(3)
    } else {
      date.add(Calendar.DAY_OF_MONTH, -1)
      hivePartition = formatter.format(date.getTime)
    }
    
    // Create the HBase table if it does not exist yet
    createTable(hiveTable, configMap)

    // Get the column list from the Hive table
    val columnList = getColumnList(hiveDatabase, hiveTable, hiveIgnoreFields, configMap)

    // Build the query SQL
    val columns: String = getSelectColumns(columnList)
    val sql = getSql(hiveDatabase, hiveTable, hivePartition, columns)

    // Query the data with Spark SQL
    val spark = SparkSession.builder().appName(Constants.APP_NAME_PREFIX + hiveTable).enableHiveSupport().getOrCreate()
    val dataDF = spark.sql(sql).toDF()

    // Write the data to HBase
    dataDF.foreachPartition(it => {
      val conn = getHBaseConnection(configMap)
      val tableName = TableName.valueOf(hiveTable)
      val table = conn.getTable(tableName)
      it.foreach(row => {
        // Treat null or blank values as the literal string "null"
        def checkValue(v: Any): String = if (v == null || v.toString.trim.isEmpty) "null" else v.toString

        val rowkey = MD5Utils.string2MD5(row(0).toString).getBytes()
        val columnFamily = Constants.HBASE_COLUMN_FAMILY.getBytes()
        val put = new Put(rowkey)
        for (i <- 0 until columnList.size) {
          put.addColumn(columnFamily, columnList(i).getBytes, checkValue(row(i)).getBytes())
        }
        table.put(put)
      })
      table.close()
      conn.close()
    })
  }

  def getColumnList(hiveDatabase: String, hiveTable: String, hiveIgnoreFields: String, configMap: Map[String, String]) = {
    Class.forName(configMap(Constants.HIVE_DRIVER_CLASS))
    val conn = DriverManager.getConnection(configMap(Constants.HIVE_URL) + hiveDatabase, configMap(Constants.HIVE_USER), configMap(Constants.HIVE_PASSWORD))
    var columnList = new ListBuffer[String]
    val ps = conn.prepareStatement(Constants.DESC_TABLE + hiveDatabase + Constants.POINT + hiveTable)
    val rs = ps.executeQuery

    val breakWhile = new Breaks
    val continueWhile = new Breaks
    val ignoreList = hiveIgnoreFields.split(",").toList
    while (rs.next) {
      if (startColumnsDescRow(rs)) {
        breakWhile.breakable {
          while (rs.next()) {
            continueWhile.breakable {
              val colName = rs.getString(Constants.COL_NAME)
              if (colName == null || colName.trim().equals("") || ignoreList.contains(colName)) {
                continueWhile.break()
              } else if (colName.startsWith(Constants.SHARP)) {
                breakWhile.break()
              } else {
                columnList.append(colName)
              }
            }
          }
        }
      }
    }
    if (conn != null) conn.close()
    columnList
  }

  def startColumnsDescRow(rs: ResultSet) = {
    val colName = rs.getString(Constants.COL_NAME)
    colName.trim == "# col_name"
  }

  def getSelectColumns(columnList: ListBuffer[String]) = {
    columnList.mkString(",")
  }

  def getSql(hiveDatabase: String, hiveTable: String, hivePartition: String, columns: String) = {
    val sql = new StringBuilder()
      .append(Constants.SELECT)
      .append(columns)
      .append(Constants.FROM)
      .append(hiveDatabase)
      .append(Constants.POINT)
      .append(hiveTable)
      .append(Constants.WHERE)
      .append(Constants.PARTITION_PREFIX + "'" + hivePartition + "'")
    sql.toString()
  }

  def getHBaseConnection(configMap: Map[String, String]) = {
    val conf = HBaseConfiguration.create
    conf.set("hbase.zookeeper.property.clientPort", configMap(Constants.ZOOKEEPER_CLIENT_PORT))
    conf.set("hbase.zookeeper.quorum", configMap(Constants.ZOOKEEPER_QUORUM))
    conf.set("hbase.master", configMap(Constants.HBASE_MASTER))
    conf.set("zookeeper.znode.parent", configMap(Constants.ZOOKEEPER_ZNODE_PARENT))
    ConnectionFactory.createConnection(conf)
  }

  def createTable(hiveTable: String, configMap: Map[String, String]): Unit = {
    val conn = getHBaseConnection(configMap)
    val admin = conn.getAdmin.asInstanceOf[HBaseAdmin]
    val tableName = TableName.valueOf(hiveTable)
    if (!admin.tableExists(tableName)) {
      // The table does not exist yet: create it with a single column family
      val desc = new HTableDescriptor(tableName)
      val columnDesc = new HColumnDescriptor(Constants.HBASE_COLUMN_FAMILY)
      desc.addFamily(columnDesc)
      admin.createTable(desc)
    }
    conn.close()
  }
}
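
The com.sqyc.bigdata.utils.MD5Utils helper imported above is not shown in the post. A minimal sketch of what string2MD5 might look like (an assumption, not the original implementation), returning a 32-character hex digest that is used as the HBase row key:

import java.security.MessageDigest

object MD5Utils {

  // Hypothetical implementation: MD5-hash the input string and return lowercase hex
  def string2MD5(input: String): String = {
    val digest = MessageDigest.getInstance("MD5")
    digest.digest(input.getBytes("UTF-8")).map("%02x".format(_)).mkString
  }
}

Hashing the first selected column gives a fixed-length, evenly distributed row key, at the cost of losing the ability to scan by the original value.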

4. pom.xml

<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
	<spark.version>2.1.2</spark.version>
	<hadoop.version>2.7.3</hadoop.version>
	<hive.version>1.2.0</hive.version>
	<scala.version>2.11.8</scala.version>
	<hbase.version>1.2.6</hbase.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.11</artifactId>
		<version>${spark.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-sql_2.11</artifactId>
		<version>${spark.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-hive_2.11</artifactId>
		<version>${spark.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>${hadoop.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-jdbc</artifactId>
		<version>${hive.version}</version>
	</dependency>
	<dependency>
		<groupId>org.scala-lang</groupId>
		<artifactId>scala-library</artifactId>
		<version>${scala.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hbase</groupId>
		<artifactId>hbase-server</artifactId>
		<version>${hbase.version}</version>
	</dependency>
</dependencies>

<profiles>
	<profile>
		<id>dev</id>
		<activation>
			<activeByDefault>true</activeByDefault>
		</activation>
		<build>
			<filters>
				<filter>src/main/filters/dev.properties</filter>
			</filters>
			<resources>
				<resource>
					<directory>src/main/resources</directory>
				</resource>
			</resources>
		</build>
	</profile>
	<profile>
		<id>online</id>
		<build>
			<filters>
				<filter>src/main/filters/online.properties</filter>
			</filters>
			<resources>
				<resource>
					<directory>src/main/resources</directory>
				</resource>
			</resources>
		</build>
	</profile>
	<profile>
		<id>test</id>
		<build>
			<filters>
				<filter>src/main/filters/test.properties</filter>
			</filters>
			<resources>
				<resource>
					<directory>src/main/resources</directory>
				</resource>
			</resources>
		</build>
	</profile>
</profiles>

<build>
	<sourceDirectory>src/main/scala</sourceDirectory>
	<!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
	<resources>
		<!-- 保證resources下的所有的properties配置檔案可以被過濾-->
		<resource>
			<directory>src/main/resources</directory>
			<includes>
				<include>**/*.properties</include>
			</includes>
			<filtering>true</filtering>
		</resource>
	</resources>
	<plugins>
		<plugin>
			<groupId>org.scala-tools</groupId>
			<artifactId>maven-scala-plugin</artifactId>
			<version>2.15.2</version>
			<executions>
				<execution>
					<goals>
						<goal>compile</goal>
						<goal>testCompile</goal>
					</goals>
					<configuration>
						<args>
							<arg>-dependencyfile</arg>
							<arg>${project.build.directory}/.scala_dependencies</arg>
						</args>
					</configuration>
				</execution>
			</executions>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-surefire-plugin</artifactId>
			<version>2.10</version>
			<configuration>
				<useFile>false</useFile>
				<disableXmlReport>true</disableXmlReport>
				<!-- If you have classpath issue like NoDefClassError,... -->
				<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
				<includes>
					<include>**/*Test.*</include>
					<include>**/*Suite.*</include>
				</includes>
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-assembly-plugin</artifactId>
			<version>2.4</version>
			<configuration>
				<descriptorRefs>
					<descriptorRef>jar-with-dependencies</descriptorRef>
				</descriptorRefs>
				<archive>
					<manifest>
						<mainClass>datasync.Hive2HBase</mainClass>
					</manifest>
				</archive>
			</configuration>
			<executions>
				<execution>
					<id>make-assembly</id>
					<phase>package</phase>
					<goals>
						<goal>single</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>
<reporting>
	<plugins>
		<plugin>
			<groupId>org.scala-tools</groupId>
			<artifactId>maven-scala-plugin</artifactId>
			<configuration>
				<scalaVersion>${scala.version}</scalaVersion>
			</configuration>
		</plugin>
	</plugins>
</reporting>
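
The profiles above work together with Maven resource filtering: each profile points at a filter file under src/main/filters, and because filtering is enabled for *.properties resources, placeholders in src/main/resources/develop.properties are substituted at build time. A hypothetical illustration (the key names follow the Constants object; the values are invented):

# src/main/resources/develop.properties (placeholders filled in at build time)
hbase.zookeeper.quorum=${hbase.zookeeper.quorum}
hive.url=${hive.url}

# src/main/filters/dev.properties (values used by the dev profile)
hbase.zookeeper.quorum=dev-zk01,dev-zk02,dev-zk03
hive.url=jdbc:hive2://dev-hive:10000/

Building with mvn clean package -Pdev (or -Ptest / -Ponline) then produces a jar whose develop.properties carries that environment's values.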

5. Package the jar and submit it to Spark

To run in yarn-client mode, submit a command like the following. The trailing arguments are the Hive database, the Hive table, and the comma-separated columns to ignore; an optional fourth argument overrides the partition date, which otherwise defaults to yesterday:

spark-submit --master yarn-client --num-executors 2 --executor-memory 4g --driver-memory 4g --executor-cores 3 --class datasync.Hive2HBase xxx.jar dbname tbname ignorefields

To run against a Spark standalone cluster instead, point --master at the cluster address and keep the same --class, jar, and arguments as above:

spark-submit --master spark://ip:port --num-executors 2 --executor-memory 2g --driver-memory 2g --executor-cores 10 --total-executor-cores 20
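
As a concrete, purely hypothetical example: to load yesterday's partition of a Hive table named warehouse.orders while skipping its dt partition column, the yarn-client command above would end with

--class datasync.Hive2HBase xxx.jar warehouse orders dt

Since no fourth argument is given, the program falls back to yesterday's date as the partition value.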