hadoop——hive視訊觀看熱度,Top N案例(youtube)
- 資料準備
user.txt
0.txt
欄位以及欄位名解析
user表
欄位 備註 欄位型別
uploader 上傳者使用者名稱 string
videos 上傳視訊數 int
friends 朋友數量 int
視訊表:
欄位 備註 詳細描述
video id 視訊唯一 id 11 位字串
uploader 視訊上傳者 上傳視訊的使用者名稱 String
age 視訊年齡 視訊上傳日期和 2007 年 2 月
15 日之間的整數天(Youtube的獨特設定)
category 視訊類別 上傳視訊指定的視訊分類
length 視訊長度 整形數字標識的視訊長度
views 觀看次數 視訊被瀏覽的次數
rate 視訊評分 滿分 5 分
ratings 流量 視訊的流量,整型數字
comments 評論數 一個視訊的整數評論數
related ids 相關視訊 id 相關視訊的 id,最多 20 個
- 資料清洗
- 通過mapreduce將資料清洗出來,通過觀察原始資料形式,可以發現,視訊可以有多個所屬分類,每個所屬分類用&符號分割,且分割的兩邊有空格字元,同時相關視訊也是可以有多個元素,多個相關視訊又用“\t”進行分割。為了分析資料時方便對存在多個子元素的資料進行操作,我們首先進行資料重組清洗操作。即:將所有的類別用“&”分割,同時去掉兩邊空格,多個相關視訊 id 也使用“&”進行分割。將資料放到hdfs指定的資料夾裡面。
ETL資料清洗
ETLUtils.java
package ETLUtils;
public class ETLUtils {
public static String getETCString(String str){
String[] lines=str.split("\t");
StringBuilder newLines=new StringBuilder();
//1.去掉空格
lines[3]=lines[3].replaceAll(" ","");
//2.過濾不合法的值
if(lines.length<9) return null;
//3.大於9的下標的\t變化成&連線符
for(int i=0;i<lines.length;i++){
newLines.append(lines[i]);
if(i<9){
newLines.append("\t");
}else{
if(i!=lines.length-1){
newLines.append("&");
}
}
}
return newLines.toString();
}
}
ETLMapper
package mapper;
import ETLUtils.ETLUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map-only ETL step: runs every raw record through ETLUtils.getETCString
 * and emits the cleaned line with a NullWritable key (there is no reduce
 * phase, so the key carries no information).
 */
public class ETLMapper extends Mapper<Object,Text,NullWritable,Text> {
// Reused output value so we do not allocate one Text per record.
private final Text outValue=new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String cleanedLine=ETLUtils.getETCString(value.toString());
// Records the cleaner rejected come back null/blank: silently drop them.
if(!StringUtils.isBlank(cleanedLine)){
outValue.set(cleanedLine);
context.write(NullWritable.get(),outValue);
}
}
}
ETLRunner
package runner;
import java.io.IOException;
import mapper.ETLMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the map-only ETL job that cleans the raw YouTube video file.
 * Usage: ETLRunner &lt;hdfs input path&gt; &lt;hdfs output path&gt;
 */
