15 UMeng Project -- Resource File Utility Class (ResourceUtil) and SQL Execution Utility Class (ExecSQLUtil)
阿新 · Published 2018-11-05
The resource file utility class converts a SQL script into a String ---> which is then handed to the SQL utility class ExecSQLUtil for execution.
1. Resource file utility class (ResourceUtil)
Converts a SQL script into a String.
package com.oldboy.umeng.common.util;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

/**
 * Resource file utility class.
 */
public class ResourceUtil {
    /**
     * Reads an entire resource into a String using the given charset.
     */
    public static String readResourceAsString(String resource, String charset) throws Exception {
        InputStream input = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int len = -1;
        while ((len = input.read(buf)) != -1) {
            baos.write(buf, 0, len);
        }
        return new String(baos.toByteArray(), charset);
    }

    /**
     * Reads an entire resource into a String using the default charset,
     * stripping "--" line comments.
     */
    public static String readResourceAsString(String resource) throws Exception {
        InputStream input = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int len = -1;
        while ((len = input.read(buf)) != -1) {
            baos.write(buf, 0, len);
        }
        String sql = new String(baos.toByteArray(), Charset.defaultCharset());
        // Strip "--" comments so the script can later be split on ";" safely
        sql = sql.replaceAll("--.*\r\n", "");
        return sql;
    }

    /**
     * Reads a resource into a list of its non-blank lines.
     */
    public static List<String> readResourceAsList(String resource) throws Exception {
        List<String> list = new ArrayList<String>();
        InputStream input = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
        BufferedReader br = new BufferedReader(new InputStreamReader(input));
        String line = null;
        while ((line = br.readLine()) != null) {
            if (!line.trim().equals("")) {
                list.add(line);
            }
        }
        return list;
    }
}
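A quick usage sketch (the demo class below is hypothetical; funcs.sql is the registration script listed at the end of this post):

import com.oldboy.umeng.common.util.ResourceUtil;

public class ResourceUtilDemo {
    public static void main(String[] args) throws Exception {
        // Whole script as one String, with "--" comments stripped
        String sql = ResourceUtil.readResourceAsString("funcs.sql");
        System.out.println(sql);

        // Same resource as a list of its non-blank lines
        for (String line : ResourceUtil.readResourceAsList("funcs.sql")) {
            System.out.println(line);
        }
    }
}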
2. SQL execution utility class (ExecSQLUtil)
Executes SQL statements; script files are first loaded as Strings by ResourceUtil.
package com.oldboy.umeng.spark.stat;

import com.oldboy.umeng.common.util.ResourceUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Script execution utility class.
 */
public class ExecSQLUtil {
    /**
     * Executes a SQL script loaded from the classpath, one statement at a time.
     */
    public static void execSQLScript(SparkSession sess, String sqlScript) throws Exception {
        // The resource utility turns the script into a single String
        String sqls = ResourceUtil.readResourceAsString(sqlScript);
        String[] arr = sqls.split(";");
        for (String sql : arr) {
            if (!sql.trim().equals("")) {
                sess.sql(sql).show(1000, false);
            }
        }
    }

    /**
     * Executes a semicolon-separated SQL string.
     */
    public static void execSQLString(SparkSession sess, String sqlString) throws Exception {
        String[] arr = sqlString.split(";");
        for (String sql : arr) {
            if (!sql.trim().equals("")) {
                sess.sql(sql).show(1000, false);
            }
        }
    }

    /**
     * Executes a semicolon-separated SQL string and returns the result of the
     * last statement as a Dataset; earlier statements are shown and discarded.
     */
    public static Dataset<Row> execSQLString2(SparkSession sess, String sqlString) throws Exception {
        String[] arr = sqlString.split(";");
        for (int i = 0; i < arr.length; i++) {
            if (!arr[i].trim().equals("")) {
                if (i != arr.length - 1) {
                    sess.sql(arr[i]).show();
                } else {
                    return sess.sql(arr[i]);
                }
            }
        }
        return null;
    }

    /**
     * Registers the project's UDFs/UDTFs by running funcs.sql.
     */
    public static void execRegisterFuncs(SparkSession sess) throws Exception {
        execSQLScript(sess, "funcs.sql");
    }
}
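A usage sketch for execSQLString2 (the demo class and the trailing query are assumptions; big12_umeng is the database created in funcs.sql below):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ExecSQLUtilDemo {
    public static void main(String[] args) throws Exception {
        // Hive support is required for the 'create TEMPORARY function ... as <class>' statements
        SparkSession sess = SparkSession.builder()
                .appName("demo")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();
        ExecSQLUtil.execRegisterFuncs(sess);
        // Only the last statement's result comes back as a Dataset
        Dataset<Row> tables = ExecSQLUtil.execSQLString2(sess, "use big12_umeng; show tables");
        tables.show();
    }
}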
3. Example: cleaning and dumping data
package com.oldboy.umeng.spark.stat;   // same package as ExecSQLUtil

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

/**
 * Data cleaning job.
 */
public class DataCleanJava {
    public static void main(String[] args) throws Exception {
        // Script name can be overridden by the first command-line argument
        String log_sql_script_name = "data_clean_startup.sql";
        if (args != null && args.length > 0) {
            log_sql_script_name = args[0];
        }
        SparkConf conf = new SparkConf();
        conf.setAppName("dataClean");
        conf.setMaster("local[4]");
        SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
        // Register the functions first
        ExecSQLUtil.execRegisterFuncs(sess);
        // Then run the chosen cleaning script (other scripts such as
        // data_clean_error.sql can be passed the same way)
        ExecSQLUtil.execSQLScript(sess, log_sql_script_name);
    }
}
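To submit the job rather than run it from the IDE, an invocation along these lines should work (the jar name is an assumption; --class and --master are standard spark-submit flags, and setMaster would normally be removed from the code so --master takes effect):

spark-submit \
  --class com.oldboy.umeng.spark.stat.DataCleanJava \
  --master local[4] \
  umeng-spark.jar data_clean_startup.sql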
SQL scripts
funcs.sql -- the function registration script
use big12_umeng;

drop function if exists forkstartuplogs;
drop function if exists forkeventlogs;
drop function if exists forkerrorlogs;
drop function if exists forkpagelogs;
drop function if exists forkusagelogs;
drop function if exists formatbyday;
drop function if exists formatbyweek;
drop function if exists formatbymonth;

create TEMPORARY function forkstartuplogs as 'com.oldboy.umeng.hive.udtf.ForkStartuplogsUDTF';
create TEMPORARY function forkeventlogs as 'com.oldboy.umeng.hive.udtf.ForkEventlogsUDTF';
create TEMPORARY function forkerrorlogs as 'com.oldboy.umeng.hive.udtf.ForkErrorlogsUDTF';
create TEMPORARY function forkpagelogs as 'com.oldboy.umeng.hive.udtf.ForkPagelogsUDTF';
create TEMPORARY function forkusagelogs as 'com.oldboy.umeng.hive.udtf.ForkUsagelogsUDTF';
create TEMPORARY function formatbyday as 'com.oldboy.umeng.hive.udf.FormatByDayUDF';
create TEMPORARY function formatbyweek as 'com.oldboy.umeng.hive.udf.FormatByWeekUDF';
create TEMPORARY function formatbymonth as 'com.oldboy.umeng.hive.udf.FormatByMonthUDF';
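The UDF/UDTF implementations themselves are not included in this post. For orientation, here is a minimal hypothetical sketch of what formatbyday could look like using the classic Hive UDF API, assuming it formats a millisecond timestamp as a day string (the real class may differ):

package com.oldboy.umeng.hive.udf;

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.hive.ql.exec.UDF;

// Hypothetical sketch: the actual FormatByDayUDF is not shown in the post.
public class FormatByDayUDF extends UDF {
    // Hive resolves evaluate() by reflection under the classic UDF API
    public String evaluate(long ms) {
        return new SimpleDateFormat("yyyy/MM/dd").format(new Date(ms));
    }
}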