Storm模擬將接收到日誌的會話id列印在控制檯
需求:
(1)模擬訪問網站的日誌資訊,包括:網站名稱、會話id、訪問網站時間等
(2)將接收到日誌的會話id列印到控制檯
分析
(1)建立網站訪問日誌工具類
(2)在spout中讀取日誌檔案,並一行一行發射出去
(3)在bolt中從接收到的每一行資料中取出會話id,並列印到控制檯。
(4)main方法負責拼接spout和bolt的拓撲。
案例實操
(1)建立網站訪問日誌
GenerateData生產資料:
package storm;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
/**
 * Generates a simulated website access log.
 *
 * <p>Each line has three tab-separated fields: host name, session id and
 * access time. The session id and time are picked at random from fixed
 * sample arrays. The result is written to {@link #LOG_PATH}.
 */
public class GenerateData {
    /** File the generated log is written to. */
    private static final String LOG_PATH = "F:\\test\\websit.log";

    /** Number of log lines written by {@link #main(String[])}. */
    private static final int LINE_COUNT = 40;

    // 1 Website names
    private static final String[] HOSTS = {"www.zyd.com"};

    // 2 Session ids
    private static final String[] SESSION_IDS = {
        "ABYH6Y4V4SCVXTG6DPB4VH9U123",
        "XXYH6YCGFJYERTT834R52FDXV9U34",
        "BBYH61456FGHHJ7JL89RG5VV9UYU7",
        "CYYH6Y2345GHI899OFG4V9U567",
        "VVVYH6Y4V4SFXZ56JIPDPB4V678"
    };

    // 3 Access times
    private static final String[] TIMES = {
        "2017-08-07 08:40:50",
        "2017-08-07 08:40:51",
        "2017-08-07 08:40:52",
        "2017-08-07 08:40:53",
        "2017-08-07 09:40:49",
        "2017-08-07 10:40:49",
        "2017-08-07 11:40:49",
        "2017-08-07 12:40:49"
    };

    /**
     * Builds {@link #LINE_COUNT} random log lines and writes them to
     * {@link #LOG_PATH}, replacing any previous content.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        File logFile = new File(LOG_PATH);

        // Create the target directory up front so a missing folder does not
        // make the write fail.
        File parent = logFile.getParentFile();
        if (parent != null && !parent.isDirectory() && !parent.mkdirs()) {
            System.err.println("Cannot create directory: " + parent);
            return;
        }

        // 4 Assemble the access log
        String content = buildLog(new Random(), LINE_COUNT);

        // 5 Write the data to the file. try-with-resources closes the stream
        // on every path and never calls close() on a stream that failed to open.
        try (FileOutputStream outputStream = new FileOutputStream(logFile)) {
            // Written as UTF-8 explicitly rather than in the platform default charset.
            outputStream.write(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the log text.
     *
     * @param random    source of randomness for picking session id and time
     * @param lineCount number of lines to produce; values &lt;= 0 give an empty string
     * @return {@code lineCount} lines, each {@code host \t sessionId \t time} followed by {@code \n}
     */
    static String buildLog(Random random, int lineCount) {
        StringBuilder log = new StringBuilder();
        for (int i = 0; i < lineCount; i++) {
            // Bounds come from the array lengths so the sample data can be
            // extended without touching this loop.
            log.append(HOSTS[0])
               .append('\t')
               .append(SESSION_IDS[random.nextInt(SESSION_IDS.length)])
               .append('\t')
               .append(TIMES[random.nextInt(TIMES.length)])
               .append('\n');
        }
        return log.toString();
    }
}
生產資料集
www.zyd.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-08-07 08:40:52
www.zyd.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-08-07 11:40:49
www.zyd.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-08-07 08:40:53
www.zyd.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-08-07 10:40:49
www.zyd.com CYYH6Y2345GHI899OFG4V9U567 2017-08-07 08:40:52
www.zyd.com CYYH6Y2345GHI899OFG4V9U567 2017-08-07 12:40:49
WebLogSpout: 逐行讀取日誌檔案,並將每一行發射出去
package storm.weblog;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.io.*;
import java.util.Map;
/**
 * Spout that reads the access log file line by line and emits every line as a
 * one-field tuple named {@code "log"}.
 */
public class WebLogSpout implements IRichSpout {
    private static final long serialVersionUID = 1L;

    /** Log file to read. */
    private static final String LOG_PATH = "F:\\test\\websit.log";

    /** Reader over the log file; stays {@code null} if the file could not be opened. */
    private BufferedReader br;

    /** Collector handed over in {@link #open}; used to emit tuples. */
    private SpoutOutputCollector collector = null;

