1. 程式人生 > >hadoop——hive視訊觀看熱度,Top N案例(youtube)

hadoop——hive視訊觀看熱度,Top N案例(youtube)

  • 資料準備

user.txt

0.txt

欄位以及欄位名解析

 

user表

欄位 備註 欄位型別

uploader 上傳者使用者名稱 string

videos 上傳視訊數 int

friends 朋友數量 int

 

視訊表:

欄位 備註 詳細描述

video id 視訊唯一 id 11 位字串

uploader 視訊上傳者 上傳視訊的使用者名稱 String

age 視訊年齡 視訊上傳日期和 2007 年 2 月

15 日之間的整數天(Youtube的獨特設定)

category 視訊類別 上傳視訊指定的視訊分類

length 視訊長度 整形數字標識的視訊長度

views 觀看次數 視訊被瀏覽的次數

rate 視訊評分 滿分 5 分

ratings 流量 視訊的流量,整型數字

comments 評論數 一個視訊的整數評論數

related ids 相關視訊 id 相關視訊的 id,最多 20 個

  •  資料清洗
  1. 通過mapreduce將資料清洗出來,通過觀察原始資料形式,可以發現,視訊可以有多個所屬分類,每個所屬分類用&符號分割,且分割的兩邊有空格字元,同時相關視訊也是可以有多個元素,多個相關視訊又用“\t”進行分割。為了分析資料時方便對存在多個子元素的資料進行操作,我們首先進行資料重組清洗操作。即:將所有的類別用“&”分割,同時去掉兩邊空格,多個相關視訊 id 也使用“&”進行分割。將資料放到hdfs指定的資料夾裡面。

ETL資料清洗

 

ETLUtils.java

 

package ETLUtils;

public class ETLUtils {

    /**
     * Cleans one raw line of the YouTube video data set.
     *
     * <p>The raw record is tab-separated; field index 3 (category) may
     * contain spaces around its '&' separators, and every field from
     * index 9 onward is one related-video id. The cleaned record keeps
     * fields 0-8 tab-separated and joins the related ids with '&'.</p>
     *
     * @param str one raw tab-separated record
     * @return the cleaned record, or {@code null} if the record has
     *         fewer than 9 fields and is therefore malformed
     */
    public static String getETCString(String str){
        String[] fields = str.split("\t");

        // Validate BEFORE indexing into the array: the original code
        // touched fields[3] first and threw ArrayIndexOutOfBoundsException
        // on short (malformed) lines instead of returning null.
        if (fields.length < 9) {
            return null;
        }

        // Category field: drop the spaces around the '&' separators.
        fields[3] = fields[3].replaceAll(" ", "");

        StringBuilder cleaned = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            cleaned.append(fields[i]);
            if (i < 9) {
                // Tab-separate the fixed columns, but never emit a
                // trailing tab when the record has exactly 9 fields.
                if (i != fields.length - 1) {
                    cleaned.append("\t");
                }
            } else if (i != fields.length - 1) {
                // Related-video ids (index >= 9) are '&'-joined.
                cleaned.append("&");
            }
        }
        return cleaned.toString();
    }

}

 

ETLMapper

 

package mapper;

import ETLUtils.ETLUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ETLMapper extends Mapper<Object,Text,NullWritable,Text> {

    Text text=new Text();
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String lines=ETLUtils.getETCString(value.toString());

        if(StringUtils.isBlank(lines)) return;

        text.set(lines);

        context.write(NullWritable.get(),text);

    }
}

 

 

ETLRunner

 

package runner;

import java.io.IOException;