public class ETLRunner implements Tool {
private Configuration conf=null;
@Override
public void setConf(Configuration conf) {
this.conf=conf;
}
@Override
public Configuration getConf() {
return this.conf;
}
/**
 * Configures and submits the map-only job.
 * args[0] = HDFS input path, args[1] = HDFS output path.
 * @return 0 on success, 1 on failure
 */
@Override
public int run(String[] args) throws Exception {
conf=this.getConf();
// Stash the CLI paths in the Configuration so the init helpers can read them.
conf.set("inpath",args[0]);
conf.set("outpath",args[1]);
Job job=Job.getInstance(conf,"youtub_etl_video");
job.setJarByClass(ETLRunner.class);
job.setMapperClass(ETLMapper.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// Map-only job: mapper output is written straight to HDFS.
job.setNumReduceTasks(0);
this.initInputPath(job);
this.initOutputPath(job);
return job.waitForCompletion(true)? 0:1;
}
// Validates that the input path exists in HDFS and registers it with the job.
private void initInputPath(Job job) throws IOException {
Configuration conf=job.getConfiguration();
String inpaths=conf.get("inpath");
// Obtain the (possibly distributed) file system handle.
FileSystem fs=FileSystem.get(conf);
// Wrap the raw path string as an HDFS Path object.
Path inpath =new Path(inpaths);
// Fail fast if the directory is missing instead of letting the job die later.
if(fs.exists(inpath)){
// Register the input path with the job.
FileInputFormat.addInputPath(job,inpath);
}else{
throw new RuntimeException("HDFS目錄不存在"+inpaths);
}
}
// Registers the output path, first deleting stale results from a previous
// run (Hadoop refuses to start when the output directory already exists).
private void initOutputPath(Job job) throws IOException {
Configuration conf=job.getConfiguration();
String outpath=conf.get("outpath");
FileSystem fs=FileSystem.get(conf);
Path opath=new Path(outpath);
if(fs.exists(opath)){
// Recursively remove the existing output directory.
fs.delete(opath,true);
}
FileOutputFormat.setOutputPath(job,opath);
}
// CLI entry point: delegates to ToolRunner so generic -D options are honoured.
public static void main(String[] args) {
try {
int result=ToolRunner.run(new ETLRunner(),args);
if(result==0){
System.out.println("Success!");
}else{
System.out.println("Fail!");
}
System.exit(result);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
- 建立四張表打算用orc的表進行操作(操作效率比較高),但是orc型別的表只能通過insert的形式插入資料,所以需要建立兩張ori的表再進行插入操作。
-- Staging table for raw video records (textfile so LOAD DATA can ingest
-- the cleaned ETL output directly).
create table youtube_ori(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as textfile;
-- Staging table for uploader statistics, bucketed by uploader.
create table youtube_user_ori(
    uploader string,
    videos int,
    friends int)
clustered by (uploader) into 24 buckets
row format delimited
fields terminated by "\t"
stored as textfile;
-- ORC-backed video table. ORC cannot be filled with LOAD DATA; it is
-- populated via INSERT ... SELECT from youtube_ori.
create table youtube_orc(
    videoId string,
    uploader string,
    age int,
    category array<string>,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as orc;
-- ORC-backed uploader table, same layout as youtube_user_ori.
create table youtube_user_orc(
    uploader string,
    videos int,
    friends int)
clustered by (uploader) into 24 buckets
row format delimited
fields terminated by "\t"
stored as orc;
|
- 匯入資料到表ori中
-- Move the cleaned ETL output and the raw user file from HDFS into the
-- textfile staging tables (LOAD DATA INPATH moves, not copies, the files).
load data inpath '/output/part-m-00000' into table youtube_ori;
load data inpath '/output/user.txt' into table youtube_user_ori;
- 將表匯入到orc中
用 insert into table A select * from B 匯入到orc表
- 資料分析
欄位:
視訊id 上傳者 視訊年齡 視訊類別 觀看長度 觀看次數 視訊評分 流量 評論數 相關視訊
videoId,uploader,age,category,length,views,rate,ratings,comments,relatedId
- 統計視訊觀看數top10
-- Top 10 most-viewed videos overall.
create table viewstop10 as
select
    videoId, uploader, age, category, length,
    views, rate, ratings, comments, relatedId
from youtube_orc
order by views desc
limit 10;
- 統計類別熱度top10
-- Top 10 hottest categories, where heat = number of videos in the category.
-- explode(category) flattens the multi-valued category array to one row per
-- (video, category) pair before grouping.
create table hotTop10 as
select
    flat.category_name as category,
    count(flat.videoId) as hot
from (
    select videoId, category_name
    from youtube_orc
    lateral view explode(category) t_catetory as category_name
) flat
group by flat.category_name
order by hot desc
limit 10;
- 統計出視訊觀看數最高的 20 個視訊的所屬類別以及類別包含這 Top20 視訊的個數
-- For the 20 most-viewed videos: which categories they belong to and how
-- many of the Top-20 fall into each category.
create table top20views_countCategory as
select
    category_name,
    count(videoId) as vcount
from (
    select videoId, category
    from (
        select * from youtube_orc order by views desc limit 20
    ) top20
) vids
lateral view explode(category) t_catetory as category_name
group by category_name
order by vcount desc;
- 統計視訊觀看數 Top50 所關聯視訊的所屬類別的熱度排名
-- Category heat ranking derived from the Top-50 most-viewed videos.
-- NOTE(review): despite the heading, this query only explodes the Top-50
-- videos' own categories; it never follows relatedId to the associated
-- videos -- confirm against the intended requirement.
select
    category_name,
    count(views) as mcount
from (
    select videoId, category_name, views
    from (
        select videoId, category, views
        from youtube_orc
        order by views desc
        limit 50
    ) top50
    lateral view explode(category) t_category as category_name
) flat
group by category_name
order by mcount desc;
- 統計每個類別中的視訊熱度 Top10,以 Music 為例
-- One row per (video, single category): flattens youtube_orc's category
-- array into a plain string column so per-category rankings can filter on it.
create table youtube_category(
    videoId string,
    uploader string,
    age int,
    categoryId string,
    length int,
    views int,
    rate float,
    ratings int,
    comments int,
    relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as orc;

insert into table youtube_category
select
    videoId,
    uploader,
    age,
    categoryId,
    length,
    views,
    rate,
    ratings,
    comments,
    relatedId
from youtube_orc
lateral view explode(category) catetory as categoryId;
-- ^ terminating semicolon was missing here, which made Hive parse the
--   following CREATE TABLE as part of the INSERT and fail.

-- Top 10 Music videos by view count.
create table musicTop10 as
select videoId, categoryId, views
from youtube_category
where categoryId = "Music"
order by views desc
limit 10;
- 統計每個類別中視訊流量 Top10,以 Music 為例
-- Top 10 Music videos by traffic (the ratings column).
create table ratingsTop10 as
select videoId, views, ratings
from youtube_category
where categoryId = "Music"
order by ratings desc
limit 10;
- 統計上傳視訊最多的使用者 Top10 以及他們上傳的觀看次數在前 20 的視訊
-- The 10 most prolific uploaders, then the 20 most-viewed of their videos.
-- Fix: the original "order by t2.views" sorted ASCENDING, so it returned
-- the 20 LEAST-viewed videos; the requirement is Top-20 by views -> desc.
select t2.videoId, t2.uploader, t2.views, t1.videos
from (
    select * from youtube_user_orc order by videos desc limit 10
) t1
join youtube_orc t2 on t1.uploader = t2.uploader
order by t2.views desc
limit 20;
- 統計每個類別視訊觀看數 Top10
-- Per-category Top 10 by views: rank videos inside each category with
-- ROW_NUMBER and keep the first 10 of every partition.
-- (The rank column name "rant" is kept: it is part of the created table's
-- schema.)
create table categoryId_views as
select *
from (
    select
        videoId,
        categoryId,
        views,
        row_number() over (partition by categoryId order by views desc) rant
    from youtube_category
) ranked
where rant <= 10;