Hive + GenericUDF Example 1

Compared with a plain UDF, a GenericUDF supports complex types (such as list and struct) as both input and output.

Let's walk through a small example.

The Hive table whereme holds several people's itineraries:

A       2013-10-10 8:00:00      home
A       2013-10-10 10:00:00     Super Market
A       2013-10-10 12:00:00     KFC
A       2013-10-10 15:00:00     school
A       2013-10-10 20:00:00     home
A       2013-10-15 8:00:00      home
A       2013-10-15 10:00:00     park
A       2013-10-15 12:00:00     home
A       2013-10-15 15:30:00     bank
A       2013-10-15 19:00:00     home
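
The post doesn't show how whereme was created; here is a minimal sketch, assuming the rows above sit in a tab-delimited local file (the path /tmp/whereme.txt is hypothetical):

hive> create table whereme(people string, time string, place string)
    > row format delimited fields terminated by '\t';
hive> load data local inpath '/tmp/whereme.txt' overwrite into table whereme;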

Through a query we want to produce the following result, pairing each stop with the person's next stop on the same day:

A	2013-10-10	08:00:00	home	10:00:00	Super Market
A	2013-10-10	10:00:00	Super Market	12:00:00	KFC
A	2013-10-10	12:00:00	KFC	15:00:00	school
A	2013-10-10	15:00:00	school	20:00:00	home
A	2013-10-15	08:00:00	home	10:00:00	park
A	2013-10-15	10:00:00	park	12:00:00	home
A	2013-10-15	12:00:00	home	15:30:00	bank
A	2013-10-15	15:30:00	bank	19:00:00	home

1. Write the GenericUDF. The function emits one struct per input row, joining the current row with the previous one; because it keeps state across rows, it assumes the input arrives grouped by person and ordered by time.

package com.wz.udf;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
 
public class helloGenericUDF extends GenericUDF {
     // ObjectInspectors for the three input columns
     private StringObjectInspector peopleObj;
     private StringObjectInspector timeObj;
     private StringObjectInspector placeObj;
     // state carried over from the previous row
     String strPreTime = "";
     String strPrePlace = "";
     String strPrePeople = "";
 
     // 1. Validate the input types.
     // 2. Define the output type.
     @Override
     public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
          if (arguments.length != 3) {
               throw new UDFArgumentLengthException("helloGenericUDF expects exactly three arguments");
          }
          peopleObj = (StringObjectInspector) arguments[0];
          timeObj = (StringObjectInspector) arguments[1];
          placeObj = (StringObjectInspector) arguments[2];

          // define the output struct
          ArrayList<String> structFieldNames = new ArrayList<String>();
          ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
          structFieldNames.add("people");
          structFieldNames.add("day");
          structFieldNames.add("from_time");
          structFieldNames.add("from_place");
          structFieldNames.add("to_time");
          structFieldNames.add("to_place");

          // every output field is a writable string
          for (int i = 0; i < structFieldNames.size(); i++) {
               structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
          }

          return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
     }
 
     // called once per row
     @Override
     public Object evaluate(DeferredObject[] arguments) throws HiveException {
          LazyString LPeople = (LazyString) (arguments[0].get());
          String strPeople = peopleObj.getPrimitiveJavaObject(LPeople);

          LazyString LTime = (LazyString) (arguments[1].get());
          String strTime = timeObj.getPrimitiveJavaObject(LTime);

          LazyString LPlace = (LazyString) (arguments[2].get());
          String strPlace = placeObj.getPrimitiveJavaObject(LPlace);

          Object[] e = new Object[6];
          try {
               if (strPrePeople.equals(strPeople) && IsSameDay(strTime)) {
                    // same person, same day: pair the previous stop with the current one
                    e[0] = new Text(strPeople);
                    e[1] = new Text(GetYearMonthDay(strTime));
                    e[2] = new Text(GetTime(strPreTime));
                    e[3] = new Text(strPrePlace);
                    e[4] = new Text(GetTime(strTime));
                    e[5] = new Text(strPlace);
               } else {
                    // first record of a person/day: there is no "from" leg yet
                    e[0] = new Text(strPeople);
                    e[1] = new Text(GetYearMonthDay(strTime));
                    e[2] = new Text("null");
                    e[3] = new Text("null");
                    e[4] = new Text(GetTime(strTime));
                    e[5] = new Text(strPlace);
               }
          } catch (java.text.ParseException ex) {
               // surface bad timestamps instead of silently emitting a half-filled struct
               throw new HiveException("Cannot parse timestamp: " + strTime, ex);
          }

          // remember this row for the next call
          strPrePeople = strPeople;
          strPreTime = strTime;
          strPrePlace = strPlace;

          return e;
     }
 
     @Override
     public String getDisplayString(String[] children) {
          assert( children.length>0 );
 
          StringBuilder sb = new StringBuilder();
          sb.append("helloGenericUDF(");
          sb.append(children[0]);
          sb.append(")");
 
          return sb.toString();
     }

     // do the current timestamp and the previous one fall on the same day?
     private boolean IsSameDay(String strTime) throws java.text.ParseException {
          if (strPreTime.isEmpty()) {
               return false;
          }
          String curDay = GetYearMonthDay(strTime);
          String preDay = GetYearMonthDay(strPreTime);
          return curDay.equals(preDay);
     }

     // extract the yyyy-MM-dd part of a timestamp
     private String GetYearMonthDay(String strTime) throws java.text.ParseException {
          DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          Date curDate = df.parse(strTime);
          df = new SimpleDateFormat("yyyy-MM-dd");
          return df.format(curDate);
     }

     // extract the HH:mm:ss part of a timestamp
     private String GetTime(String strTime) throws java.text.ParseException {
          DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          Date curDate = df.parse(strTime);
          df = new SimpleDateFormat("HH:mm:ss");
          return df.format(curDate);
     }
}
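
Before the function can be called from a query, the compiled class must be packaged into a jar and registered in the Hive session. The original post skips this step; here is a minimal sketch, where the jar name and path are assumptions (the function name hellogenericudf matches the queries below):

hive> add jar /home/wangzhun/helloGenericUDF.jar;
hive> create temporary function hellogenericudf as 'com.wz.udf.helloGenericUDF';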

2. Create two tables in Hive: tmpResult, whose struct column will hold the rows produced by the GenericUDF query, and whereresult for the final result.

hive> create table whereresult(people string,day string,from_time string,from_place string,to_time string,to_place string);
OK
Time taken: 0.287 seconds
hive> create table tmpResult(info struct<people:string,day:string,from_time:string,from_place:string,to_time:string,to_place:string>);
OK
Time taken: 0.074 seconds

3. Run the GenericUDF query, then extract the final result. The where clause filters out the first record of each person/day, whose from_time was set to 'null' by the UDF.

hive> insert overwrite table tmpResult select hellogenericudf(whereme.people,whereme.time,whereme.place) from whereme;
hive> insert overwrite table whereresult select info.people,info.day,info.from_time,info.from_place,info.to_time,info.to_place from tmpResult where info.from_time<>'null';
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201312022129_0006, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201312022129_0006
Kill Command = /home/wangzhun/hadoop/hadoop-0.20.2/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201312022129_0006
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2013-12-02 22:48:40,733 Stage-1 map = 0%,  reduce = 0%
2013-12-02 22:48:49,825 Stage-1 map = 100%,  reduce = 0%
2013-12-02 22:48:52,869 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201312022129_0006
Ended Job = -383357832, job is filtered out (removed at runtime).
Moving data to: hdfs://localhost:9000/tmp/hive-root/hive_2013-12-02_22-48-24_406_2701579121398466034/-ext-10000
Loading data to table default.whereresult
Deleted hdfs://localhost:9000/user/hive/warehouse/whereresult
Table default.whereresult stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 346, raw_data_size: 0]
8 Rows loaded to whereresult
MapReduce Jobs Launched: 
Job 0: Map: 1   HDFS Read: 420 HDFS Write: 346 SUCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 29.098 seconds
hive> select * from whereresult;
OK
A	2013-10-10	08:00:00	home	10:00:00	Super Market
A	2013-10-10	10:00:00	Super Market	12:00:00	KFC
A	2013-10-10	12:00:00	KFC	15:00:00	school
A	2013-10-10	15:00:00	school	20:00:00	home
A	2013-10-15	08:00:00	home	10:00:00	park
A	2013-10-15	10:00:00	park	12:00:00	home
A	2013-10-15	12:00:00	home	15:30:00	bank
A	2013-10-15	15:30:00	bank	19:00:00	home
Time taken: 0.105 seconds