    /**
     * Stores the collector and opens the input file.
     *
     * @param map                  topology configuration (unused)
     * @param topologyContext      topology context (unused)
     * @param spoutOutputCollector collector used by {@link #nextTuple()} to emit
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Assign from the parameter. The former "this.collector = collector"
        // was a self-assignment that left the field null.
        this.collector = spoutOutputCollector;
        // Open the input file
        try {
            this.br = new BufferedReader(new InputStreamReader(new FileInputStream(LOG_PATH), "UTF-8"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Releases the file handle held by the reader, if one was opened. */
    @Override
    public void close() {
        if (br == null) {
            return;
        }
        try {
            br.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            br = null;
        }
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    /**
     * Emits every line that is currently readable from the file.
     *
     * <p>This method is invoked repeatedly. Once the end of the file is
     * reached the loop exits, and the read is attempted again on the next
     * invocation.
     * NOTE(review): lines appended to the file later are presumably picked up
     * by those later reads — confirm on the target platform.
     */
    @Override
    public void nextTuple() {
        if (br == null) {
            // The file could not be opened (or the spout was closed): nothing to emit.
            return;
        }
        try {
            String line;
            while ((line = br.readLine()) != null) {
                // Emit the line
                collector.emit(new Values(line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object o) {
    }

    @Override
    public void fail(Object o) {
    }

    /** Declares the single output field {@code "log"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("log"));
    }

    /** @return {@code null}: no component-specific configuration is supplied */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
WebLogBolt
package storm.weblog;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
/**
 * Bolt that extracts the session id from each received log line and prints it
 * to the console together with the thread id and a running line counter.
 */
public class WebLogBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    /** Number of well-formed lines this bolt instance has processed. */
    private int line_number = 0;

    /** Collector handed over in {@link #prepare}; used to ack tuples. */
    private OutputCollector collector;

    /**
     * Stores the collector for later acking.
     *
     * @param map             topology configuration (unused)
     * @param topologyContext topology context (unused)
     * @param collector       collector for this bolt
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Processes one log line of the form {@code host \t sessionId \t time}.
     * Lines without at least two tab-separated fields are reported on stderr
     * and skipped instead of failing with an index error.
     *
     * @param input tuple carrying the line in its {@code "log"} field
     */
    @Override
    public void execute(Tuple input) {
        // 1 Read the data; the field name must match the one declared by the spout
        String line = input.getStringByField("log");

        // 2 Split the line
        String[] split = (line == null) ? new String[0] : line.split("\t");
        if (split.length < 2) {
            System.err.println("Skipping malformed log line: " + line);
            collector.ack(input);
            return;
        }
        String session_id = split[1];

        // 3 Count the processed lines
        line_number++;

        // 4 Print, including the thread id to make later testing easier
        System.out.println(Thread.currentThread().getId()+ "session_id"+" "+session_id+"line_number"+line_number);

        // Mark the tuple as fully processed.
        collector.ack(input);
    }

    @Override
    public void cleanup() {
        // No resources to release
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt emits nothing, so there are no fields to declare
    }

    /** @return {@code null}: no component-specific configuration is supplied */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
WebLogMain組合Spout和Bolt
package storm.weblog;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Entry point that wires {@code WebLogSpout} and {@code WebLogBolt} into a
 * topology and submits it.
 */
public class WebLogMain {
    /**
     * Builds and submits the topology.
     *
     * @param args if non-empty, {@code args[0]} is the topology name used for
     *             submission through {@code StormSubmitter}; if empty, the
     *             topology is submitted to a {@code LocalCluster} as "webtopology"
     */
    public static void main(String[] args) {
        // Step 1: create the topology builder
        TopologyBuilder topology = new TopologyBuilder();

        // Step 2: register components as (name, instance, parallelism)
        topology.setSpout("WebLogSpout", new WebLogSpout(), 1);
        // Shuffle grouping is the grouping chosen here for the bolt's input
        topology.setBolt("WebLogBolt", new WebLogBolt(), 1)
                .shuffleGrouping("WebLogSpout");

        // Step 3: configuration
        Config config = new Config();
        config.setNumWorkers(2);

        // Step 4: submit
        boolean runLocally = args.length == 0;
        if (runLocally) {
            // Local submission
            new LocalCluster().submitTopology("webtopology", config, topology.createTopology());
            return;
        }

        // Cluster submission
        try {
            StormSubmitter.submitTopology(args[0], config, topology.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
輸出舉例
158session_id XXYH6YCGFJYERTT834R52FDXV9U34line_number34
158session_id BBYH61456FGHHJ7JL89RG5VV9UYU7line_number35
158session_id BBYH61456FGHHJ7JL89RG5VV9UYU7line_number36
對原websit.log檔案增加資訊或插入資訊(資訊格式保持不變),控制檯可以即時監測到檔案新增的資訊,並將這些資訊讀取出來再處理。