Hive中的UDF詳解
hive作為一個sql查詢引擎,自帶了一些基本的函式,比如count
(計數),sum
(求和),有時候這些基本函式滿足不了我們的需求,這時候就要寫hive hdf(user defined funation)
,又叫使用者自定義函式。
UDF 建立與使用步驟
- 繼承
org.apache.hadoop.hive.ql.exec.UDF
類,實現evaluate方法; - 打
jar
包上傳到叢集,通過create temporary function
建立臨時函式,不加temporary
就建立了一個永久函式; - 通過select 語句使用;
例一
下面是一個判斷hive表字段是否包含’100’
這個子串的簡單udf
:
package com.js.dataclean.hive.udf.hm2
import org.apache.hadoop.hive.ql.exec.UDF;
public class IsContains100 extends UDF{
public String evaluate(String s){
if(s == null || s.length() == 0){
return "0";
}
return s.contains("100") ? "1" : "0";
}
}
使用maven將其打包,進入hive cli
add jar /home/hadoop/codejar/flash_format.jar;
create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';
建立完臨時函式,即可使用這個函數了:
select isContains100('abc100def') from table limit 1;
1
例二
通過讀取mysql資料庫中的規則,為hive中的workflow返回對應的,型別:
type workflow
a 1
a 2
b 11
b 22
b 33
需求:我們希望,將hive的workflow欄位取值為,1,2的變為型別(type)a
b
,就是歸類的意思。
這個udf可以這麼實現:
package com.js.dataclean.hive.udf.hm2.workflow;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @ Author: keguang
* @ Date: 2018/12/13 16:24
* @ version: v1.0.0
* @ description:
*/
public class GetWorkflow extends UDF{
private static final String host = "0.0.0.0";
private static final String port = "3306";
private static final String database = "root";
private static final String userName = "root";
private static final String password = "123456";
private static String url = "";
private static final String driver = "com.mysql.jdbc.Driver";
private static Connection conn = null;
private static Map<String, List<String>> workflowType = null;
static {
url = "jdbc:mysql://" + host + ":" + port + "/" + database;
try {
// Class.forName(driver);
conn = DriverManager.getConnection(url, userName, password);
workflowType = getWorkflowType(conn);
} catch (Exception e) {
e.printStackTrace();
}
}
private static Map<String, List<String>> getWorkflowType(Connection conn){
Map<String, List<String>> workflowType = new HashMap<>();
String sql = "select * from flash_player_workflow";
PreparedStatement ps = null;
try {
ps = conn.prepareStatement(sql);
ResultSet rs = ps.executeQuery();
while (rs.next()){
String workflow = rs.getString("workflow");
String type = rs.getString("flag");
List<String> workflows = workflowType.get(type);
if(workflows == null){
workflows = new ArrayList<>();
}
workflows.add(workflow);
workflowType.put(type, workflows);
}
} catch (SQLException e) {
e.printStackTrace();
}finally {
// 關閉連結
if(conn != null){
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
return workflowType;
}
public String evaluate(String s){
assert workflowType != null;
for(String type:workflowType.keySet()){
List<String> workflows = workflowType.get(type);
if(workflows.contains(s)){
return type;
}
}
return s;
}
}
打好jar
包,建立函式: workflow2type(省略),然後使用:
select workflow2type(workflow) from table;
a
a
b
b
b
這樣就把很多取值歸為幾個大類了。
檢視hive function的用法
查month 相關的函式
show functions like '*month*';
檢視 add_months 函式的用法
desc function add_months;
檢視 add_months 函式的詳細說明並舉例
desc function extended add_months;
hive 中的 UDAF
可以看出,udf就是一個輸入一個輸出,輸入一個性別,返回’男’或者’女’,如果我們想實現select date,count(1) from table
,統計每天的流量呢?這就是一個分組統計,顯然是多個輸入,一個輸出,這時候udf已經不能滿足我們的需要,就需要寫udaf,user defined aggregare function
(使用者自定義聚合函式)。
這裡寫一個字串連線函式,相當於concat
的功能,將多行輸入,合併為一個字串,當然了hive中有字串連線函式,這裡是舉例說明UDAF
的用法:
package com.js.dataclean.hive.udaf.hm2;
import com.js.dataclean.utils.StringUtil;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/**
* 實現字串連線聚合的UDAF
* @version v1.0.0
* @Author:keguang
* @Date:2018/10/22 14:36
*/
public class MutiStringConcat extends UDAF{
public static class SumState{
private String sumStr;
}
public static class SumEvaluator implements UDAFEvaluator{
SumState sumState;
public SumEvaluator(){
super();
sumState = new SumState();
init();
}
@Override
public void init() {
sumState.sumStr = "";
}
/**
* 來了一行資料
* @param s
* @return
*/
public boolean iterate(String s){
if(!StringUtil.isNull(s)){
sumState.sumStr += s;
}
return true;
}
/**
* 狀態傳遞
* @return
*/
public SumState terminatePartial() {
return sumState;
}
/**
* 子任務合併
* @param state
* @return
*/
public boolean merge(SumState state){
if(state != null){
sumState.sumStr += state.sumStr;
}
return true;
}
/**
* 返回最終結果
* @return
*/
public String terminate(){
return sumState.sumStr;
}
}
}
用法,與udf一樣,還是需要打包並且到hive cli中註冊使用。
關於UDAF開發注意點:
- 需要
import org.apache.hadoop.hive.ql.exec.UDAF
以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator
,這兩個包都是必須的 - 函式類需要繼承UDAF類,內部類Evaluator實現UDAFEvaluator介面
- Evaluator需要實現 init、iterate、terminatePartial、merge、terminate這幾個函式
- init函式類似於建構函式,用於UDAF的初始化
- iterate接收傳入的引數,並進行內部的輪轉。其返回型別為boolean
- terminatePartial無引數,其為iterate函式輪轉結束後,返回亂轉資料,iterate和terminatePartial類似於hadoop的Combiner
- merge接收terminatePartial的返回結果,進行資料merge操作,其返回型別為boolean
- terminate返回最終的聚集函式結果
臨時與永久函式
Hive
自定義函式分為臨時與永久函式,顧名思義,分別是臨時使用和永久有效使用的意思。
臨時函式
臨時函式,關閉會話就結束了生命週期,下次要想使用,需要重新註冊。
add jar /path/xx.jar(儲存在本地磁碟)
// 臨時註冊UDF函式(hive會話生效)
create temporary function 函式名 as '包名.類名';
刪除臨時函式:
- drop temporary function 資料庫名.函式名;
永久函式
永久函式一旦註冊,可以在hive cli,遠端連線hiveserver2等地方永久使用,步驟為:
-
先上傳jar包到HDFS
-
永久註冊:
CREATE FUNCTION 函式名 AS '包名.類名' USING JAR 'hdfs:///path/xxxx.jar';
注意:指定jar包路徑需要是hdfs
路徑。
- 刪除永久函式:
drop function 資料庫名.函式名字;
新增的永久函式,比如在hive cli命令列註冊的,可能會在beeline或者hiveserver2遠端連線時,提示不存在該函式。解決辦法是,在無法使用UDF的HiveServer2上,執行reload function
命令,將MetaStore
中新增的UDF資訊同步到HiveServer2記憶體中。
場景
UDF在hive中使用場景廣泛,這裡列舉常用的使用場景。