Spark: Importing Hive Data into HBase
Overview:
The previous post, MapReduce: Importing Hive Data into HBase, already showed how to use MapReduce to load data from textfile- and rcfile-format Hive tables into HBase. That MR approach has two drawbacks. First, it runs slowly when the data volume is very large. Second, Hive tables can be stored in several formats; besides the two above, ORC and Parquet are also widely used for their high compression ratio and query performance, so whenever a Hive table's storage format changes, the code may need to be modified and repackaged. Using Spark to load Hive table data into HBase avoids both problems: it runs fast and does not depend on the Hive table's storage format. Two birds with one stone, so I went ahead and wrote the following Spark program.
Project setup is the same as in the previous post, so I will not repeat it here. The only difference is that after creating the Maven project you need to add Scala support: right-click the project, choose Add Framework Support, and tick Scala in the dialog.
The code is as follows.
1. common package
object Constants {
  // --------------------------------------------- develop.properties key ------------------------------------------
  val ZOOKEEPER_CLIENT_PORT: String = "hbase.zookeeper.property.clientPort"
  val ZOOKEEPER_QUORUM: String = "hbase.zookeeper.quorum"
  val HBASE_MASTER: String = "hbase.master"
  val ZOOKEEPER_ZNODE_PARENT: String = "zookeeper.znode.parent"
  val HIVE_URL: String = "hive.url"
  val HIVE_DRIVER_CLASS: String = "hive.driver.class"
  val HIVE_USER: String = "hive.user"
  val HIVE_PASSWORD: String = "hive.password"
  val HIVE_DATA_INPUT_PATH: String = "hive.data.input.path"

  // --------------------------------------------- constant key ------------------------------------------
  val DESC_TABLE: String = "DESC FORMATTED "
  val SELECT: String = "select "
  val FROM: String = " from "
  val WHERE: String = " where "
  val PARTITION_PREFIX: String = "dt = "
  val HBASE_COLUMN_FAMILY: String = "info"
  val FIELDS_TERMINATED: Char = '\001'
  val COL_NAME: String = "col_name"
  val APP_NAME_PREFIX = "hive2hbase-"
  val SHARP: String = "#"
  val POINT = "."
}
2. config package
import java.util.ResourceBundle

object ConfigureContext {
  // Parse the properties file and return its entries as a Map
  def loadConfig(): Map[String, String] = {
    val bundle = ResourceBundle.getBundle("develop")
    var configMap: Map[String, String] = Map()
    val keys = bundle.getKeys
    while (keys.hasMoreElements) {
      val key = keys.nextElement()
      configMap += ((key, bundle.getString(key)))
    }
    configMap
  }

  def main(args: Array[String]): Unit = {
    val map = loadConfig()
    map.keys.foreach { i =>
      println("key = " + i + ", value = " + map(i))
    }
  }
}
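ConfigureContext reads the keys defined in Constants from a develop.properties file on the classpath. The original post does not show this file, so the following is only a hypothetical example with placeholder values; with the Maven profiles shown later, the real values would normally live in src/main/filters/*.properties and be substituted via resource filtering. Note that hive.url ends with a slash because the code appends the database name to it.
# develop.properties (hypothetical example -- all values are placeholders)
hbase.zookeeper.property.clientPort=2181
hbase.zookeeper.quorum=zk1.example.com,zk2.example.com,zk3.example.com
hbase.master=hmaster.example.com:16000
zookeeper.znode.parent=/hbase
hive.url=jdbc:hive2://hiveserver2.example.com:10000/
hive.driver.class=org.apache.hive.jdbc.HiveDriver
hive.user=hive
hive.password=hive
hive.data.input.path=/user/hive/warehouse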
3. datasync package
import java.sql.{Connection, DriverManager, ResultSet}
import java.text.SimpleDateFormat
import java.util
import java.util.Calendar
import com.sqyc.bigdata.common.Constants
import com.sqyc.bigdata.config.ConfigureContext
import com.sqyc.bigdata.utils.MD5Utils
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, HBaseAdmin, Put}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
import scala.util.control._
object Hive2HBase {
def main(args: Array[String]): Unit = {
if (args.length < 3) {
throw new IllegalArgumentException("Please input parameters.【hiveDatabase hiveTable hiveIgnoreFields [hivePartition]】")
}
val formatter = new SimpleDateFormat("yyyy-MM-dd")
val date = Calendar.getInstance()
val configMap: Map[String, String] = ConfigureContext.loadConfig()
val hiveDatabase: String = args(0)
val hiveTable: String = args(1)
val hiveIgnoreFields: String = args(2)
var hivePartition: String = null
if (args.length == 4) {
hivePartition = args(3)
} else {
date.add(Calendar.DAY_OF_MONTH, -1)
hivePartition = formatter.format(date.getTime)
}
// Create the HBase table
createTable(hiveTable, configMap)
// Get the column list from the Hive table
val columnList = getColumnList(hiveDatabase, hiveTable, hiveIgnoreFields, configMap)
// Build the query SQL
val columns: String = getSelectColumns(columnList)
val sql = getSql(hiveDatabase, hiveTable, hivePartition, columns)
// Query the data
val spark = SparkSession.builder().appName(Constants.APP_NAME_PREFIX + hiveTable).enableHiveSupport().getOrCreate()
val dataDF = spark.sql(sql).toDF()
// Write the data to HBase
dataDF.foreachPartition(it => {
val conn = getHBaseConnection(configMap)
val tableName = TableName.valueOf(hiveTable)
val table = conn.getTable(tableName)
// Store empty/null values as the literal string "null"
def checkValue(v: Any): String = if (v == null || v.toString.trim.isEmpty) "null" else v.toString
it.foreach(row => {
// The rowkey is the MD5 of the first selected column
val rowkey = MD5Utils.string2MD5(row(0).toString).getBytes()
val columnFamily = Constants.HBASE_COLUMN_FAMILY.getBytes()
val put = new Put(rowkey)
for (i <- 0 until columnList.size) {
put.addColumn(columnFamily, columnList(i).getBytes, checkValue(row(i)).getBytes())
}
table.put(put)
})
table.close()
conn.close()
})
}
def getColumnList(hiveDatabase: String, hiveTable: String, hiveIgnoreFields: String, configMap: Map[String, String]) = {
Class.forName(configMap(Constants.HIVE_DRIVER_CLASS))
val conn = DriverManager.getConnection(configMap(Constants.HIVE_URL) + hiveDatabase, configMap(Constants.HIVE_USER), configMap(Constants.HIVE_PASSWORD))
var columnList = new ListBuffer[String]
val ps = conn.prepareStatement(Constants.DESC_TABLE + hiveDatabase + Constants.POINT + hiveTable)
val rs = ps.executeQuery
val breakWhile = new Breaks;
val continueWhile = new Breaks;
val ignoreList = hiveIgnoreFields.split(",").toList
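// DESC FORMATTED lists the real columns between the "# col_name" header row and the
// next row starting with "#" (the partition / detailed table info sections), so only
// the names inside that window are collected, skipping blank rows and ignored fields.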
while (rs.next) {
if (startColumnsDescRow(rs)) {
breakWhile.breakable {
while (rs.next()) {
continueWhile.breakable {
val colName = rs.getString(Constants.COL_NAME)
if (colName == null || colName.trim().equals("") || ignoreList.contains(colName)) {
continueWhile.break()
} else if (colName.startsWith(Constants.SHARP)) {
breakWhile.break()
} else {
columnList.append(colName)
}
}
}
}
}
}
if (conn != null) conn.close()
columnList
}
def startColumnsDescRow(rs: ResultSet) = {
val colName = rs.getString(Constants.COL_NAME)
colName.trim == "# col_name"
}
def getSelectColumns(columnList: ListBuffer[String]) = {
var columns = new StringBuilder()
for (column <- columnList) {
columns.append(column)
columns.append(",")
}
columns.deleteCharAt(columns.length - 1).toString()
}
def getSql(hiveDatabase: String, hiveTable: String, hivePartition: String, columns: String) = {
var sql = new StringBuilder()
.append(Constants.SELECT)
.append(columns)
.append(Constants.FROM)
.append(hiveDatabase)
.append(Constants.POINT)
.append(hiveTable)
.append(Constants.WHERE)
.append(Constants.PARTITION_PREFIX + "'" + hivePartition + "'")
sql.toString()
}
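// Example: getSql("dbname", "tbname", "2019-01-21", "id,name,price") produces
//   "select id,name,price from dbname.tbname where dt = '2019-01-21'"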
def getHBaseConnection(configMap: Map[String, String]) = {
val conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.property.clientPort", configMap(Constants.ZOOKEEPER_CLIENT_PORT))
conf.set("hbase.zookeeper.quorum", configMap(Constants.ZOOKEEPER_QUORUM))
conf.set("hbase.master", configMap(Constants.HBASE_MASTER))
conf.set("zookeeper.znode.parent", configMap(Constants.ZOOKEEPER_ZNODE_PARENT))
ConnectionFactory.createConnection(conf)
}
def createTable(hiveTable: String, configMap: Map[String, String]): Unit = {
val conn = getHBaseConnection(configMap)
val admin = conn.getAdmin.asInstanceOf[HBaseAdmin]
val tableName = TableName.valueOf(hiveTable)
if (!admin.tableExists(tableName)) {
// Create the table if it does not exist yet
val desc = new HTableDescriptor(tableName)
val columnDesc = new HColumnDescriptor(Constants.HBASE_COLUMN_FAMILY)
desc.addFamily(columnDesc)
admin.createTable(desc)
}
admin.close()
conn.close()
}
}
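The MD5Utils.string2MD5 helper imported above (from the utils package) is not shown in the original post. A minimal sketch, assuming it simply hex-encodes the MD5 digest of its argument, which is how the rowkey is built above:
import java.security.MessageDigest

object MD5Utils {
  // Hex-encode the MD5 digest of the input string; used as the HBase rowkey
  def string2MD5(input: String): String = {
    val digest = MessageDigest.getInstance("MD5").digest(input.getBytes("UTF-8"))
    digest.map("%02x".format(_)).mkString
  }
}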
4. pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spark.version>2.1.2</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<hive.version>1.2.0</hive.version>
<scala.version>2.11.8</scala.version>
<hbase.version>1.2.6</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>dev</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<filters>
<filter>src/main/filters/dev.properties</filter>
</filters>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
</profile>
<profile>
<id>online</id>
<build>
<filters>
<filter>src/main/filters/online.properties</filter>
</filters>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
</profile>
<profile>
<id>test</id>
<build>
<filters>
<filter>src/main/filters/test.properties</filter>
</filters>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
</profile>
</profiles>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
<resources>
<!-- Ensure all properties files under resources are filtered -->
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.10</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>datasync.Hive2HBase</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
5. Package as a jar and submit to Spark
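With the assembly plugin configured above, build the fat jar for the desired profile, for example:
mvn clean package -Ponline
This produces a *-jar-with-dependencies.jar under target/.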
Run the following command. The trailing arguments are the Hive database, the Hive table, the comma-separated fields to ignore, and an optional partition date; when the partition is omitted, yesterday's date is used.
spark-submit --master yarn-client --num-executors 2 --executor-memory 4g --driver-memory 4g --executor-cores 3 --class datasync.Hive2HBase xxx.jar dbname tbname ignoreFields [partition]
If you run against a Spark standalone cluster instead, use a command like the following (--total-executor-cores replaces --num-executors, which is a YARN-only option):
spark-submit --master spark://ip:port --executor-memory 2g --driver-memory 2g --executor-cores 10 --total-executor-cores 20 --class datasync.Hive2HBase xxx.jar dbname tbname ignoreFields [partition]
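Once the job finishes, you can sanity-check the result from the HBase shell, for example:
scan 'tbname', {LIMIT => 5}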