import mapper.ETLMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ETLRunner implements Tool {

    // Hadoop configuration injected by ToolRunner via setConf().
    private Configuration conf=null;


    @Override
    public void setConf(Configuration conf) {
        this.conf=conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Configures and submits the map-only ETL job.
     * args[0] is the HDFS input path, args[1] the HDFS output path.
     *
     * @return 0 on job success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {

        conf=this.getConf();
        // Stash the path arguments in the configuration so the
        // init*Path() helpers can read them back from the Job's copy.
        conf.set("inpath",args[0]);
        conf.set("outpath",args[1]);

        Job job=Job.getInstance(conf,"youtub_etl_video");
        job.setJarByClass(ETLRunner.class);

        job.setMapperClass(ETLMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Map-only job: the cleaned lines need no reduce-side aggregation.
        job.setNumReduceTasks(0);

        this.initInputPath(job);
        this.initOutputPath(job);


        return job.waitForCompletion(true)? 0:1;
    }

    /**
     * Resolves the "inpath" setting and registers it as the job's input
     * path; throws RuntimeException if the HDFS directory does not exist.
     */
    private void initInputPath(Job job) throws IOException {

        Configuration conf=job.getConfiguration();
        String inpaths=conf.get("inpath");

        // Obtain the abstract file-system handle for this configuration.
        FileSystem fs=FileSystem.get(conf);
        // Build an HDFS path object from the configured string.
        Path inpath =new Path(inpaths);
        // Only register the path if it actually exists on HDFS.
        if(fs.exists(inpath)){
           // Register it as the job's input path.
            FileInputFormat.addInputPath(job,inpath);
        }else{
            throw new RuntimeException("HDFS目錄不存在"+inpaths);
        }
    }

    /**
     * Resolves the "outpath" setting and registers it as the job's output
     * path, first deleting any pre-existing output directory (Hadoop
     * refuses to start a job whose output directory already exists).
     */
    private  void initOutputPath(Job job) throws IOException {

        Configuration conf=job.getConfiguration();

        String outpath=conf.get("outpath");

        FileSystem fs=FileSystem.get(conf);

        Path opath=new Path(outpath);

        if(fs.exists(opath)){
            // Output path already exists: delete it recursively.
            fs.delete(opath,true);
        }
            FileOutputFormat.setOutputPath(job,opath);

    }

    /** Entry point: delegates to ToolRunner and exits with the job status. */
    public static void main(String[] args) {

        try {
            int result=ToolRunner.run(new ETLRunner(),args);
            if(result==0){
                System.out.println("Success!");
            }else{
                System.out.println("Fail!");
            }
            System.exit(result);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

}

 

  1. 建立四張表打算用orc的表進行操作(操作效率比較高),但是orc型別的表只能通過insert的形式插入資料,所以需要建立兩張ori的表再進行插入操作。

-- Staging table for the cleaned video records, loaded from text files.
-- ORC tables cannot take LOAD DATA directly, so data lands here first
-- and is then copied into youtube_orc via INSERT ... SELECT.
CREATE TABLE youtube_ori (
    videoId   STRING,
    uploader  STRING,
    age       INT,
    category  ARRAY<STRING>,
    length    INT,
    views     INT,
    rate      FLOAT,
    ratings   INT,
    comments  INT,
    relatedId ARRAY<STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
COLLECTION ITEMS TERMINATED BY "&"
STORED AS TEXTFILE;

 

建立user ori表

-- Staging table for the raw user records (user.txt), bucketed by
-- uploader to enable bucketed joins and sampling.
CREATE TABLE youtube_user_ori (
    uploader STRING,
    videos   INT,
    friends  INT
)
CLUSTERED BY (uploader) INTO 24 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
STORED AS TEXTFILE;

 

 

//建立orc表

-- Analysis table in ORC format; populated from youtube_ori with
-- INSERT INTO TABLE youtube_orc SELECT * FROM youtube_ori.
CREATE TABLE youtube_orc (
    videoId   STRING,
    uploader  STRING,
    age       INT,
    category  ARRAY<STRING>,
    length    INT,
    views     INT,
    rate      FLOAT,
    ratings   INT,
    comments  INT,
    relatedId ARRAY<STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
COLLECTION ITEMS TERMINATED BY "&"
STORED AS ORC;

 

-- ORC counterpart of youtube_user_ori; populated via INSERT ... SELECT.
CREATE TABLE youtube_user_orc (
    uploader STRING,
    videos   INT,
    friends  INT
)
CLUSTERED BY (uploader) INTO 24 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
STORED AS ORC;

 

 

 

 

  1. 匯入資料到表ori中

 

-- Load the MapReduce ETL output (cleaned video records) into the video staging table.
load data inpath '/output/part-m-00000' into table youtube_ori;

 

-- Load the raw user records into the user staging table.
load data inpath '/output/user.txt' into table youtube_user_ori;

  1. 將表匯入到orc中

用 insert into table A select * from B 將資料匯入到 orc 表

 

  •  資料分析

欄位:

視訊id  上傳者   視訊年齡 視訊類別 觀看長度 觀看次數 視訊評分 流量 評論數 相關視訊

videoId,uploader,age,category,length,views,rate,ratings,comments,relatedId

 

  1. 統計視訊觀看數top10

-- Task 1: the 10 most-viewed videos.
CREATE TABLE viewstop10 AS
SELECT
    videoId,
    uploader,
    age,
    category,
    length,
    views,
    rate,
    ratings,
    comments,
    relatedId
FROM youtube_orc
ORDER BY views DESC
LIMIT 10;

 

  1.  統計類別熱度top10

-- Task 2: the 10 hottest categories, measured by video count per category.
-- Each video's category array is exploded so a multi-category video
-- counts once in every category it belongs to.
CREATE TABLE hotTop10 AS
SELECT
    exploded.category_name AS category,
    COUNT(exploded.videoId) AS hot
FROM (
    SELECT videoId, category_name
    FROM youtube_orc
    LATERAL VIEW explode(category) t_category AS category_name
) exploded
GROUP BY exploded.category_name
ORDER BY hot DESC
LIMIT 10;

 

  1.  統計出視訊觀看數最高的 20 個視訊的所屬類別以及類別包含這 Top20 視訊的個數

-- Task 3: categories of the 20 most-viewed videos, with how many of
-- those Top-20 videos each category contains.
CREATE TABLE top20views_countCategory AS
SELECT
    category_name,
    COUNT(videoId) AS vcount
FROM (
    SELECT videoId, category
    FROM (
        -- the 20 most-viewed videos
        SELECT * FROM youtube_orc ORDER BY views DESC LIMIT 20
    ) top20
) top20_slim
LATERAL VIEW explode(category) t_category AS category_name
GROUP BY category_name
ORDER BY vcount DESC;

 

  1. 統計視訊觀看數 Top50 所關聯視訊的所屬類別的熱度排名

-- Task 4: category heat ranking derived from the Top-50 most-viewed videos.
-- NOTE(review): the heading says "categories of the RELATED videos of the
-- Top 50", but this query ranks the categories of the Top-50 videos
-- themselves (relatedId is never joined) — confirm intent.
SELECT
    category_name,
    COUNT(views) AS mcount
FROM (
    SELECT videoId, category_name, views
    FROM (
        -- the 50 most-viewed videos
        SELECT videoId, category, views
        FROM youtube_orc
        ORDER BY views DESC
        LIMIT 50
    ) top50
    LATERAL VIEW explode(category) t_category AS category_name
) top50_cat
GROUP BY category_name
ORDER BY mcount DESC;

 

  1. 統計每個類別中的視訊熱度 Top10,以 Music 為例

-- One row per (video, category) pair: the category ARRAY<STRING> of
-- youtube_orc is flattened into a scalar categoryId column so that
-- per-category questions can use a simple WHERE filter.
CREATE TABLE youtube_category (
    videoId    STRING,
    uploader   STRING,
    age        INT,
    categoryId STRING,
    length     INT,
    views      INT,
    rate       FLOAT,
    ratings    INT,
    comments   INT,
    relatedId  ARRAY<STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
COLLECTION ITEMS TERMINATED BY "&"
STORED AS ORC;

 

-- Populate youtube_category: explode each video's category array so the
-- flattened table holds one row per (video, category) pair.
-- Fixes: the original statement had no terminating semicolon (it would
-- run into the next statement in a script) and misspelled the lateral
-- view alias ("catetory").
INSERT INTO TABLE youtube_category
SELECT
    videoId,
    uploader,
    age,
    categoryId,
    length,
    views,
    rate,
    ratings,
    comments,
    relatedId
FROM youtube_orc
LATERAL VIEW explode(category) category_view AS categoryId;

 

-- Task 5: the 10 hottest (most-viewed) videos within the Music category.
CREATE TABLE musicTop10 AS
SELECT
    videoId,
    categoryId,
    views
FROM youtube_category
WHERE categoryId = "Music"
ORDER BY views DESC
LIMIT 10;

 

  1. 統計每個類別中視訊流量 Top10,以 Music 為例

-- Task 6: the 10 highest-traffic (ratings) videos within the Music category.
CREATE TABLE ratingsTop10 AS
SELECT
    videoId,
    views,
    ratings
FROM youtube_category
WHERE categoryId = "Music"
ORDER BY ratings DESC
LIMIT 10;

 

  1. 統計上傳視訊最多的使用者 Top10 以及他們上傳的觀看次數在前 20 的視訊

-- Task 7: the 10 users who uploaded the most videos, and the 20
-- most-viewed videos among everything those users uploaded.
-- Fix: the original ordered by t2.views ASCENDING, which returned the
-- 20 LEAST-viewed videos — the task asks for the top 20 by views.
SELECT
    t2.videoId,
    t2.uploader,
    t2.views,
    t1.videos
FROM (
    -- top 10 uploaders by upload count
    SELECT * FROM youtube_user_orc ORDER BY videos DESC LIMIT 10
) t1
JOIN youtube_orc t2
    ON t1.uploader = t2.uploader
ORDER BY t2.views DESC
LIMIT 20;

 

  1. 統計每個類別視訊觀看數 Top10

-- Task 8: the 10 most-viewed videos within EVERY category, via a
-- per-category ROW_NUMBER ranking.
-- NOTE(review): "rant" looks like a typo of "rank", but it becomes a
-- column of the created table, so it is kept for schema compatibility.
CREATE TABLE categoryId_views AS
SELECT *
FROM (
    SELECT
        videoId,
        categoryId,
        views,
        ROW_NUMBER() OVER (PARTITION BY categoryId ORDER BY views DESC) AS rant
    FROM youtube_category
) ranked
WHERE rant <= 10;

  •