Pig使用者自定義函式(UDF)
我們以氣溫統計和詞頻統計為例,講解以下三種使用者自定義函式。
使用者自定義函式
什麼時候需要使用者自定義函式呢?和其它語言一樣,當你希望簡化程式結構或者需要重用程式程式碼時,函式就是你不二選擇。
Pig的使用者自定義函式可以用Java編寫,但是也可以用Python或Javascript編寫。我們接下來以Java為例。
自定義過濾函式
我們仍然以先前的程式碼為例:
records = load 'hdfs://localhost:9000/input/temperature1.txt'as (year: chararray,temperature: int);
valid_records = filter records by temperature!=999;
第二個語句的作用就是篩選合法的資料。如果我們採用使用者自定義函式,則第二個語句可以寫成:
valid_records = filter records by isValid(temperature);
這種寫法更容易理解,也更容易在多個地方重用。接下來的問題就是如何定義這個isValid函式。程式碼如下:
packagecom.oserp.pigudf;
importjava.io.IOException;
importorg.apache.pig.FilterFunc;
importorg.apache.pig.data.Tuple;
public
class IsValidTemperature extends
@Override
public Boolean exec(Tuple tuple) throws IOException {
Object object = tuple.get(0);
int temperature = (Integer)object;
return temperature != 999;
}
}
接下來,我們需要:
1) 編譯程式碼並打包成jar檔案,比如pigudf.jar。
2) 通過register命令將這個jar檔案註冊到pig環境:
register/home/user/hadoop_jar/pigudf.jar //引數為jar檔案的本地路徑
此時,我們就可以用以下語句呼叫這個函式:
valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);
dump valid_records;
看起來這個函式名太長,不便輸入。我們可以用定義別名的方式代替:
define isValid com.oserp.pigudf.IsValidTemperature();
valid_records = filter records by isValid(temperature);
dump valid_records;
回到程式碼,我們可發現:
1) 需要定義一個繼承自FilterFunc的類。
2) 重寫這個類的exec方法。這個方法的引數只有一個tuple,但是呼叫時可以傳遞多個引數,你可以通過索引號獲得對應的引數值,比如tuple.get(1)表示取第二個引數。
3) 呼叫時,需要使用類的全名。(當然你可以自定義別名)
4) 更多的驗證需要讀者自行在函式中新增,比如判斷是否為null等等。
備註:用Eclipse編寫Pig自定義函式時,你可能需要引用到一些Hadoop的庫檔案。比較容易的方式是在新建專案時指定專案型別為MapReduce專案,這樣Eclipse就會自動設定庫引用的相關資訊。
自定義運算函式(Eval function)
仍然以前面的資料檔案為例:
1990 21
1990 18
1991 21
1992 30
1992 999
1990 23
假設我們希望通過溫度值獲得一個溫度的分類資訊,比如我們把溫度大於劃分為以下型別:
溫度 分類
x>=30 hot
x>=10 and x<30 moderate
x<10 cool
則我們可以定義以下函式,程式碼如下:
packagecom.oserp.pigudf;
importjava.io.IOException;
importorg.apache.pig.EvalFunc;
importorg.apache.pig.data.Tuple;
public class GetClassification extends EvalFunc<String> {
@Override
public String exec(Tuple tuple) throws IOException {
Object object = tuple.get(0);
int temperature = (Integer)object;
if (temperature >= 30){
return "Hot";
}
else if(temperature >=10){
return "Moderate";
}
else {
return "Cool";
}
}
}
依次輸入以下Pig語句:
records = load'hdfs://localhost:9000/input/temperature1.txt' as (year: chararray,temperature:int);
register /home/user/hadoop_jar/pigudf.jar;
valid_records = filter records bycom.oserp.pigudf.IsValidTemperature(temperature);
result = foreach valid_records generateyear,com.oserp.pigudf.GetClassification(temperature);
dump result;
輸出結果如下:
(1990,Moderate)
(1990,Moderate)
(1991,Moderate)
(1992,Hot)
(1990,Moderate)
程式碼比較簡單,該類繼承自EvalFunc類,且我們要明確定義返回值型別。
有些時候其它類庫可能包含有功能相近的Java函式,我們是否可以直接將這些庫函式拿過來使用呢?可以。以下語句呼叫了trim函式,用於去掉name欄位前後的空格:
DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim','String');
B = FOREACH A GENERATE trim(name);
其中的InvokeForString是一個Invoker(不知道該如何翻譯啊),其通過反射機制呼叫,返回值是String型別。其它類似的還有InvokeForInt,InvokeForLong, InvokeForDouble,InvokeForFloat等等。
自定義載入函式
我們以詞頻統計為例,講解如何自定義載入函式。(統計各個單詞出現的頻率,由高到低排序)
一般情況下,load語句載入資料時,一行會被生成一個tuple。而統計詞頻時,我們希望每個單詞生成一個tuple。我們的測試資料檔案只有兩行資料,如下:
Thisis a map a reduce program
mapreduce partition combiner
我們希望load後能得到如下形式的資料,每個單詞一個tuple:
(This)
(is)
(a)
(map)
(a)
(reduce)
先看程式碼:
package com.oserp.pigudf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.RecordReader;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
importorg.apache.pig.backend.executionengine.ExecException;
importorg.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
public class WordCountLoadFunc extends LoadFunc {
private RecordReader reader;
TupleFactorytupleFactory = TupleFactory.getInstance();
BagFactorybagFactory = BagFactory.getInstance();
@Override
public InputFormatgetInputFormat() throws IOException {
return new TextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
try {
// 當讀取到分割槽資料塊的末尾時,返回null表示資料已讀取完
if (!reader.nextKeyValue()){
return null;
}
Textvalue = (Text)reader.getCurrentValue();
Stringline = value.toString();
String[]words = line.split("\\s+"); // 斷詞
// 因為getNext函式只能返回一個tuple,
// 而我們希望每個單詞一個單獨的tuple,
// 所以我們將多個tuple放到一個bag裡面,
// 然後返回一個包含一個bag的tuple。
// 注:這只是一個用於演示用法的示例,實際中這樣使用不一定合理。
List<Tuple>tuples = new ArrayList<Tuple>();
Tupletuple = null;
for (String word : words) {
tuple= tupleFactory.newTuple();
tuple.append(word);
tuples.add(tuple);
}
DataBagbag = bagFactory.newDefaultBag(tuples);
Tupleresult = tupleFactory.newTuple(bag);
return result;
}
catch (InterruptedException e) {
throw new ExecException(e);
}
}
@Override
public void prepareToRead(RecordReader reader,PigSplit arg1)
throws IOException {
this.reader = reader;
}
@Override
public void setLocation(String location, Job job) throws IOException {
FileInputFormat.setInputPaths(job,location);
}
}
依次執行以下命令:
1) records= load 'hdfs://localhost:9000/input/sample_small.txt' usingcom.oserp.pigudf.WordCountLoadFunc() as (words:bag{word:(w:chararray)});
2) flatten_records= foreach records generate flatten($0);
3) grouped_records= group flatten_records by words::w;
4) result= foreach grouped_records generate group,COUNT(flatten_records);
5) final_result= order result by $1 desc,$0;
6) dumpfinal_result;
顯示結果如下:
(a,2)
(map,2)
(reduce,2)
(This,1)
(combiner,1)
(is,1)
(partition,1)
(program,1)
注意schema的定義格式:(words:bag{word:(w:chararray)})