hive UDAF行列轉換
阿新 • • 發佈:2019-01-26
目標實現
1 a
1 b
2 a
2 c
轉換為
1 a,b
2 a,c
package com.hive.udf; //用法 select a,concat1(b,',') from concat_test group by a; import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; public class Concat extends UDAF { public static class ConcatUDAFEvaluator implements UDAFEvaluator{ public static class PartialResult{ String result; String delimiter; } private PartialResult partial; //init函式實現介面UDAFEvaluator的init函式。 public void init() { partial = null; } //iterate接收傳入的引數,並進行內部的輪轉。其返回型別為boolean。 //接受傳入的引數,並進行內部的輪轉 public boolean iterate(String value,String deli){ if (value == null){ return true; } if (partial == null){ partial = new PartialResult(); partial.result = new String(""); if( deli == null || deli.equals("") ) { partial.delimiter = new String(","); } else { partial.delimiter = new String(deli); } } if ( partial.result.length() > 0 ) { partial.result = partial.result.concat(partial.delimiter); } partial.result = partial.result.concat(value); return true; } //terminatePartial無引數,其為iterate函式輪轉結束後,返回輪轉資料,terminatePartial類似於hadoop的Combiner。 public PartialResult terminatePartial(){ return partial; } //merge接收terminatePartial的返回結果,進行資料merge操作,其返回型別為boolean。 public boolean merge(PartialResult other){ if (other == null){ return true; } if (partial == null){ partial = new PartialResult(); partial.result = new String(other.result); partial.delimiter = new String(other.delimiter); } else { if ( partial.result.length() > 0 ) { partial.result = partial.result.concat(partial.delimiter); } partial.result = partial.result.concat(other.result); } return true; } public String terminate(){ return new String(partial.result); } } }
打包jar後
add jar rowtocol.jar;
create temporary function concat_test as 'com.hive.udf.Concat';
select
a,concat_test(b,',') from concat_test group by a;