Pig系統分析(8)-Pig可擴展性
本文是Pig系統分析系列中的最後一篇了,主要討論怎樣擴展Pig功能。不僅介紹Pig本身提供的UDFs擴展機制,還從架構上探討Pig擴展可能性。
補充說明:前些天同事發現twitter推動的Pig On Spark項目:Spork,準備研究下。
UDFs
通過UDFs(用戶自己定義函數),能夠自己定義數據處理方法,擴展Pig功能。實際上,UDFS除了使用之前須要register/define外。和內置函數沒什麽不同。
主要的EvalFunc
以內置的ABS函數為例:
public class ABS extends EvalFunc<Double>{ /** * java level API * @param input expectsa single numeric value * @return output returns a single numeric value, absolute value of the argument */ public Double exec(Tuple input) throws IOException { if (input == null || input.size() == 0) return null; Double d; try{ d = DataType.toDouble(input.get(0)); } catch (NumberFormatException nfe){ System.err.println("Failed to process input; error -" + nfe.getMessage()); return null; } catch (Exception e){ throw new IOException("Caught exception processing input row", e); } return Math.abs(d); } …… public Schema outputSchema(Schema input) ; public List<FuncSpec> getArgToFuncMapping() throws FrontendException; }
- 函數都繼承EvalFunc接口,泛型參數Double代表返回類型。
- exec方法:輸入參數類型為元組,代表一行記錄。
- outputSchema方法:用於處理輸入和輸出Schema
- getArgToFuncMapping:用於支持各種數據類型重載。
聚合函數
EvalFuc方法也能實現聚合函數,這是由於group操作對每一個分組都返回一條記錄,每組中包括一個Bag,所以exec方法中叠代處理Bag中記錄就可以。
以Count函數為例:public Long exec(Tuple input) throws IOException { try { DataBag bag = (DataBag)input.get(0); if(bag==null) return null; Iterator it = bag.iterator(); long cnt = 0; while (it.hasNext()){ Tuple t = (Tuple)it.next(); if (t != null && t.size() > 0 && t.get(0) != null ) cnt++; } return cnt; } catch (ExecException ee) { throw ee; } catch (Exception e) { int errCode = 2106; String msg = "Error while computing count in " + this.getClass().getSimpleName(); throw new ExecException(msg, errCode, PigException.BUG, e); } }
Algebraic 和Accumulator 接口
如前所述,具備algebraic性質的聚合函數在Map-Reduce過程中能被Combiner優化。直觀來理解,具備algebraic性質的函數處理過程能被分為三部分:initial(初始化,處理部分輸入數據)、intermediate(中間過程,處理初始化過程的結果)和final(收尾,處理中間過程的結果)。
比方COUNT函數,初始化過程為count計數操作。中間過程和收尾為sum求和操作。更進一步。假設函數在這三個階段中都能進行同樣的操作,那麽函數具備distributive性質。比方SUM函數。
Pig提供了Algebraic 接口:
public interface Algebraic{ /** * Get the initial function. * @return A function name of f_init. f_init shouldbe an eval func. * The return type off_init.exec() has to be Tuple */ public String getInitial(); /** * Get the intermediatefunction. * @return A function name of f_intermed. f_intermedshould be an eval func. * The return type off_intermed.exec() has to be Tuple */ public String getIntermed(); /** * Get the final function. * @return A function name of f_final. f_final shouldbe an eval func parametrized by * the same datum as the evalfunc implementing this interface. */ public String getFinal(); }
當中每一個方法都返回EvalFunc實現類的名稱。
繼續以COUNT函數為例,COUNT實現了Algebraic接口。針對下面語句:
input= load ‘data‘ as (x, y); grpd= group input by x; cnt= foreach grpd generate group, COUNT(input); storecnt into ‘result‘;Pig會重寫MR運行計劃:
Map load,foreach(group,COUNT.Initial) Combine foreach(group,COUNT.Intermediate) Reduce foreach(group,COUNT.Final),storeAlgebraic 接口通過Combiner優化降低傳輸數據量,而Accumulator接口則關註的是內存使用量。UDF實現Accumulator接口後,Pig保證全部key相同的數據(通過Shuffle)以增量的形式傳遞給UDF(默認pig.accumulative.batchsize=20000)。相同。COUNT也實現了Accumulator接口。
/* Accumulator interface implementation */ private long intermediateCount = 0L; @Override public void accumulate(Tuple b) throws IOException { try { DataBag bag = (DataBag)b.get(0); Iterator it = bag.iterator(); while (it.hasNext()){ Tuple t = (Tuple)it.next(); if (t != null && t.size() > 0 && t.get(0) != null) { intermediateCount += 1; } } } catch (ExecException ee) { throw ee; } catch (Exception e) { int errCode = 2106; String msg = "Error while computing min in " + this.getClass().getSimpleName(); throw new ExecException(msg, errCode, PigException.BUG, e); } } @Override public void cleanup() { intermediateCount = 0L; } @Override /* *當前key都被處理完之後被調用 */ public Long getValue() { return intermediateCount; }
前後端數據傳遞
通過UDFs構造函數傳遞數據是最簡單的方法。然後通過define語句定義UDF實例時指定構造方法參數。但有些情況下。比方數據在執行期才產生,或者數據不能用String格式表達,這時候就得使用UDFContext了。
UDF通過getUDFContext方法獲取保存在ThreadLoacl中的UDFContext實例。
UDFContext包括下面信息:
- jconf:Hadoop Configuration。
- clientSysProps:系統屬性。
- HashMap<UDFContextKey,Properties> udfConfs:用戶自己保存的屬性,當中UDFContextKey由UDF類名生成。
UDFs運行流程
Pig架構可擴展性
Pig哲學之三——Pigs Live Anywhere。
理論上。Pig並不被限定執行在Hadoop框架上,有幾個能夠參考的實現和提議。
- Pigen。Pig on Tez。https://github.com/achalsoni81/pigeon,架構圖例如以下:
- Pig的後端抽象層:https://wiki.apache.org/pig/PigAbstractionLayer。
眼下已經實現了PigLatin執行在Galago上。
http://www.galagosearch.org/
參考資料
Pig官網:http://pig.apache.org/
Pig paper at SIGMOD 2008:Building a High Level DataflowSystem on top of MapReduce:The Pig Experience
Programming.Pig:Dataflow.Scripting.with.Hadoop(2011.9).Alan.Gates
Pig系統分析(8)-Pig可擴展性