
Spark SQL: working with Hive tables

Goal: get Navicat-like convenience => write HQL statements and execute them in one step with Spark SQL from inside IDEA, instead of running commands in a shell window.

Steps: write the SQL file (in the resources directory) --> read its contents --> split it into individual statements on ';' --> execute each statement with Spark SQL

1. Write the HQL file

The contents of gen_startup.sql:

use db;
set hive.exec.dynamic.partition.mode=nonstrict;

-- define the functions
drop function if exists forkstartuplogs;
drop function if exists formatbyday;
create function forkstartuplogs as '<package>.ForkStartuplogsUDTF';
create function formatbyday     as '<package>.FormatByDayUDF';

-- use the functions
insert into appstartuplogs partition(ym, day)
select
  t.appChannel,
  t.appId,
  t.appPlatform,
  t.appVersion,
  t.brand,
  formatbyday(t.createdatms, 0, 'yyyyMM'),
  formatbyday(t.createdatms, 0, 'dd')
from
(
  select forkstartuplogs(servertimestr, clienttimems, clientip, json) from raw_logs where ym='201511' and day='8'
) t;
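
The script assumes forkstartuplogs and formatbyday are backed by UDF/UDTF classes already packaged in a JAR on Hive's classpath. Their implementations are not part of this post; as a rough sketch only (the class name, parameters, and semantics here are assumptions), a formatbyday-style function built on the classic org.apache.hadoop.hive.ql.exec.UDF API might look like this:

import org.apache.hadoop.hive.ql.exec.UDF;

import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical FormatByDayUDF: formats an epoch-millisecond timestamp,
// shifted by a day offset, with the given date pattern (e.g. 'yyyyMM', 'dd').
public class FormatByDayUDF extends UDF {
    public String evaluate(long createdAtMs, int dayOffset, String pattern) {
        long shifted = createdAtMs + dayOffset * 24L * 60L * 60L * 1000L;
        return new SimpleDateFormat(pattern).format(new Date(shifted));
    }
}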

2. Configure the Maven dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.17</version>
</dependency>
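
Note: for enableHiveSupport() to reach the real Hive metastore, hive-site.xml (and usually core-site.xml/hdfs-site.xml) must also be on the classpath, e.g. copied into the resources directory. The mysql-connector-java dependency is here because this setup assumes a MySQL-backed metastore; swap it out if your metastore uses a different database.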

3. Create a SparkSession and execute the script

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.Charset;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// helper (in ResourceUtil): read a SQL script from the classpath into one string
public static String readSQLAsString(String resource) throws Exception {
    InputStream input = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
    if (input == null) {
        throw new IllegalArgumentException("resource not found on classpath: " + resource);
    }
    ByteArrayOutputStream baos = new ByteArrayOutputStream();

    byte[] buf = new byte[1024];
    int len = -1;
    while ((len = input.read(buf)) != -1) {
        baos.write(buf, 0, len);
    }
    input.close();

    String sql = new String(baos.toByteArray(), Charset.defaultCharset());
    // strip "--" line comments (handle both \n and \r\n line endings)
    sql = sql.replaceAll("--.*\r?\n", "");
    return sql;
}
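
getResourceAsStream resolves the path against the classpath root, so a script placed directly under src/main/resources is loaded simply as readSQLAsString("gen_startup.sql").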
	
public static void main(String[] args) throws Exception {
    // Spark configuration
    SparkConf conf = new SparkConf();
    conf.setAppName("dataClean");
    conf.setMaster("local[4]");

    // Spark SQL session with Hive support
    SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();

    // read the script from resources and split it into statements on ';'
    String sqls = ResourceUtil.readSQLAsString("gen_startup.sql");
    String[] arr = sqls.split(";");
    for (String sql : arr) {
        if (!sql.trim().isEmpty()) {
            sess.sql(sql).show(10, false);
        }
    }
    sess.stop();
}
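
One caveat: splitting on ';' is deliberately naive and will break any statement that contains a literal semicolon (for example inside a string constant). Also, statements like set and create function return empty result sets, so show() prints nothing useful for them; for a script like gen_startup.sql this is good enough.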