1. 程式人生 > >Storm之網站實時統計

Storm之網站實時統計

package com.uplooking.bigdata.storm.test;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * 實時統計某個網站PV和UV
 * PV
 *  page view
 *  網站的點選率
 * UV
 *  獨立訪客的數量
 *
 *  作業:
 *  1、求出訪問量最高的3個IP,其對應的訪問次數
 *  2、在此基礎之上要求大家統計每一個時段[間隔1個小時]內的pv和uv值
 *
 */
public class PVAndUVSumTopology {
    static class PUSpout extends BaseRichSpout {
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        //監聽一個目錄新檔案的產生
        public void nextTuple() {
            Collection<File> files = FileUtils.listFiles(new File("E:/test/storm"), new String[]{"log"}, true);
            try {
                for (File file : files) {
                    List<String> lines = FileUtils.readLines(file, "UTF-8");
                    for (String line : lines) {
                        collector.emit(new Values(line));
                    }
                    //處理完畢當前檔案之後,對該檔案設定標識
                    FileUtils.moveFile(file, new File(file.getAbsolutePath() + "." + System.currentTimeMillis()));
                }
            } catch (IOException e) {
                //啥事不幹
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    static class SplitBolt extends BaseRichBolt {
        private Map conf;
        private TopologyContext context;
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String line = tuple.getStringByField("line");
            String[] splits = line.split("##");
            if(splits == null || splits.length < 1) {
                return;
            }
            //因為我們要進行的統計之和IP有關係,所以沒有必要將其所有的資料都發送到下游
            //以免造成網路擁堵
            collector.emit(new Values(splits[0]));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("ip"));
        }
    }

    //統計PV和UV值
    static class PUBolt extends BaseRichBolt {
        private Map conf;
        private TopologyContext context;
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        int pv = 0;
        Set<String> uvSet = new HashSet<String>();
        @Override
        public void execute(Tuple tuple) {
            if(!tuple.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) {
                ++pv;//計算截止到目前為止的PV值
                String ip = tuple.getStringByField("ip");
                uvSet.add(ip);
            } else {
                System.out.println("==============統計結果start===============");
                System.out.println("PV值:" + pv);
                System.out.println("UV值:" + uvSet.size());
                System.out.println("==============統計結果end============");
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            Map<String, Object> conf = new HashMap<>();
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return conf;
        }
    }

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("puSpout", new PUSpout());
        builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("puSpout");
        builder.setBolt("puBolt", new PUBolt()).shuffleGrouping("splitBolt");

        StormTopology stormTopology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        String topologyName = PVAndUVSumTopology.class.getSimpleName();
        Config conf = new Config();

        cluster.submitTopology(topologyName, conf, stormTopology);
    }
}