hive中使用自定義函式(UDF)實現分析函式row_number的功能
1. hive0.10及之前的版本沒有row_number這個函式,假設我們現在出現如下業務場景,現在我們在hdfs上有個log日誌檔案,為了方便敘述,該檔案只有2個欄位,第一個是使用者的id,第二個是當天登入的timestamp,現在我們需要求每個使用者最早登入的那條記錄(注意不是僅僅只要那個登入的timestamp),可以方便計算NewUser。
2. 我們的資料是這樣的:
1,32
2,46
3,312
4,4643
5,54
6,456
7,437
8,5347
9,47
1,466
2,546
3,4
4,886
5,546
6,57
7,235
8,765
9,634
這裡是假設的資料。3.我們可以用hive建立一張表,其中第一個欄位是id string, 第二個是login_time bigint,假設我們的表名是log。
4. 這樣的場景可以用很多方法解決,但是我們可以用RowNumber函式,在0.11及以上的版本才整合到了hive中,但是我們公司用的是CDH4.5.0,hive才到0.10,所以只能自己寫個這樣的函式,具體的程式碼如下:
- import org.apache.hadoop.hive.ql.exec.UDF;
- publicclass RowN extends UDF {
- privatestaticint MAX_VALUE = 50;
- privatestatic String comparedColumn[] = new String[MAX_VALUE];
- privatestatic
- publicint evaluate(Object... args) {
- String columnValue[] = new String[args.length];
- for (int i = 0; i < args.length; i++) {
- columnValue[i] = args[i].toString();
- }
- if (rowNum == 1) {
- for (int i = 0; i < columnValue.length; i++)
- comparedColumn[i] = columnValue[i];
- }
- for (int i = 0; i < columnValue.length; i++) {
- if (!comparedColumn[i].equals(columnValue[i])) {
- for (int j = 0; j < columnValue.length; j++) {
- comparedColumn[j] = columnValue[j];
- }
- rowNum = 1;
- return rowNum++;
- }
- }
- return rowNum++;
- }
- }
- </span>
5. 稍微解釋下這個UDF,首先我們的UDF函式輸入是多個列的值,傳入多個值表示用多個值是否相同來打序號,對於我們的場景只要1個(就是id),函式row_number(),必須帶一個或者多個列引數,如ROW_NUMBER(col1, ....),它的作用是按指定的列進行分組生成行序列。在ROW_NUMBER(a,b) 時,若兩條記錄的a,b列相同,則行序列+1,否則重新計數。
6. 接下去關鍵的就是怎麼取使用這個函數了,我們必須保證查出來的資料是有序的,這樣才好加序號,而且要根據某個欄位排序,但是如果資料量大或者我們自己設定了多個reducer咋辦,這樣的話我們就想到了使用distribute by和sort by的配合使用,可以使key相同的資料進入同一個reducer,這樣就好辦了,那麼我們的hql語句其實就是一句話:
- createtemporaryfunction RowNumber as'xxx.xxx.xxx.udf.RowNumber';
- select id, login_time from (select * from log distribute by id sort by id, login_time asc) tmp where RowNumber(id)=1;
132
246
34
4886
554
625
775
8534
947
注:
1. 如果對distribute by不熟悉可以看另一個我的部落格,有具體的解釋:http://blog.csdn.net/jthink_/article/details/38903775
2. 這個函式最關鍵的部分就是得先有序,所以加序號前必須保證資料有序