1. 程式人生 > >Hive+GenericUDF示例二

Hive+GenericUDF示例二

     再來看一個分數統計的小例子。

        在Hive中存在如下一張表:

  1. hive> describe tb_test2;  
  2. OK  
  3. name    string    
  4. score_list  array<map<string,int>>    
  5. Time taken: 0.074 seconds  
  6. hive> select * from tb_test2;  
  7. OK  
  8. A   [{"math":100,"english":90,"history":85}]  
  9. B   [{"math":95,"english":80,"history":100}]  
  10. C   [{"math":80,"english":90,"histroy":100}]  
  11. Time taken: 0.107 seconds  

        編寫genericUDF.

  1. package com.wz.udf;  
  2. import org.apache.hadoop.io.Text;  
  3. import org.apache.hadoop.io.IntWritable;  
  4. import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;  
  5. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;  
  6. import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;  
  7. import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;  
  8. import org.apache.hadoop.hive.ql.metadata.HiveException;  
  9. import org.apache.hadoop.hive.serde2.lazy.LazyString;  
  10. import org.apache.hadoop.hive.serde2.lazy.LazyMap;  
  11. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;  
  12. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;  
  13. import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;  
  14. import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;  
  15. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;  
  16. import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;  
  17. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;  
  18. import org.apache.hadoop.hive.serde2.objectinspector.StructField;  
  19. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;  
  20. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;  
  21. import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;  
  22. import java.util.ArrayList;  
  23. publicclass helloGenericUDFNew extends GenericUDF {  
  24.      ////輸入變數定義
  25.      private ObjectInspector nameObj;  
  26.      private ListObjectInspector listoi;  
  27.      private MapObjectInspector mapOI;  
  28.      private ArrayList<Object> valueList = new ArrayList<Object>();   
  29.      @Override
  30.      public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {  
  31.           nameObj = (ObjectInspector)arguments[0];  
  32.           listoi = (ListObjectInspector)arguments[1];  
  33.       mapOI = ((MapObjectInspector)listoi.getListElementObjectInspector());  
  34.           //輸出結構體定義
  35.           ArrayList structFieldNames = new ArrayList();  
  36.           ArrayList structFieldObjectInspectors = new ArrayList();  
  37.           structFieldNames.add("name");  
  38.       structFieldNames.add("totalScore");  
  39.           structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableStringObjectInspector );  
  40.           structFieldObjectInspectors.add( PrimitiveObjectInspectorFactory.writableIntObjectInspector );  
  41.           StructObjectInspector si2;  
  42.           si2 = ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);   
  43.           return si2;  
  44.      }  
  45.      @Override
  46.      public Object evaluate(DeferredObject[] arguments) throws HiveException{  
  47.       LazyString LName = (LazyString)(arguments[0].get());  
  48.       String strName = ((StringObjectInspector)nameObj).getPrimitiveJavaObject( LName );  
  49.       int nelements = listoi.getListLength(arguments[1].get());  
  50.           int nTotalScore=0;  
  51.           valueList.clear();  
  52.           //遍歷list
  53.       for(int i=0;i<nelements;i++)  
  54.       {   
  55.                LazyMap LMap = (LazyMap)listoi.getListElement(arguments[1].get(),i);  
  56.                //獲取map中的所有value值
  57.            valueList.addAll(mapOI.getMap(LMap).values());   
  58.                for (int j = 0; j < valueList.size(); j++)  
  59.            {  
  60.                    nTotalScore+=Integer.parseInt(valueList.get(j).toString());  
  61.                }                 
  62.           }  
  63.       Object[] e;     
  64.       e = new Object[2];  
  65.       e[0] = new Text(strName);  
  66.           e[1] = new IntWritable(nTotalScore);  
  67.           return e;  
  68.      }  
  69.      @Override
  70.      public String getDisplayString(String[] children) {  
  71.           assert( children.length>0 );  
  72.           StringBuilder sb = new StringBuilder();  
  73.           sb.append("helloGenericUDFNew(");  
  74.           sb.append(children[0]);  
  75.           sb.append(")");  
  76.           return sb.toString();  
  77.      }  
  78. }  

        在Hive中執行,結果如下:

  1. hive> add jar /home/wangzhun/hive/hive-0.8.1/lib/helloGenericUDFNew.jar;      
  2. Added /home/wangzhun/hive/hive-0.8.1/lib/helloGenericUDFNew.jar to class path  
  3. Added resource: /home/wangzhun/hive/hive-0.8.1/lib/helloGenericUDFNew.jar  
  4. hive> create temporary function hellonew as 'com.wz.udf.helloGenericUDFNew';  
  5. OK  
  6. Time taken: 0.016 seconds  
  7. hive> select hellonew(tb_test2.name,tb_test2.score_list) from tb_test2;       
  8. Total MapReduce jobs = 1  
  9. Launching Job 1 out of 1  
  10. Number of reduce tasks is set to 0 since there's no reduce operator  
  11. Starting Job = job_201312091733_0018, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201312091733_0018  
  12. Kill Command = /home/wangzhun/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201312091733_0018  
  13. Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0  
  14. 2013-12-09 22:31:22,328 Stage-1 map = 0%,  reduce = 0%  
  15. 2013-12-09 22:31:25,354 Stage-1 map = 100%,  reduce = 0%  
  16. 2013-12-09 22:31:28,390 Stage-1 map = 100%,  reduce = 100%  
  17. Ended Job = job_201312091733_0018  
  18. MapReduce Jobs Launched:   
  19. Job 0: Map: 1   HDFS Read: 99 HDFS Write: 18 SUCESS  
  20. Total MapReduce CPU Time Spent: 0 msec  
  21. OK  
  22. {"name":"A","totalscore":275}  
  23. {"name":"B","totalscore":275}  
  24. {"name":"C","totalscore":270}  
  25. Time taken: 21.7 seconds