Spark Demo: Read HDFS (Hive Table) Data, Process It, and Write the Results to a Hive Table
一、Implementation Logic of the Query-Engine Stress-Test Demo
It had been a while since I last wrote a Spark job. Recently I needed a stress-test tool for a query engine, driving it with the queries that land in Hive (HDFS) every day. This is a natural fit for Spark: read from HDFS, write the results to Hive. Here is a small demo.
(1) Implementation logic
- Spark reads the queries stored in HDFS for a given day (passed in as an argument); the second column of the hive_test.engine_queryjson table is the query.
- Queries are submitted to the engine at 2-second intervals.
- Results are polled every 2 seconds; a query still unfinished after 5 minutes is treated as timed out.
- The execution status of each query is written to the status result table hive_test.query_result_info.
(2) Packaging and submitting to Spark
- Package the project (mvn clean package; the assembly plugin in the pom below produces query_sender-1.0-SNAPSHOT-jar-with-dependencies.jar).
- Submit the jar with spark-submit.
For spark-submit usage, see the official Spark docs: Submitting Applications.
The main function here takes one date argument, formatted like "20190106", identifying the daily partition of hive_test.engine_queryjson.
Note: in cluster mode, reading Hive requires passing hive-site.xml via --files; this can be omitted if the cluster is already configured.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --files hdfs://xxxxx/hive-site.xml \
  --queue <your-queue> \
  --class com.learn.QuerySender \
  ./query_sender-1.0-SNAPSHOT-jar-with-dependencies.jar \
  20190106
(3) Results land in a Hive table for analysis
After the run finishes, the execution status of each query is written to hive_test.query_result_info; query that table for whatever statistics or analysis you need, as in the sketch below.
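For a quick look at the outcome distribution, a per-code count over one day's partition is enough. A minimal sketch, assuming a HiveContext named sqlContext (as in the code below) and the result-code semantics used there (0 = success, 1 = failure, 2 = timeout):
// Count query outcomes per result code for one day's partition
val stats = sqlContext.sql(
  "select code, count(*) as cnt " +
  "from hive_test.query_result_info " +
  "where dt = '20190106' " +
  "group by code")
stats.show() // 0 = success, 1 = failure, 2 = timeout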
二、Main Code Walkthrough
(1) DDL for hive_test.query_result_info
CREATE EXTERNAL TABLE `hive_test.query_result_info`(
  `query` string COMMENT 'query',
  `code` int COMMENT 'result code',
  `info` string COMMENT 'query info')
COMMENT 'query result table'
PARTITIONED BY (
  `dt` string COMMENT 'dt')
(2) Main code snippet
package com.learn
import com.alibaba.fastjson.JSON
import com.learn.util.HttpUtil
import org.slf4j.LoggerFactory
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
/**
* Query result
* @param query query json
* @param code result code
* @param info query info
*/
case class QueryResult (
query: String, // query json
code: Int, // result code
info: String // query info
)
object QuerySender {
val queryUrl = "xxx"
val queryResultUrl = "XXX"
val queryTimeout = 300000 // 5-minute timeout
val queryPollingInterval = 2000 // poll the result every 2 seconds
val queryInterval = 2000 // 2-second interval between query submissions
def main(args: Array[String]) {
val logger = LoggerFactory.getLogger(QuerySender.getClass)
val dt = args(0)
println("dt=" + dt)
//val hadoopUserName = args.apply(1)
//val hadoopUserPassWord = args.apply(2)
//println("hadoopUserName=" + hadoopUserName)
//println("hadoopUserPassWord=" + "hadoopUserPassWord")
//System.setProperty("HADOOP_USER_NAME", hadoopUserName)
//System.setProperty("HADOOP_USER_PASSWORD", hadoopUserPassWord)
val conf = new SparkConf()
conf.setAppName("HdfsReader")
//conf.setMaster("local") // local testing
// --- Read from HDFS. This demonstrates reading the raw HDFS files; reading the Hive table directly would also work (code omitted here). ---
val sc = SparkContext.getOrCreate(conf)
val hdfsFile = sc.textFile(s"hdfs://xxxx/engine_queryjson/dt=$dt/000000_0")
val hivedata = hdfsFile.map(_.split("\t")).map(e => (e(0), e(1), e(2))) // keep natural column order, so ._2 is the second column
println(hivedata.first()._2) // the second column is the query string
// --- Submit the queries ---
val queryResults = scala.collection.mutable.ArrayBuffer[QueryResult]()
println("Submitting queries.....")
// collect() brings the queries back to the driver; appending to this
// driver-side buffer inside rdd.foreach would run on the executors and
// leave the buffer empty in cluster mode
hivedata.collect().foreach(v => {
queryResults.append(submitQuery(v._2))
Thread.sleep(queryInterval) // 2-second interval between submissions
})
// --- Write the query results to Hive ---
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
queryResults.toDF("query", "code", "info").registerTempTable("queryResultTempTable")
println("start insert overwrite table....")
sqlContext.sql("set hive.exec.dynamic.partition=true")
sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
sqlContext.sql("insert overwrite table hive_test.query_result_info partition(dt) " +
s"select query, code, info, $dt " +
"from queryResultTempTable ")
sc.stop()
}
/**
* Submit a query.
* Assumes the response's meta object carries code and status fields that
* identify the query state: code 2 = timeout, 1 = failure, 0 = success.
* HttpUtil is an HTTP utility implemented in Java (this project mixes Java and
* Scala; not detailed here, see the pom and the sketch after this code).
* The result is polled every 2 seconds, with a 5-minute query timeout.
* @param query the query
* @return the query result
*/
def submitQuery(query: String): QueryResult = {
val startTime = System.currentTimeMillis()
val responseDirect = HttpUtil.postJson(queryUrl, query) // submit the query; responseDirect is the response body
println("Query status: " + responseDirect.toString)
// parse the status
val jsonResponse = JSON.parseObject(responseDirect)
val code = jsonResponse.getJSONObject("meta").getIntValue("code")
val status = jsonResponse.getJSONObject("meta").getString("status")
val msg = jsonResponse.getJSONObject("meta").getString("msg")
if (code == 1 || code == 0) { // 2 = timeout, 1 = failure, 0 = success
// the query succeeded or failed outright
return QueryResult(query, code, msg)
} else {
while (true) {
if (System.currentTimeMillis() - startTime >= queryTimeout) {
// 5-minute timeout
return QueryResult(query, 2, "timeout")
} else {
val responseRetry = HttpUtil.postJson(queryResultUrl, query) // poll the result endpoint
val jsonRetry = JSON.parseObject(responseRetry) // parse the polled response, not the original one
val code = jsonRetry.getJSONObject("meta").getIntValue("code")
val status = jsonRetry.getJSONObject("meta").getString("status")
val msg = jsonRetry.getJSONObject("meta").getString("msg")
if (code == 1 || code == 0) {
// the query succeeded or failed
return QueryResult(query, code, msg)
}
Thread.sleep(queryPollingInterval) // poll the result every 2 seconds
}
}
QueryResult(query, 2, "timeout") // unreachable, but keeps the branch well-typed
}
}
}
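The HttpUtil.postJson helper used above is a Java utility from this project and is not shown. For readers who want a self-contained starting point, here is a minimal Scala sketch of an equivalent helper built on java.net.HttpURLConnection only; the project's actual implementation may differ (error handling, timeouts, extra headers):
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}
import java.net.{HttpURLConnection, URL}
object HttpUtil {
  // POST a JSON body to url and return the response body as a string
  def postJson(url: String, json: String): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    val writer = new OutputStreamWriter(conn.getOutputStream, "UTF-8")
    try { writer.write(json) } finally { writer.close() }
    val reader = new BufferedReader(new InputStreamReader(conn.getInputStream, "UTF-8"))
    try {
      Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString("\n")
    } finally {
      reader.close()
      conn.disconnect()
    }
  }
}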
(3) pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.learn</groupId>
<artifactId>query_sender</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>query_sender</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.2.0</spark.version>
<scala.version>2.11.6</scala.version>
<fastjson.version>1.2.29</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.3</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<!-- do not add -make:transitive here -->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.13</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issues like NoClassDefFoundError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
三、References
- Spark official docs: Submitting Applications