Storm: Simulating Web Logs and Printing the Received Session IDs to the Console

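Requirements:

(1) Simulate the access log of a website. Each record contains the site name, the session id, and the access time.

(2) Print the session id of every received log record to the console.

Analysis

(1) Create a utility class that generates the site access log.

(2) In the spout, read the log file and emit it line by line.

(3) In the bolt, extract the session id from each received line and print it to the console.

(4) The main method wires the spout and the bolt into a topology.

Worked example

(1) Create the site access log

GenerateData produces the data: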
package storm;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class GenerateData {
    public static void main(String[] args) {
        File logFile = new File("F:\\test\\websit.log");
        Random random = new Random();

        // 1 Site name
        String[] hosts = {"www.zyd.com"};

        // 2 Session ids
        String[] session_id = {
            "ABYH6Y4V4SCVXTG6DPB4VH9U123",
            "XXYH6YCGFJYERTT834R52FDXV9U34",
            "BBYH61456FGHHJ7JL89RG5VV9UYU7",
            "CYYH6Y2345GHI899OFG4V9U567",
            "VVVYH6Y4V4SFXZ56JIPDPB4V678"
        };

        // 3 Access times
        String[] time = {
            "2017-08-07 08:40:50",
            "2017-08-07 08:40:51",
            "2017-08-07 08:40:52",
            "2017-08-07 08:40:53",
            "2017-08-07 09:40:49",
            "2017-08-07 10:40:49",
            "2017-08-07 11:40:49",
            "2017-08-07 12:40:49"
        };

        // 4 Build 40 log lines: site name, session id, and time, separated by tabs
        StringBuilder sbBuffer = new StringBuilder();
        for (int i = 0; i < 40; i++) {
            sbBuffer.append(hosts[0] + "\t"
                    + session_id[random.nextInt(session_id.length)] + "\t"
                    + time[random.nextInt(time.length)] + "\n");
        }

        // 5 Write the data to the file. try-with-resources closes the stream
        //   even if opening or writing fails, so no null check is needed.
        try (FileOutputStream outputStream = new FileOutputStream(logFile)) {
            outputStream.write(sbBuffer.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Generated data set

www.zyd.com ABYH6Y4V4SCVXTG6DPB4VH9U123 2017-08-07 08:40:52
www.zyd.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-08-07 11:40:49
www.zyd.com XXYH6YCGFJYERTT834R52FDXV9U34 2017-08-07 08:40:53
www.zyd.com VVVYH6Y4V4SFXZ56JIPDPB4V678 2017-08-07 10:40:49
www.zyd.com CYYH6Y2345GHI899OFG4V9U567 2017-08-07 08:40:52
www.zyd.com CYYH6Y2345GHI899OFG4V9U567 2017-08-07 12:40:49
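The record format shown above (site name, session id, and time, separated by tabs) can also be produced in memory, which makes it easy to check without touching the disk. The following is only a sketch under assumptions: the class name LogLineGenerator, the method generate, and the seed parameter are hypothetical and do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper (not part of the original post): builds log lines in memory.
class LogLineGenerator {
    static final String HOST = "www.zyd.com";
    static final String[] SESSION_IDS = {
        "ABYH6Y4V4SCVXTG6DPB4VH9U123",
        "XXYH6YCGFJYERTT834R52FDXV9U34",
        "BBYH61456FGHHJ7JL89RG5VV9UYU7",
        "CYYH6Y2345GHI899OFG4V9U567",
        "VVVYH6Y4V4SFXZ56JIPDPB4V678"
    };
    static final String[] TIMES = {
        "2017-08-07 08:40:50", "2017-08-07 08:40:51",
        "2017-08-07 08:40:52", "2017-08-07 08:40:53",
        "2017-08-07 09:40:49", "2017-08-07 10:40:49",
        "2017-08-07 11:40:49", "2017-08-07 12:40:49"
    };

    // Returns `count` lines of the form: host TAB session id TAB time.
    // A fixed seed makes the output reproducible, which helps in tests.
    static List<String> generate(int count, long seed) {
        Random random = new Random(seed);
        List<String> lines = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String sessionId = SESSION_IDS[random.nextInt(SESSION_IDS.length)];
            String time = TIMES[random.nextInt(TIMES.length)];
            lines.add(HOST + "\t" + sessionId + "\t" + time);
        }
        return lines;
    }

    public static void main(String[] args) {
        // Print a few sample lines; which ids and times appear depends on the seed.
        for (String line : generate(5, 42L)) {
            System.out.println(line);
        }
    }
}
```

Using the array lengths instead of the literals 5 and 8 keeps the random index in range if more session ids or times are added later.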

WebLogSpout: reads the log file and emits it line by line
package storm.weblog;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.*;
import java.util.Map;

public class WebLogSpout implements IRichSpout{
    private static final long serialVersionUID = 1L;
    private BufferedReader br;
    private SpoutOutputCollector collector = null;
    private String str = null;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Keep the collector that Storm passes in so that nextTuple() can emit with it
        this.collector = spoutOutputCollector;

        // Open the input file
        try {
            this.br = new BufferedReader(new InputStreamReader(new FileInputStream("F:\\test\\websit.log"),"UTF-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    @Override
    public void close() {

    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    @Override
    public void nextTuple() {

        // Storm calls this method repeatedly
        try {
            while ((str = this.br.readLine())!= null){
                // Emit the line as a one-field tuple
                collector.emit(new Values(str));
                //Thread.sleep(3000);
            }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        // Declare the name of the output field
        outputFieldsDeclarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

WebLogBolt

package storm.weblog;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class WebLogBolt implements IRichBolt {
    private int line_number = 0;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        // Nothing to prepare
    }

    @Override
    public void execute(Tuple input) {
        // 1 Get the data. The field name must match the one declared by the spout.
        String log = input.getStringByField("log");

        // The same value can also be read by position
        String line = input.getString(0);
        // 2 Split the line on tabs
        String[] split = line.split("\t");

        String session_id = split[1];
        // 3 Count the lines received
        line_number++;

        // 4 Print the thread id as well, which helps with later testing
        System.out.println(Thread.currentThread().getId()+ "session_id"+"   "+session_id+"line_number"+line_number);

    }

    @Override
    public void cleanup() {
        // Release resources (nothing to release here)

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // No output fields: this bolt emits nothing
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // No component-specific configuration
        return null;
    }
}
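The bolt above reads split[1] directly, so a line with fewer than two tab-separated fields would throw an ArrayIndexOutOfBoundsException. The sketch below shows one possible guard, written without any Storm dependency so that it can be run on its own. The class SessionLineCounter and its methods are hypothetical names, and the output format is simplified for readability rather than copied from the bolt.

```java
import java.util.Optional;

// Hypothetical, Storm-free version of the bolt's per-line logic.
class SessionLineCounter {
    private int lineNumber = 0;

    // Returns the second tab-separated field, or empty if the line is malformed.
    static Optional<String> sessionIdOf(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.split("\t");
        if (fields.length < 2 || fields[1].isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(fields[1]);
    }

    // Counts only well-formed lines and returns the message to print for them.
    Optional<String> handle(String line) {
        Optional<String> sessionId = sessionIdOf(line);
        if (!sessionId.isPresent()) {
            return Optional.empty(); // skip malformed input instead of throwing
        }
        lineNumber++;
        return Optional.of("session_id=" + sessionId.get() + " line_number=" + lineNumber);
    }

    int lineNumber() {
        return lineNumber;
    }

    public static void main(String[] args) {
        SessionLineCounter counter = new SessionLineCounter();
        String[] input = {
            "www.zyd.com\tABYH6Y4V4SCVXTG6DPB4VH9U123\t2017-08-07 08:40:52",
            "a line without tabs"
        };
        for (String line : input) {
            counter.handle(line).ifPresent(System.out::println);
        }
    }
}
```

Inside a real bolt, the same check could sit at the top of execute, with malformed lines either ignored or logged. Which of the two is appropriate depends on the application.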

WebLogMain: wires the spout and the bolt together

package storm.weblog;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class WebLogMain {
    public static void main (String[] args){
        // 1 Create the topology builder
        TopologyBuilder builder = new TopologyBuilder();
        // 2 Component id, component instance, parallelism hint
        builder.setSpout("WebLogSpout",new WebLogSpout(),1);
        // Storm has several built-in stream groupings; shuffleGrouping is a common one
        // and distributes tuples randomly across the bolt's tasks
        builder.setBolt("WebLogBolt",new WebLogBolt(),1).shuffleGrouping("WebLogSpout");
        // 3 Create the configuration object
        Config conf  = new Config();
        conf.setNumWorkers(2);

        // 4 Submit the topology
        if (args.length > 0){ // Submit to a cluster, using args[0] as the topology name
            try {
                StormSubmitter.submitTopology(args[0],conf,builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }else {// Run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("webtopology",conf,builder.createTopology());

        }

    }
}

Sample output

158session_id XXYH6YCGFJYERTT834R52FDXV9U34line_number34
158session_id BBYH61456FGHHJ7JL89RG5VV9UYU7line_number35
158session_id BBYH61456FGHHJ7JL89RG5VV9UYU7line_number36

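If records are added to the original weblog.txt file (the log file the spout reads, named websit.log in the code above) without changing the record format, the running topology picks them up in real time: the spout reads the added lines and the bolt processes them and prints to the console.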
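The reason added lines are picked up is that Storm keeps calling nextTuple, and a BufferedReader that has returned null at the end of the file can return further lines once the file has grown. The following Storm-free sketch illustrates that behaviour with a temporary file. The class AppendAwareReader and its poll method are hypothetical names introduced only for this illustration.

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: keeps one reader open and drains whatever lines are available.
class AppendAwareReader implements AutoCloseable {
    private final BufferedReader reader;

    AppendAwareReader(Path file) throws IOException {
        // Same reader stack as the spout: FileInputStream -> InputStreamReader -> BufferedReader
        this.reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(file.toFile()), StandardCharsets.UTF_8));
    }

    // Plays the role of one nextTuple() call: returns the lines available right now,
    // or an empty list if the reader is already at the end of the file.
    List<String> poll() throws IOException {
        List<String> lines = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("websit", ".log");
        try {
            Files.write(file, "www.zyd.com\tS1\t2017-08-07 08:40:50\n".getBytes(StandardCharsets.UTF_8));
            try (AppendAwareReader tail = new AppendAwareReader(file)) {
                System.out.println(tail.poll()); // the line written before the reader was opened
                Files.write(file, "www.zyd.com\tS2\t2017-08-07 08:40:51\n".getBytes(StandardCharsets.UTF_8),
                        StandardOpenOption.APPEND);
                System.out.println(tail.poll()); // the line appended afterwards
            }
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Two caveats apply to this approach. Only data appended after the reader's current position is seen, so a record inserted earlier in the file is not re-read. A line that has been only partly written when the reader reaches the end of the file may also be returned incomplete.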