1. 程式人生 > 實用技巧 >Hive UDAF介紹與開發

Hive UDAF介紹與開發

UDAF簡介

UDAF是使用者自定義聚合函式。Hive支援其使用者自行開發聚合函式完成業務邏輯。

通俗點說,就是你可能需要做一些特殊的甚至是非常扭曲的邏輯聚合,但是Hive自帶的聚合函式不夠玩,同時也還找不到高效的等價玩法,那麼,這時候就該自己寫一個UDAF了。

而從實現上來看,Hive的UDAF分為兩種:

  • Simple。即繼承org.apache.hadoop.hive.ql.exec.UDAF類,並在派生類中以靜態內部類的方式實現org.apache.hadoop.hive.ql.exec.UDAFEvaluator介面。這種方式簡單直接,但是在使用過程中需要依賴JAVA反射機制,因此效能相對較低。在Hive原始碼包org.apache.hadoop.hive.contrib.udaf.example
    中包含幾個示例。可以直接參閱。但是這些介面已經被註解為Deprecated,建議不要使用這種方式開發新的UDAF函式。
  • Generic。這是Hive社群推薦的新的寫法,以抽象類代替原有的介面。新的抽象類org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver替代老的UDAF介面,新的抽象類org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator替代老的UDAFEvaluator介面。

產生這兩種方式的原因並不高深,就是結構演進,歷史遺留。原文連結最後一段說明了一下演進的版本以及原因。

UDAF相關類和介面簡介

  • AbstractGenericUDAFResolver:該抽象類實現了GenericUDAFResolver2的介面。UDAF主類須繼承該抽象類,其主要作用是實現引數型別檢查和操作符過載。可以為同一個函式實現不同入參的版本。
  • org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator:該抽象類為UDAF具體的邏輯處理,包括幾個必須實現的抽象方法,這幾個方法負責完成UDAF所需要處理的邏輯。

UDAF的執行流程簡介

抽象類GenericUDAFEvaluator中,包含一個靜態內部列舉類,和一系列抽象方法。這個列舉類的註釋中,解釋了各個列舉值的執行階段和執行內容。按照時間先後順序,分別有:

  • PARTIAL1:原始資料到部分聚合,呼叫iterate和terminatePartial --> map階段
  • PARTIAL2: 部分聚合到部分聚合,呼叫merge和terminatePartial --> combine階段
  • FINAL: 部分聚合到完全聚合,呼叫merge和terminate --> reduce階段
  • COMPLETE: 從原始資料直接到完全聚合 --> map階段,並且沒有reduce

那麼,這幾個方法分別幹了些啥呢?

  • init: 例項化Evaluator類的時候呼叫的,在不同的階段需要返回不同的OI。其入參和返回值,以及Mode階段的關係如下表:

    入參返回值的使用者
    P1 原始資料 terminatePartial
    P2 部分聚合資料 terminatePartial
    F 部分聚合資料 terminate
    C 原始資料 terminate
  • getNewAggregationBuffer: 獲取存放中間結果的物件
  • iterate:處理一行資料
  • terminatePartial:返回部分聚合資料的持久化物件。因為呼叫這個方法時,說明已經是map或者combine的結束了,必須將資料持久化以後交給reduce進行處理。只支援JAVA原始資料型別及其封裝型別、HADOOP Writable型別、List、Map,不能返回自定義的類,即使實現了Serializable也不行,否則會出現問題或者錯誤的結果。
  • merge:將terminatePartial返回的部分聚合資料進行合併,需要使用到對應的OI。
  • terminate:結束,生成最終結果。

兩類UDAF基本原理相同,下面以histogram_numeric這個系統自帶的Generic UDAF為例,描述一下UDAF的執行和開發過程。這個函式涵蓋了UDAF多個特性,比如入參型別檢查並返回複雜資料型別。

UDAF開發

1. 構造UDAF程式碼骨架部分

先搭建好程式碼骨架,完成需要繼承的類和介面結構。

public class GenericUDAFHistogramNumeric extends AbstractGenericUDAFResolver {
  static final Log LOG = LogFactory.getLog(GenericUDAFHistogramNumeric.class.getName());
 
  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    // TODO: 1. Type-checking goes here!
 
    return new GenericUDAFHistogramNumericEvaluator();
  }
 
  public static class GenericUDAFHistogramNumericEvaluator extends GenericUDAFEvaluator {
    // UDAF logic goes here!
  }
}

2.實現getEvaluator方法

該方法非常簡單,其主要目的是校驗UDAF的入參個數和入參型別並返回Evaluator物件。呼叫者傳入不同的引數時,向其返回不同的Evaluator或者直接丟擲異常。這部分程式碼可以寫入骨架程式碼中的TODO:1處。例如本例中的實現,該UDAF不支援多種引數的版本,限定引數個數必須為2,並且第一個引數必須是簡單資料型別,第二個引數必須是int。

 @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
    if (parameters.length != 2) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Please specify exactly two arguments.");
    }

    // validate the first parameter, which is the expression to compute over
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }
    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
    case BYTE:
    case SHORT:
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE:
    case TIMESTAMP:
    case DECIMAL:
      break;
    case STRING:
    case BOOLEAN:
    case DATE:
    default:
      throw new UDFArgumentTypeException(0,
          "Only numeric type arguments are accepted but "
          + parameters[0].getTypeName() + " was passed as parameter 1.");
    }

    // validate the second parameter, which is the number of histogram bins
    if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(1,
          "Only primitive type arguments are accepted but "
          + parameters[1].getTypeName() + " was passed as parameter 2.");
    }
    if( ((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory()
        != PrimitiveObjectInspector.PrimitiveCategory.INT) {
      throw new UDFArgumentTypeException(1,
          "Only an integer argument is accepted as parameter 2, but "
          + parameters[1].getTypeName() + " was passed instead.");
    }

    return new GenericUDAFHistogramNumericEvaluator();
  }

3.實現Evaluator

從骨架程式碼中,可以看到一個靜態內部類實現了Evaluator的抽象類,並且必須實現它的幾個抽象方法。這些方法的呼叫時機即意義參見上面的表格以及GenericUDAFEvaluator類的靜態內部列舉類Mode

4. 註冊函式

將函式直接寫入FunctionRegistry類的靜態程式碼塊中,system.registerGenericUDAF("histogram_numeric", new GenericUDAFHistogramNumeric());,或者將UDAF程式碼單獨打包成jar,採用CREATE FUNCTION語句建立函式。