Topic: use UDF functions to count the number of visits to each website in the flow.dat log file
Part 1: Write a MapReduce program to clean the data
We want the number of visits per website in the log file, so for simplicity we keep only the website column. That extraction is done in map(); reduce() passes the data through without further processing.
Source code:
MyMapper class
package com.WebsiteCount;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each line of flow.dat is tab-separated; the website sits in field index 12.
        String data = value.toString();
        String[] splitedData = data.split("\t");
        Text outValue = new Text(splitedData[12]);
        context.write(key, outValue);
        System.out.println("Mapper output <" + key.toString() + "," + outValue.toString() + ">");
    }
}
MyReduce class
package com.WebsiteCount;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // No aggregation here: write each website value out as its own line.
        for (Text value : values) {
            System.out.println("reduce input pair <" + key.toString() + "," + value.toString() + ">");
            context.write(value, NullWritable.get());
            System.out.println("reduce output pair <" + value.toString() + ",>");
        }
    }
}
Driver class: Website.java
package com.WebsiteCount;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Website {
    static final String INPUT_PATH = "hdfs://192.xxx.xx.xxx:9000/webcount/in";
    static final String OUT_PATH = "hdfs://192.xxx.xx.xxx:9000/webcount/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists, so the job can be rerun.
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }
        final Job job = Job.getInstance(conf, Website.class.getSimpleName());
        // job.setJarByClass(Website.class); // required when running from a jar
        // 1.1 Where to read the input from, and how to parse each line into a key/value pair.
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);
        // 1.2 The custom map class and its output <k,v> types.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        // 2.2 The custom reduce class and its output <k,v> types.
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 2.3 Where to write the output.
        FileOutputFormat.setOutputPath(job, outPath);
        // Submit the job and wait for it to finish.
        job.waitForCompletion(true);
    }
}
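The walkthrough below runs the job from Eclipse. To submit it as a jar instead, uncomment job.setJarByClass(Website.class) and launch it with something like the following (the jar name is a placeholder):

hadoop jar WebsiteCount.jar com.WebsiteCount.Website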
Next, go to the /usr/local directory on Linux.
Create the input directory in HDFS:
hdfs dfs -mkdir -p /webcount/in
Upload the original flow.dat log file to the /webcount/in directory in HDFS:
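With flow.dat sitting in /usr/local (as above), a typical upload command would be:

hdfs dfs -put /usr/local/flow.dat /webcount/in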
Run the program above in Eclipse; partial console output is shown below:
Check on HDFS that the output was written:
hdfs dfs -ls /webcount/out
View the output on Linux to confirm it succeeded:
hdfs dfs -cat /webcount/out/part-r-00000
Partial results (one website per line) are shown below:
Part 2: Use Hive UDFs to count the number of visits to each website in the log file
Enter Hive and create the table (the cleaned output has one website per line, so a single string column is enough):
create table webcount(web string)
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY ','
> STORED AS TEXTFILE;
Load the data into the newly created table:
load data inpath '/webcount/out/part-r-00000' into table webcount;
View the table:
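To sanity-check the load, one simple query (just an illustration):

select * from webcount limit 10;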
The UDF-writing stage:
Approach:
First, write a custom UDAF. A UDAF is an aggregate that turns many input rows into a single output, so the result is concatenated into one string: public class Top4GroupBy extends UDAF
Start by defining an object that stores the running data: public static class State
Note that when accumulating, you must check whether the map already contains the string as a key; if it does, add to its count, otherwise insert it into the map (this is the key step; see the increment() method in GroupBy.java below).
You also need a custom UDTF, which supports one input and many outputs: it splits the string on the delimiters and turns it into a multi-row list: public class ExplodeMap extends GenericUDTF
Finally, these two functions are added to the Hive function library under the names top_group and explode_map. An application example from the web (fetching the top 100 URLs for each of the top 100 landingrefer values) follows:
hive -e "select t.landingrefer, mytable.col1, mytable.col2,mytable.col3 from (select landingrefer, top_group(url,100) pro, count(sid) s from pvlog where dt=20120719 and depth=1 group by landingrefer order by s desc limit 100) t lateral view explode_map(t.pro) mytable as col1, col2, col3;"> test
Writing the UDAF: GroupBy.java
package com.hive;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class GroupBy extends UDAF {

    // An object that holds the running counts.
    public static class State {
        private Map<Text, IntWritable> counts;
    }

    /**
     * Accumulate a value: if the map already contains the string as a key,
     * add to its count; otherwise insert it.
     * @param s the running state
     * @param o the value being counted
     * @param i the amount to add
     */
    private static void increment(State s, Text o, int i) {
        if (s.counts == null) {
            s.counts = new HashMap<Text, IntWritable>();
        }
        IntWritable count = s.counts.get(o);
        if (count == null) {
            Text key = new Text();
            key.set(o);
            s.counts.put(key, new IntWritable(i));
        } else {
            count.set(count.get() + i);
        }
    }

    public static class GroupByEvaluator implements UDAFEvaluator {
        private final State state;

        public GroupByEvaluator() {
            state = new State();
        }

        public void init() {
            if (state.counts != null) {
                state.counts.clear();
            }
        }

        public boolean iterate(Text value) {
            if (value == null) {
                return false;
            }
            increment(state, value, 1);
            return true;
        }

        public State terminatePartial() {
            return state;
        }

        public boolean merge(State other) {
            // Guard against empty partials: counts is null when no rows were iterated.
            if (other == null || other.counts == null) {
                return false;
            }
            for (Map.Entry<Text, IntWritable> e : other.counts.entrySet()) {
                increment(state, e.getKey(), e.getValue().get());
            }
            return true;
        }

        public Text terminate() {
            if (state.counts == null || state.counts.isEmpty()) {
                return null;
            }
            // Sort by count (descending) and join everything into one string:
            // "$@" separates a site from its count, "$*" separates entries.
            Map<Text, IntWritable> it = sortByValue(state.counts, true);
            StringBuilder str = new StringBuilder();
            for (Map.Entry<Text, IntWritable> e : it.entrySet()) {
                str.append(e.getKey().toString()).append("$@").append(e.getValue().get()).append("$*");
            }
            return new Text(str.toString());
        }

        /*
         * Sort a map by its values.
         */
        @SuppressWarnings("unchecked")
        public static Map sortByValue(Map map, final boolean reverse) {
            List list = new LinkedList(map.entrySet());
            Collections.sort(list, new Comparator() {
                public int compare(Object o1, Object o2) {
                    if (reverse) {
                        return -((Comparable) ((Map.Entry) o1).getValue()).compareTo(((Map.Entry) o2).getValue());
                    }
                    return ((Comparable) ((Map.Entry) o1).getValue()).compareTo(((Map.Entry) o2).getValue());
                }
            });
            Map result = new LinkedHashMap();
            for (Iterator it = list.iterator(); it.hasNext();) {
                Map.Entry entry = (Map.Entry) it.next();
                result.put(entry.getKey(), entry.getValue());
            }
            return result;
        }
    }
}
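To see concretely what terminate() produces, here is a minimal standalone sketch that drives the evaluator by hand the way Hive would; the class name and sample sites are made up for illustration:

package com.hive;

import org.apache.hadoop.io.Text;

// Hypothetical demo driver; only illustrates the evaluator lifecycle outside Hive.
public class GroupByDemo {
    public static void main(String[] args) {
        GroupBy.GroupByEvaluator eval = new GroupBy.GroupByEvaluator();
        eval.init();
        eval.iterate(new Text("a.com")); // illustrative input rows
        eval.iterate(new Text("a.com"));
        eval.iterate(new Text("b.com"));
        // Prints "a.com$@2$*b.com$@1$*": "$@" joins site and count, "$*" joins entries.
        System.out.println(eval.terminate());
    }
}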
Writing the UDTF: SplitResult.java
package com.hive;

import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class SplitResult extends GenericUDTF {

    @Override
    public void close() throws HiveException {
    }

    // Validates the input ObjectInspectors and declares the output struct:
    // the number, names, and types of the columns in each output row.
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("SplitResult takes only one argument");
        }
        // getCategory() reports what kind of inspector this is (PRIMITIVE, LIST, MAP, STRUCT...).
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("SplitResult takes a string as a parameter");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // Processes one input record and emits output rows through forward().
    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        // Entries are separated by "$*"; inside an entry, "$@" separates site from count.
        String[] test = input.split("\\$\\*");
        for (int i = 0; i < test.length; i++) {
            try {
                String[] result = new String[2];
                String[] sp = test[i].split("\\$\\@");
                result[0] = sp[0];
                result[1] = sp[1];
                // forward() hands one output row back to Hive.
                forward(result);
            } catch (Exception e) {
                // Skip malformed entries rather than failing the whole query.
                continue;
            }
        }
    }
}
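The parsing in process() can be checked in isolation; forward() only works inside Hive, so this standalone sketch (a made-up class with an illustrative input string) just mirrors the split logic:

public class SplitDemo {
    public static void main(String[] args) {
        String input = "a.com$@2$*b.com$@1$*"; // illustrative UDAF output
        for (String entry : input.split("\\$\\*")) { // "$*" separates entries
            String[] sp = entry.split("\\$\\@"); // "$@" separates site from count
            System.out.println(sp[0] + " -> " + sp[1]);
        }
    }
}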
Adding the UDFs to Hive:
Package the UDF classes into a jar.
Right-click the project name and choose Export.
Set the jar's output path and file name.
The finished jar is shown below:
Uploading the jar to Linux:
Use the WinSCP tool to upload the jar to the /usr/local directory on Linux (WinSCP screenshot below):
Adding the jar to Hive:
Add the following to the hive-site.xml file:
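A typical entry registering an auxiliary jar path looks like this (the jar file name is an assumption; use the name of the jar you actually uploaded):

<property>
    <name>hive.aux.jars.path</name>
    <value>file:///usr/local/webcount-udf.jar</value>
</property>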
Register the functions in Hive:
create function group_by as 'com.hive.GroupBy';
create function splitrs as 'com.hive.SplitResult';
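(If the functions are only needed for the current session, create temporary function can be used instead.)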
Query with the registered functions. The inner query uses group_by(web) to collapse all rows into one delimited string; the lateral view then uses splitrs to explode that string back into (web, times) rows:
select webtimes.web, webtimes.times from (select group_by(web) pro from webcount) t lateral view splitrs(t.pro) webtimes as web, times;
Partial results are shown below:
This concludes the experiment.