Keep it simple, move your bricks the simple way: a Hive UDF for importing data into MySQL, shared with fellow devs
To borrow from Xue Zhiqian's "Actor", here is a verse of "Programmer": keep it simple, move your bricks the simple way.
We generally use Hive for offline statistical analysis and then load the results into MySQL tables, which the front-end reporting and visualization layer queries.
There are many ways to get data back into MySQL. The classic approach is Sqoop; some people instead run the query over Hive JDBC, pull the result set into application code, and write it back to MySQL with MySQL JDBC.
Both approaches involve a second processing step. (We did once implement SQL parsing to make the Sqoop step transparent to users, so that a statement like insert into mysql.table1 select * from hive.table2 would insert the Hive query result into MySQL, but the implementation was fairly complex.)
This post introduces a different approach: embed the MySQL write directly in a UDF, so that a single HQL statement does the whole job.
package brickhouse.udf.mysql;

import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

import javax.sql.DataSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

@Description(name = "mysql_import",
        value = "_FUNC_(config_path, sql, args1, [args2,...]) - Return ret")
public class MysqlImportUDF extends GenericUDF {
    private IntObjectInspector retValInspector;
    private DataSource dataSource;
    private String sql;
    private PrimitiveObjectInspector[] paramsInspectors;

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement stmt = connection.prepareStatement(sql)) {
            System.out.println("execute sql:" + System.currentTimeMillis());
            //Bind the arguments after config_path and sql to the SQL placeholders, in order
            for (int i = 2; i < arg0.length; i++) {
                Object param = paramsInspectors[i - 2].getPrimitiveJavaObject(arg0[i].get());
                stmt.setObject(i - 1, param);
            }
            int ret = stmt.executeUpdate();
            IntWritable iw = new IntWritable(ret);
            return retValInspector.getPrimitiveWritableObject(iw);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            BasicDataSource bds = (BasicDataSource) dataSource;
            bds.close();
        } catch (SQLException e) {
            e.printStackTrace();
            throw new IOException(e);
        }
    }

    @Override
    public String getDisplayString(String[] arg0) {
        return "mysql_import(config_path, sql, args1[, args2,...argsN])";
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
        if (arg0.length < 3) {
            throw new UDFArgumentException(" Expecting at least three arguments ");
        }
        //Validate the first argument: a constant string pointing at the connection pool config file
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("mysql connection pool config path must be constant");
            }
            ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[0];
            String configPath = propertiesPath.getWritableConstantValue().toString();
            Properties properties = new Properties();
            Configuration conf = new Configuration();
            Path path = new Path(configPath);
            try (FileSystem fs = FileSystem.get(path.toUri(), conf);
                 InputStream in = fs.open(path)) {
                properties.load(in);
                this.dataSource = BasicDataSourceFactory.createDataSource(properties);
            } catch (FileNotFoundException ex) {
                throw new UDFArgumentException("config file not found on the local file system or HDFS");
            } catch (Exception e) {
                e.printStackTrace();
                throw new UDFArgumentException(e);
            }
        }
        //Validate the second argument: a constant, non-empty SQL string
        if (arg0[1].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("the second arg must be a sql string constant");
            }
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[1];
            this.sql = sqlInsp.getWritableConstantValue().toString();
            if (this.sql == null || this.sql.trim().length() == 0) {
                throw new UDFArgumentException("the second arg must be a sql string constant and not nullable");
            }
        }
        //Remaining arguments: one primitive inspector per SQL placeholder
        paramsInspectors = new PrimitiveObjectInspector[arg0.length - 2];
        for (int i = 2; i < arg0.length; i++) {
            paramsInspectors[i - 2] = (PrimitiveObjectInspector) arg0[i];
        }
        retValInspector = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
        return retValInspector;
    }
}
Upload the jar and register the UDF:
CREATE FUNCTION default.mysql_import4 AS 'brickhouse.udf.mysql.MysqlImportUDF' USING JAR 'hdfs://name84:8020/tmp/jar/brickhouse-0.7.1.jar';
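At runtime the UDF also needs commons-dbcp (plus commons-pool) and the MySQL JDBC driver on the classpath. If they are not already in Hive's auxlib, they can be listed as extra resources in the registration; the dependency jar paths and versions below are illustrative placeholders, not the ones from the original deployment:
CREATE FUNCTION default.mysql_import4 AS 'brickhouse.udf.mysql.MysqlImportUDF'
USING JAR 'hdfs://name84:8020/tmp/jar/brickhouse-0.7.1.jar',
      JAR 'hdfs://name84:8020/tmp/jar/commons-dbcp-1.4.jar',
      JAR 'hdfs://name84:8020/tmp/jar/commons-pool-1.6.jar',
      JAR 'hdfs://name84:8020/tmp/jar/mysql-connector-java-5.1.38.jar';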
Then test it with an HQL statement:
select default.mysql_import4('hdfs://name84:8020/user/hive/udf/conf/mysql.properties','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',b.ds,b.type,b.pv,b.uv,b.pv,b.uv) from (
select ds,type,count(did) as pv,count(distinct did) as uv
from dd_xyzs_pc_action_detail
where ds='2016-10-23'
group by ds,type
) b
The inner subquery is an aggregation that computes daily pv and uv per type; the outer query wraps it and calls the UDF registered above to insert each result row into MySQL (for every row the UDF returns the affected-row count from executeUpdate).
The first UDF argument is static: the path of a config file describing how to set up the connection pool and which database to connect to.
The second argument is the MySQL statement describing how rows are written. The arguments after that are variadic and map one-to-one onto the placeholders in that statement; the statement above has 6 placeholders, so 6 arguments follow.
Here is the content of a sample mysql.properties config file:
driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://192.168.78.26:3306/db_stat?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&connectTimeout=60000&socketTimeout=60000
username=xyzs
password=xxxxxx
initialSize=1
maxActive=20
minIdle=5
maxIdle=15
connectionTimeoutMillis=5000
maxWait=60000
validationQuery=select 1 from dual
validationQueryTimeout=1
removeAbandoned=true
removeAbandonedTimeout=180
timeBetweenEvictionRunsMillis=30000
numTestsPerEvictionRun=20
testWhileIdle=true
testOnBorrow=false
testOnReturn=false
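The properties file has to be readable at the path passed as the first UDF argument. Assuming the HDFS path used in the test query above, it can be uploaded straight from the Hive CLI (or equivalently with hdfs dfs -put from a shell):
dfs -put mysql.properties hdfs://name84:8020/user/hive/udf/conf/mysql.properties;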
TODO: this UDF currently executes one INSERT per result row. The plan is a batch version: run collect_list over the query result to build an array first, then insert the whole batch into the database in one go.
And here is the batch-insert UDF:
package brickhouse.udf.mysql;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
@Description(name = "mysql_batch_import",
value = "_FUNC_(config_path, sql,array<struct>) - Return ret "
)
public class MysqlBatchImportUDF extends GenericUDF {
public static final String DEFAULT_CONFIG_ROOT_PATH = "/user/hive/udf/mysqludf/";
public static final String DEFAULT_CONFIG_FILE_SUFFIX = "properties";
private StandardListObjectInspector retValInspector;
private Properties properties;
private String sql;
private StandardListObjectInspector paramsListInspector;
private StandardStructObjectInspector paramsElementInspector;
@Override
public Object evaluate(DeferredObject[] arg0) throws HiveException {
//Since the batch import is a one-shot insert, no connection pool is created; a single connection is opened directly
try (Connection connection = DriverManager.getConnection(properties.getProperty("url"), properties.getProperty("username"), properties.getProperty("password"));
PreparedStatement stmt = connection.prepareStatement(sql)) {
connection.setAutoCommit(false);
for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) {
Object row = paramsListInspector.getListElement(arg0[2].get(), i);
for (int j = 0; j < paramsElementInspector.getAllStructFieldRefs().size(); j++) {
StructField structField = paramsElementInspector.getAllStructFieldRefs().get(j);
Object col = paramsElementInspector.getStructFieldData(row, structField);
Object param = ((PrimitiveObjectInspector) structField.getFieldObjectInspector()).getPrimitiveJavaObject(col);
stmt.setObject(j + 1, param);
}
stmt.addBatch();
}
int[] ret = stmt.executeBatch();
connection.commit();
Object returnlist = retValInspector.create(ret.length);
for (int i = 0; i < ret.length; i++) {
retValInspector.set(returnlist, i, ret[i]);
}
return returnlist;
} catch (SQLException e) {
e.printStackTrace();
throw new HiveException(e);
}
}
@Override
public String getDisplayString(String[] arg0) {
return "mysql_batch_import(config_path, sql,array<struct>)";
}
@Override
public ObjectInspector initialize(ObjectInspector[] arg0)
throws UDFArgumentException {
if (arg0.length != 3) {
throw new UDFArgumentException(" Expecting three arguments ");
}
//Validate the first argument: a constant string config path
if (arg0[0].getCategory() == Category.PRIMITIVE
&& ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
if (!(arg0[0] instanceof ConstantObjectInspector)) {
throw new UDFArgumentException("mysql connection pool config path must be constant");
}
ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[0];
String fileName1 = propertiesPath.getWritableConstantValue().toString();
Path path1 = new Path(fileName1);
if (path1.toUri().getScheme() == null) {
if (!"".equals(FilenameUtils.getExtension(fileName1)) && !DEFAULT_CONFIG_FILE_SUFFIX.equals(FilenameUtils.getExtension(fileName1))) {
throw new UDFArgumentException("不支援的副檔名,目前只支援properties檔案!");
}
//For a relative path, prepend the default root path
if (!fileName1.startsWith("/")) {
fileName1 = MysqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + fileName1;
}
}
//If only the base name was given, append the default suffix
if (!FilenameUtils.isExtension(fileName1, DEFAULT_CONFIG_FILE_SUFFIX)) {
fileName1 = fileName1 + FilenameUtils.EXTENSION_SEPARATOR_STR + DEFAULT_CONFIG_FILE_SUFFIX;
}
Properties properties = new Properties();
Configuration conf = new Configuration();
Path path2 = new Path(fileName1);
try (FileSystem fs = FileSystem.newInstance(path2.toUri(), conf); //FileSystem.get(path2.toUri(), conf) must not be used here: get() returns a shared cached instance, and closing it here can cause a "FileSystem is closed" exception in later operations
InputStream in = fs.open(path2)) {
properties.load(in);
this.properties = properties;
} catch (FileNotFoundException ex) {
throw new UDFArgumentException("在檔案系統中或者是HDFS上沒有找到對應的配置檔案");
} catch (Exception e) {
e.printStackTrace();
throw new UDFArgumentException(e);
}
}
//Validate the second argument: it must be a constant, non-empty SQL string
if (arg0[1].getCategory() == Category.PRIMITIVE
&& ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
if (!(arg0[1] instanceof ConstantObjectInspector)) {
throw new UDFArgumentException("the second arg must be a sql string constant");
}
ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[1];
this.sql = sqlInsp.getWritableConstantValue().toString();
if (this.sql == null || this.sql.trim().length() == 0) {
throw new UDFArgumentException("the second arg must be a sql string constant and not nullable");
}
}
//Validate the third argument: it must be an array of structs
if (arg0[2].getCategory() != Category.LIST) {
throw new UDFArgumentException(" Expecting an array<struct> field as third argument ");
}
ListObjectInspector third = (ListObjectInspector) arg0[2];
if (third.getListElementObjectInspector().getCategory() != Category.STRUCT) {
throw new UDFArgumentException(" Expecting an array<struct> field as third argument ");
}
paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector());
paramsElementInspector = (StandardStructObjectInspector) third.getListElementObjectInspector();
retValInspector = ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
return retValInspector;
}
}
The first two arguments are the same as for the mysql_import UDF above.
The third argument takes an array<struct> field: every element of the array must be a struct, and the number of fields in each struct must match the number of placeholders in the SQL.
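Before running the demo, the batch UDF has to be registered just like the single-row one. Assuming the class is packaged in the same jar as above (adjust the path to your build):
CREATE FUNCTION default.mysql_batch_import AS 'brickhouse.udf.mysql.MysqlBatchImportUDF' USING JAR 'hdfs://name84:8020/tmp/jar/brickhouse-0.7.1.jar';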
The demo below runs collect_list over the query result set to turn it into a single array field, which is passed as the third UDF argument. Note that the first argument, 'mysql_78_26', uses the relative-path shorthand handled in initialize: it resolves to /user/hive/udf/mysqludf/mysql_78_26.properties.
select default.mysql_batch_import('mysql_78_26','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',collect_list(struct(ds,type,pv,uv,pv,uv))) from
(
select ds,type,count(did) as pv,count(distinct did) as uv
from dd_xyzs_pc_action_detail
where ds='2016-10-23'
group by ds,type
) a
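A caveat when the result set is large: collect_list without a GROUP BY gathers the entire result into a single array and a single UDF call, which can put pressure on memory. One way to cap the batch size is to split the batches by grouping, for example one batch per type (a sketch built on the same tables as above):
select default.mysql_batch_import('mysql_78_26','insert into xj_test1(ds,`mod`,pv,uv) values(?,?,?,?) on duplicate key update pv=pv+?,uv=uv+?',collect_list(struct(ds,type,pv,uv,pv,uv))) from
(
select ds,type,count(did) as pv,count(distinct did) as uv
from dd_xyzs_pc_action_detail
where ds='2016-10-23'
group by ds,type
) a
group by type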