// Storm之網站實時統計 (Storm: real-time website statistics)
// 阿新 · 發佈:2019-02-10
package com.uplooking.bigdata.storm.test;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.io.File;
import java.io.IOException;
import java.util.*;
/**
* 實時統計某個網站PV和UV
* PV
* page view
* 網站的點選率
* UV
* 獨立訪客的數量
*
* 作業:
* 1、求出訪問量最高的3個IP,其對應的訪問次數
* 2、在此基礎之上要求大家統計每一個時段[間隔1個小時]內的pv和uv值
*
*/
public class PVAndUVSumTopology {
static class PUSpout extends BaseRichSpout {
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
//監聽一個目錄新檔案的產生
public void nextTuple() {
Collection<File> files = FileUtils.listFiles(new File("E:/test/storm"), new String[]{"log"}, true);
try {
for (File file : files) {
List<String> lines = FileUtils.readLines(file, "UTF-8");
for (String line : lines) {
collector.emit(new Values(line));
}
//處理完畢當前檔案之後,對該檔案設定標識
FileUtils.moveFile(file, new File(file.getAbsolutePath() + "." + System.currentTimeMillis()));
}
} catch (IOException e) {
//啥事不幹
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
static class SplitBolt extends BaseRichBolt {
private Map conf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String line = tuple.getStringByField("line");
String[] splits = line.split("##");
if(splits == null || splits.length < 1) {
return;
}
//因為我們要進行的統計之和IP有關係,所以沒有必要將其所有的資料都發送到下游
//以免造成網路擁堵
collector.emit(new Values(splits[0]));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("ip"));
}
}
//統計PV和UV值
static class PUBolt extends BaseRichBolt {
private Map conf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
int pv = 0;
Set<String> uvSet = new HashSet<String>();
@Override
public void execute(Tuple tuple) {
if(!tuple.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) {
++pv;//計算截止到目前為止的PV值
String ip = tuple.getStringByField("ip");
uvSet.add(ip);
} else {
System.out.println("==============統計結果start===============");
System.out.println("PV值:" + pv);
System.out.println("UV值:" + uvSet.size());
System.out.println("==============統計結果end============");
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return conf;
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("puSpout", new PUSpout());
builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("puSpout");
builder.setBolt("puBolt", new PUBolt()).shuffleGrouping("splitBolt");
StormTopology stormTopology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
String topologyName = PVAndUVSumTopology.class.getSimpleName();
Config conf = new Config();
cluster.submitTopology(topologyName, conf, stormTopology);
}
}
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.io.File;
import java.io.IOException;
import java.util.*;
/**
* 實時統計某個網站PV和UV
* PV
* page view
* 網站的點選率
* UV
* 獨立訪客的數量
*
* 作業:
* 1、求出訪問量最高的3個IP,其對應的訪問次數
* 2、在此基礎之上要求大家統計每一個時段[間隔1個小時]內的pv和uv值
*
*/
public class PVAndUVSumTopology {
static class PUSpout extends BaseRichSpout {
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
//監聽一個目錄新檔案的產生
public void nextTuple() {
Collection<File> files = FileUtils.listFiles(new File("E:/test/storm"), new String[]{"log"}, true);
try {
for (File file : files) {
List<String> lines = FileUtils.readLines(file, "UTF-8");
for (String line : lines) {
collector.emit(new Values(line));
}
//處理完畢當前檔案之後,對該檔案設定標識
FileUtils.moveFile(file, new File(file.getAbsolutePath() + "." + System.currentTimeMillis()));
}
} catch (IOException e) {
//啥事不幹
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
static class SplitBolt extends BaseRichBolt {
private Map conf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String line = tuple.getStringByField("line");
String[] splits = line.split("##");
if(splits == null || splits.length < 1) {
return;
}
//因為我們要進行的統計之和IP有關係,所以沒有必要將其所有的資料都發送到下游
//以免造成網路擁堵
collector.emit(new Values(splits[0]));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("ip"));
}
}
//統計PV和UV值
static class PUBolt extends BaseRichBolt {
private Map conf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
int pv = 0;
Set<String> uvSet = new HashSet<String>();
@Override
public void execute(Tuple tuple) {
if(!tuple.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) {
++pv;//計算截止到目前為止的PV值
String ip = tuple.getStringByField("ip");
uvSet.add(ip);
} else {
System.out.println("==============統計結果start===============");
System.out.println("PV值:" + pv);
System.out.println("UV值:" + uvSet.size());
System.out.println("==============統計結果end============");
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return conf;
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("puSpout", new PUSpout());
builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("puSpout");
builder.setBolt("puBolt", new PUBolt()).shuffleGrouping("splitBolt");
StormTopology stormTopology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
String topologyName = PVAndUVSumTopology.class.getSimpleName();
Config conf = new Config();
cluster.submitTopology(topologyName, conf, stormTopology);
}
}