二.Flink實時專案電商使用者行為之實時流量統計
1.1 模組建立和資料準備
在Flink-project下新建一個 maven module作為子專案,命名為gmall-network-flow。在這個子模組中,我們同樣並沒有引入更多的依賴,所以也不需要改動pom檔案。
在src/main/目錄下,將apache伺服器的日誌檔案apache.log複製到資原始檔目錄input下,我們將從這裡讀取資料。
當然,我們也可以仍然用UserBehavior.csv作為資料來源,這時我們分析的就不是每一次對伺服器的訪問請求了,而是具體的頁面瀏覽(“pv”)操作。
1.2 基於伺服器log的熱門頁面瀏覽量統計
我們現在要實現的模組是 “實時流量統計”。對於一個電商平臺而言,使用者登入的入口流量、不同頁面的訪問流量都是值得分析的重要資料,而這些資料,可以簡單地從web
我們在這裡先實現“熱門頁面瀏覽數”的統計,也就是讀取伺服器日誌中的每一行log,統計在一段時間內使用者訪問每一個url的次數,然後排序輸出顯示。
具體做法為:每隔5秒,輸出最近10分鐘內訪問量最多的前N個URL。可以看出,這個需求與之前“實時熱門商品統計”非常類似,所以我們完全可以借鑑此前的程式碼。
在src/main/app下建立HotUrlApp類。定義javaBean ApacheLog,這是輸入的日誌資料流;另外還有UrlViewCount,這是視窗操作統計的輸出資料型別。在main函式中建立StreamExecutionEnvironment 並做配置,然後從apache.log
需要注意的是,原始日誌中的時間是“dd/MM/yyyy:HH:mm:ss”的形式,需要定義一個DateTimeFormat將其轉換為我們需要的時間戳格式:
.map(line -> {
String[] split = line.split(",");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long time = sdf.parse(split[3]).getTime();
return new ApacheLog(split[0], split[1], time, split[5], split[6]);
})
完整程式碼如下:
public class HotUrlApp {
public static void main(String[] args) throws Exception {
//1.建立執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//2.讀取文字檔案建立流,轉換為JavaBean並提取時間戳
// SingleOutputStreamOperator<ApacheLog> apachLogDS = env.readTextFile("input/apache.log")
SingleOutputStreamOperator<ApacheLog> apachLogDS = env.socketTextStream("hadoop102", 7777)
.map(line -> {
String[] fields = line.split(" ");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long time = sdf.parse(fields[3]).getTime();
return new ApacheLog(fields[0], fields[1], time, fields[5], fields[6]);
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLog>(Time.seconds(1)) {
@Override
public long extractTimestamp(ApacheLog element) {
return element.getEventTime();
}
});
OutputTag<ApacheLog> outputTag = new OutputTag<ApacheLog>("sideOutPut") {
};
//3.過濾資料,按照url分組,開窗,累加計算
SingleOutputStreamOperator<UrlViewCount> aggregate = apachLogDS
.filter(data -> "GET".equals(data.getMethod()))
.keyBy(data -> data.getUrl())
.timeWindow(Time.minutes(10), Time.seconds(5))
.allowedLateness(Time.seconds(60))
.sideOutputLateData(outputTag)
.aggregate(new UrlCountAggFunc(), new UrlCountWindowFunc());
//4.按照視窗結束時間重新分組,計算組內排序
SingleOutputStreamOperator<String> result = aggregate.keyBy(data -> data.getWindowEnd())
.process(new UrlCountProcessFunc(5));
//5.列印資料
apachLogDS.print("apachLogDS");
aggregate.print("aggregate");
result.print("result");
aggregate.getSideOutput(outputTag).print("side");
//6.執行
env.execute();
}
public static class UrlCountAggFunc implements AggregateFunction<ApacheLog, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(ApacheLog value, Long accumulator) {
return accumulator + 1L;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
public static class UrlCountWindowFunc implements WindowFunction<Long, UrlViewCount, String, TimeWindow> {
@Override
public void apply(String url, TimeWindow window, Iterable<Long> input, Collector<UrlViewCount> out) throws Exception {
out.collect(new UrlViewCount(url, window.getEnd(), input.iterator().next()));
}
}
public static class UrlCountProcessFunc extends KeyedProcessFunction<Long, UrlViewCount, String> {
//定義TopSize屬性
private Integer topSize;
public UrlCountProcessFunc() {
}
public UrlCountProcessFunc(Integer topSize) {
this.topSize = topSize;
}
//定義集合狀態用於存放同一個視窗中的資料
private MapState<String,UrlViewCount> mapState; //不能用ListState,因為它會把相同url的資料都會保持而我們只需要後面那個狀態的,例如<url,1>,<url,2>但我們只要最新來的那個更新後的資料
@Override
public void open(Configuration parameters) throws Exception {
mapState=getRuntimeContext().getMapState(new MapStateDescriptor<String, UrlViewCount>("map-state",String.class,UrlViewCount.class));
}
@Override
public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {
//將資料放置集合狀態
mapState.put(value.getUrl(),value);
//註冊定時器,用於處理狀態中的資料
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
//註冊定時器,用於觸發清空狀態的