spark sql: 操作hive表
阿新 • • 發佈:2018-12-20
目標: 實現類似於navicat的功能=> 寫hql語句,在idea下使用spark sql 一鍵執行,而不用到shell視窗下執行命令
步驟: 寫sql檔案 (resources目錄)—> 讀取內容 --> 以 ‘;’ 解析每條命令 -->sparksql 執行命令
1, 寫hql檔案
gen_startup.sql 內容如下:
use db ;
set hive.exec.dynamic.partition.mode=nonstrict ;
-- define the functions (drop first so the script can be re-run safely)
drop function if exists forkstartuplogs ;
drop function if exists formatbyday ;
create function forkstartuplogs as '包名.ForkStartuplogsUDTF' ;
create function formatbyday as '包名.FormatByDayUDF' ;
-- apply the functions: explode raw startup logs into columns and insert
-- into appstartuplogs, dynamically partitioned by (ym, day)
insert into appstartuplogs partition(ym , day)
select t.appChannel , t.appId , t.appPlatform , t.appVersion , t.brand ,
       formatbyday(t.createdatms , 0 , 'yyyyMM') ,
       formatbyday(t.createdatms , 0 , 'dd')
from (
  select forkstartuplogs(servertimestr ,clienttimems ,clientip ,json)
  from raw_logs where ym='201511' and day='8'
)t;
2,配置maven依賴
<!-- Spark SQL core (Scala 2.11 build) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Hive integration, required for SparkSession.enableHiveSupport() -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- JDBC driver for the MySQL-backed Hive metastore -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.17</version>
</dependency>
3,建立SparkSession並執行指令碼
import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; public static String readSQLAsString(String resource) throws Exception { InputStream input = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource) ; ByteArrayOutputStream baos = new ByteArrayOutputStream() ; byte[] buf = new byte[1024] ; int len = -1 ; while((len = input.read(buf)) != -1){ baos.write(buf , 0 , len); } String sql = new String(baos.toByteArray(), Charset.defaultCharset()) ; //替換掉註釋 sql = sql.replaceAll("--.*\r\n", "") ; return sql ; } public static void main(String[] args) throws Exception { //spark配置 SparkConf conf = new SparkConf(); conf.setAppName("dataClean") ; conf.setMaster("local[4]") ; //spark sql會話 SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate(); String sqls = ResourceUtil.readSQLAsString(sqlScript); String arr[] = sqls.split(";"); for (String sql : arr) { if (!sql.trim().equals("")) { sess.sql(sql).show(10, false); } } }