1. 程式人生 > 實用技巧 >二.Flink實時專案電商使用者行為之實時流量統計

二.Flink實時專案電商使用者行為之實時流量統計

1.1 模組建立和資料準備

在Flink-project下新建一個 maven module作為子專案,命名為gmall-network-flow。在這個子模組中,我們同樣並沒有引入更多的依賴,所以也不需要改動pom檔案。

src/main/目錄下,將apache伺服器的日誌檔案apache.log複製到資原始檔目錄input下,我們將從這裡讀取資料。

當然,我們也可以仍然用UserBehavior.csv作為資料來源,這時我們分析的就不是每一次對伺服器的訪問請求了,而是具體的頁面瀏覽(“pv”)操作。

1.2 基於伺服器log的熱門頁面瀏覽量統計

我們現在要實現的模組是 “實時流量統計”。對於一個電商平臺而言,使用者登入的入口流量、不同頁面的訪問流量都是值得分析的重要資料,而這些資料,可以簡單地從web

伺服器的日誌中提取出來。

我們在這裡先實現“熱門頁面瀏覽數”的統計,也就是讀取伺服器日誌中的每一行log,統計在一段時間內使用者訪問每一個url的次數,然後排序輸出顯示。

具體做法為:每隔5秒,輸出最近10分鐘內訪問量最多的前NURL可以看出,這個需求與之前“實時熱門商品統計”非常類似,所以我們完全可以借鑑此前的程式碼。

src/main/app下建立HotUrlApp類。定義javaBean ApacheLog,這是輸入的日誌資料流;另外還有UrlViewCount,這是視窗操作統計的輸出資料型別。在main函式中建立StreamExecutionEnvironment 並做配置,然後從apache.log

檔案中讀取資料,幷包裝成ApacheLog型別。

需要注意的是,原始日誌中的時間是dd/MM/yyyy:HH:mm:ss”的形式,需要定義一個DateTimeFormat將其轉換為我們需要的時間戳格式:

.map(line -> {
String[] split = line.split(",");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long time = sdf.parse(split[3]).getTime();
return new ApacheLog(split[0], split[1], time, split[5], split[6]);
})

完整程式碼如下:

public class HotUrlApp {

    public static void main(String[] args) throws Exception {

//1.建立執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//2.讀取文字檔案建立流,轉換為JavaBean並提取時間戳
// SingleOutputStreamOperator<ApacheLog> apachLogDS = env.readTextFile("input/apache.log")
SingleOutputStreamOperator<ApacheLog> apachLogDS = env.socketTextStream("hadoop102", 7777)
.map(line -> {
String[] fields = line.split(" ");
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
long time = sdf.parse(fields[3]).getTime();
return new ApacheLog(fields[0], fields[1], time, fields[5], fields[6]);
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ApacheLog>(Time.seconds(1)) {
@Override
public long extractTimestamp(ApacheLog element) {
return element.getEventTime();
}
});

OutputTag<ApacheLog> outputTag = new OutputTag<ApacheLog>("sideOutPut") {
};

//3.過濾資料,按照url分組,開窗,累加計算
SingleOutputStreamOperator<UrlViewCount> aggregate = apachLogDS
.filter(data -> "GET".equals(data.getMethod()))
.keyBy(data -> data.getUrl())
.timeWindow(Time.minutes(10), Time.seconds(5))
.allowedLateness(Time.seconds(60))
.sideOutputLateData(outputTag)
.aggregate(new UrlCountAggFunc(), new UrlCountWindowFunc());

//4.按照視窗結束時間重新分組,計算組內排序
SingleOutputStreamOperator<String> result = aggregate.keyBy(data -> data.getWindowEnd())
.process(new UrlCountProcessFunc(5));

//5.列印資料
apachLogDS.print("apachLogDS");
aggregate.print("aggregate");
result.print("result");
aggregate.getSideOutput(outputTag).print("side");

//6.執行
env.execute();

}
public static class UrlCountAggFunc implements AggregateFunction<ApacheLog, Long, Long> {

@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(ApacheLog value, Long accumulator) {
return accumulator + 1L;
}

@Override
public Long getResult(Long accumulator) {
return accumulator;
}

@Override
public Long merge(Long a, Long b) {
return a + b;
}
}

public static class UrlCountWindowFunc implements WindowFunction<Long, UrlViewCount, String, TimeWindow> {

@Override
public void apply(String url, TimeWindow window, Iterable<Long> input, Collector<UrlViewCount> out) throws Exception {
out.collect(new UrlViewCount(url, window.getEnd(), input.iterator().next()));
}
}

public static class UrlCountProcessFunc extends KeyedProcessFunction<Long, UrlViewCount, String> {

//定義TopSize屬性
private Integer topSize;

public UrlCountProcessFunc() {
}

public UrlCountProcessFunc(Integer topSize) {
this.topSize = topSize;
}

//定義集合狀態用於存放同一個視窗中的資料
private MapState<String,UrlViewCount> mapState; //不能用ListState,因為它會把相同url的資料都會保持而我們只需要後面那個狀態的,例如<url,1>,<url,2>但我們只要最新來的那個更新後的資料

@Override
public void open(Configuration parameters) throws Exception {
mapState=getRuntimeContext().getMapState(new MapStateDescriptor<String, UrlViewCount>("map-state",String.class,UrlViewCount.class));
}

@Override
public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {

//將資料放置集合狀態
mapState.put(value.getUrl(),value);
//註冊定時器,用於處理狀態中的資料
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
//註冊定時器,用於觸發清空狀態的
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 60000L);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

if (timestamp == ctx.getCurrentKey() + 60000L) {
//清空狀態
mapState.clear(); 超過watermark才清空
return;
}

//1.取出狀態中的資料
Iterator<Map.Entry<String, UrlViewCount>> iterator = mapState.entries().iterator();
List<Map.Entry<String, UrlViewCount>> entries = Lists.newArrayList(iterator);

//2.排序
entries.sort(new Comparator<Map.Entry<String, UrlViewCount>>() {
@Override
public int compare(Map.Entry<String, UrlViewCount> o1, Map.Entry<String, UrlViewCount> o2) {
if (o1.getValue().getCount() > o2.getValue().getCount()) {
return -1;
} else if (o1.getValue().getCount() < o2.getValue().getCount()) {
return 1;
} else {
return 0;
}
}
});

StringBuilder sb = new StringBuilder();
sb.append("======================\n");
sb.append("當前視窗結束時間為:").append(new Timestamp(timestamp - 1L)).append("\n");

//取前topSize條資料輸出
for (int i = 0; i < Math.min(topSize, entries.size()); i++) {
//取出資料
Map.Entry<String, UrlViewCount> entry = entries.get(i);
sb.append("TOP ").append(i + 1);
sb.append(" URL=").append(entry.getValue().getUrl());
sb.append(" 頁面熱度=").append(entry.getValue().getCount());
sb.append("\n");
}
sb.append("======================\n\n");
//清空狀態
// listState.clear(); //不在這裡刪除的原因是每來一條不同的資料它會把上一條資料給清空,我們需要保持上一條資料的狀態
Thread.sleep(1000);
//輸出資料
out.collect(sb.toString());
}
}
}

1.3.1基於埋點日誌資料的網路流量統計

衡量網站流量一個最簡單的指標,就是網站的頁面瀏覽量(Page ViewPV)。使用者每次開啟一個頁面便記錄1PV,多次開啟同一頁面則瀏覽量累計。一般來說,PV與來訪者的數量成正比,但是PV並不直接決定頁面的真實來訪者數量,如同一個來訪者通過不斷的重新整理頁面,也可以製造出非常高的PV

我們知道,使用者瀏覽頁面時,會從瀏覽器向網路伺服器發出一個請求(Request),網路伺服器接到這個請求後,會將該請求對應的一個網頁(Page)傳送給瀏覽器,從而產生了一個PV。所以我們的統計方法,可以是從web伺服器的日誌中去提取對應的頁面訪問然後統計,就向上一節中的做法一樣;也可以直接從埋點日誌中提取使用者發來的頁面請求,從而統計出總瀏覽量。

所以,接下來我們用UserBehavior.csv作為資料來源,實現一個網站總瀏覽量的統計。我們可以設定滾動時間視窗,實時統計每小時內的網站PV

完整程式碼如下:

1JavaBean--PVCount

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PvCount {
private String url;
private Long windowEnd;
private Long count;
}

2)主程式

public class PageViewApp {
public static void main(String[] args) throws Exception {

//1.建立執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//2.從檔案讀取資料建立流並轉換為JavaBean同時提取事件時間
SingleOutputStreamOperator<UserBehavior> userDS = env.readTextFile("input/UserBehavior.csv")
.map(line -> {
String[] fileds = line.split(",");
return new UserBehavior(Long.parseLong(fileds[0]),
Long.parseLong(fileds[1]),
Integer.parseInt(fileds[2]),
fileds[3],
Long.parseLong(fileds[4]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.getTimestamp() * 1000L;
}
});

//3.按照"pv"過濾,按照itemID分組,開窗,計算資料
SingleOutputStreamOperator<PvCount> aggregate = userDS.filter(data -> "pv".equals(data.getBehavior()))
.map(new MapFunction<UserBehavior, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(UserBehavior value) throws Exception {
Random random = new Random();
return new Tuple2<>("pv_" + random.nextInt(8), 1);
}
})
.keyBy(0)
.timeWindow(Time.hours(1))
.aggregate(new PvCountAggFunc(), new PvCountWindowFunc());
//Aggregate是將同一個窗口裡的資料聚合(sum(將所有窗口裡的資料聚合))
//Aggregate的作用 是可以給視窗放一個結束時間,拿到結束時間做重新分組然後再聚合
//按照視窗結束時間重新分組(按定時器加1毫秒作為結束時間)
SingleOutputStreamOperator<String> result = aggregate.keyBy(data -> data.getWindowEnd())
.process(new PvCountProcessFunc());

//4.列印輸出
result.print();

//5.執行
env.execute();

}
//做累加計算
public static class PvCountAggFunc implements AggregateFunction<Tuple2<String, Integer>,Long,Long>{

@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(Tuple2<String, Integer> value, Long accumulator) {
return accumulator + 1L;
}

@Override
public Long getResult(Long accumulator) {
return accumulator;
}

@Override
public Long merge(Long a, Long b) {
return a + b ;
}
}
//提取視窗時間
public static class PvCountWindowFunc implements WindowFunction<Long, PvCount, Tuple, TimeWindow>{

@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<PvCount> out) throws Exception {
String field = tuple.getField(0);
out.collect(new PvCount(field,window.getEnd(),input.iterator().next()));
}
}
public static class PvCountProcessFunc extends KeyedProcessFunction<Long,PvCount,String>{
private ListState<PvCount> listState;

@Override
public void open(Configuration parameters) throws Exception {
listState=getRuntimeContext().getListState(new ListStateDescriptor<PvCount>("list-state",PvCount.class));
}

@Override
public void processElement(PvCount value, Context ctx, Collector<String> out) throws Exception {
listState.add(value);
ctx.timerService().registerEventTimeTimer(value.getWindowEnd()+1L);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//取出狀態資訊
Iterator<PvCount> iterator = listState.get().iterator();

//定義最終一個小時的資料總和
Long count=0L;
//遍歷集合資料累加結果
while (iterator.hasNext()){
count += iterator.next().getCount();
}
//輸出結果資料
out.collect("pv:"+count);
//清空狀態
listState.clear();
}
}
}
1.3.2 網站獨立訪客數UV)的統計

在上節的例子中,我們統計的是所有使用者對頁面的所有瀏覽行為,也就是說,同一使用者的瀏覽行為會被重複統計。而在實際應用中,我們往往還會關注,在一段時間內到底有多少不同的使用者訪問了網站。

另外一個統計流量的重要指標是網站的獨立訪客數(Unique VisitorUV)。UV指的是一段時間(比如一小時)內訪問網站的總人數,1天內同一訪客的多次訪問只記錄為一個訪客。通過IPcookie一般是判斷UV值的兩種方式。當客戶端第一次訪問某個網站伺服器的時候,網站伺服器會給這個客戶端的電腦發出一個Cookie,通常放在這個客戶端電腦的C盤當中。在這個Cookie中會分配一個獨一無二的編號,這其中會記錄一些訪問伺服器的資訊,如訪問時間,訪問了哪些頁面等等。當你下次再訪問這個伺服器的時候,伺服器就可以直接從你的電腦中找到上一次放進去的Cookie檔案,並且對其進行一些更新,但那個獨一無二的編號是不會變的。

當然,對於UserBehavior資料來源來說,我們直接可以根據userId來區分不同的使用者。

完整程式碼如下:

1JavaBean--UvCount

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UvCount {
private Long windowEnd;
private Long count;
}

2)主程式--UvCountApp

/**
* uv是獨立訪客數 需要去重
* 去重方式:
* 不用filter
* 可以用本地集合直接開窗(timewindowall有觸發器 ,windowall是沒有觸發器(要自己寫觸發器)) )後面用apply 生成環境不可取(不用分組)
* 效率太低,所有資料放在一個集合一起處理,記憶體消耗高 生成環境不可取
* 全量視窗關閉一起處理
* 可以用外部框架redis
*
*/
public class UvCountApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<UserBehavior> userDS = env.readTextFile("input/UserBehavior.csv")
.map(line -> {
String[] fields = line.split(",");
return new UserBehavior(Long.parseLong(fields[0]),
Long.parseLong(fields[1]),
Integer.parseInt(fields[2]),
fields[3],
Long.parseLong(fields[4]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.getTimestamp() * 1000L;
}
});
//3.開窗一個小時
SingleOutputStreamOperator<UvCount> result = userDS.timeWindowAll(Time.hours(1))
.apply(new UvCountAllWindowFunc());
//4.列印
result.print();
//5.啟動任務
env.execute();
}

//自定義實現AllWindowFunction
public static class UvCountAllWindowFunc implements AllWindowFunction<UserBehavior, UvCount, TimeWindow> {

@Override
public void apply(TimeWindow window, Iterable<UserBehavior> values, Collector<UvCount> out) throws Exception {
//建立hashSet用於存放userID
HashSet<Long> userIds = new HashSet<Long>();
//遍歷values
Iterator<UserBehavior> iterator = values.iterator();
while (iterator.hasNext()) {
userIds.add(iterator.next().getUserId());
}
//輸出資料
String uv = "uv";
String windowEnd = new Timestamp(window.getEnd()).toString();
Long count = (long) userIds.size();
out.collect(new UvCount(uv, windowEnd, count));
}
}
}

1.3.3 使用布隆過濾器UV統計

在上節的例子中,我們把所有資料的userId都存在了視窗計算的狀態裡,在視窗收集資料的過程中,狀態會不斷增大。一般情況下,只要不超出記憶體的承受範圍,這種做法也沒什麼問題;但如果我們遇到的資料量很大呢?

把所有資料暫存放到記憶體裡,顯然不是一個好注意。我們會想到,可以利用redis這種記憶體級k-v資料庫,為我們做一個快取。但如果我們遇到的情況非常極端,資料大到驚人呢?比如上億級的使用者,要去重計算UV

如果放到redis中,億級的使用者id(每個20位元組左右的話)可能需要幾G甚至幾十G的空間來儲存。當然放到redis中,用叢集進行擴充套件也不是不可以,但明顯代價太大了。

一個更好的想法是,其實我們不需要完整地儲存使用者ID的資訊,只要知道他在不在就行了。所以其實我們可以進行壓縮處理,用一位(bit)就可以表示一個使用者的狀態。這個思想的具體實現就是布隆過濾器(Bloom Filter)。

本質上布隆過濾器是一種資料結構比較巧妙的概率型資料結構probabilistic data structure),特點是高效地插入和查詢,可以用來告訴你 “某樣東西一定不存在或者可能存在”。

它本身是一個很長的二進位制向量,既然是二進位制的向量,那麼顯而易見的,存放的不是0,就是1。相比於傳統的 ListSetMap 等資料結構,它更高效、佔用空間更少,但是缺點是其返回的結果是概率性的,而不是確切的。

我們的目標就是,利用某種方法(一般是Hash函式)把每個資料,對應到一個位圖的某一位上去;如果資料存在,那一位就是1,不存在則為0

接下來我們就來具體實現一下。

注意這裡我們用到了redis連線存取資料,所以需要加入redis客戶端的依賴:

<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>

完整程式碼如下:

public class UvCountWithBloomFilterApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<UserBehavior> userDS = env.readTextFile("input/UserBehavior.csv")
.map(line -> {
String[] fields = line.split(",");
return new UserBehavior(Long.parseLong(fields[0]),
Long.parseLong(fields[1]),
Integer.parseInt(fields[2]),
fields[3],
Long.parseLong(fields[4]));
})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.getTimestamp() * 1000L;
}
});
//3.開窗一個小時
SingleOutputStreamOperator<UvCount> result = userDS.timeWindowAll(Time.hours(1))
.trigger(new MyTrigger())
.process(new UvWithBloomFilterWindowFunc());
result.print();
env.execute();

}
//自定義觸發器,每來一條資料,觸發一次計算並輸出結果
public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {

@Override
public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;//做計算和輸出
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {

}
}
//考慮redis
//.儲存資料本身 900-1000 count
//1.如果用hash存考慮外部key和內部key
//我們使用hash的時候,外層key可以在外面拿,而記憶體key(視窗的資訊)要通過process方法裡的
// Context上下文來拿
//外部key(隨便給) UvCount 內部key 900-1000 value count
//過期時間只能刪除外部key不能刪除內部key(資料一旦超時會將根據外部key將資料全部刪除而我們希望可以根據內部key來刪除)
//UvCount900-1000count
//2.如果用String(資料量極大的時候,我們考慮用String,如果定義超時時間,資料要刪的化,也考慮String)
//key(我們通過拼接的方式) UvCount-900-1000 value count
//.BitMap 我們要操作SETBIT,GETBIT,我們只能用String來存
//bitMap-900-1000
public static class UvWithBloomFilterWindowFunc extends ProcessAllWindowFunction<UserBehavior, UvCount,TimeWindow>{
//定義Redis連線
Jedis jedis;
//定義一個布隆過濾器
MyBloomFilter myBloomFilter;

//定義UvCountRedisKey Hash
String uvCountRedisKey;

@Override
public void open(Configuration parameters) throws Exception {
jedis = new Jedis("hadoop102", 6379);
myBloomFilter=new MyBloomFilter(1L << 29);//64M 1億資料*(3-10)這裡我們取5
//5 5*10^8 =(2^10)^3/2=2^29 5億 是2^29bit=1024*1024*2^3*2^6=64M
uvCountRedisKey="UvCount";
}

@Override
public void process(Context context, Iterable<UserBehavior> elements, Collector<UvCount> out) throws Exception {
//1.獲取視窗資訊並指定UvCountRedisKeyfield,同時指定BitMapRedisKey
String windowEnd = new Timestamp(context.window().getEnd()).toString();
String bitMapRedisKey="UvBitMap:"+windowEnd;
//2.判斷當前的userID是否已經存在
Long offset = myBloomFilter.hash(elements.iterator().next().getUserId() + "");
Boolean exist = jedis.getbit(bitMapRedisKey, offset);
//3.如果不存在,redis中累加資料,並將BitMap中對應的位置改為true
if(! exist){
jedis.hincrBy(uvCountRedisKey,windowEnd,1L);
jedis.setbit(bitMapRedisKey,offset,true);
}
//4.取出Redis中對應的Count,傳送
long count=Long.parseLong(jedis.hget(uvCountRedisKey,windowEnd));
//5.傳送資料
out.collect(new UvCount("uv",windowEnd,count));

}

@Override
public void close() throws Exception {
jedis.close();
}
}
//自定義一個布隆過濾器,定義一個位圖的大小和hash函式
public static class MyBloomFilter{
//定義布隆過濾器的總容量,bit的個數,必須是2的整次冪
private Long cap;

public MyBloomFilter() {
}

public MyBloomFilter(Long cap) {
this.cap = cap;
}
    //hash函式
public Long hash(String value){
int result=0;
for(char c:value.toCharArray()){
result=result * 31 + c;//乘以質數31是為了abc ,bca 雖然元素相同,但位置不同,結果不同
}
//位與運算 // 返回hash值,不能超過cap
return result & (cap - 1);//由於是2的整次冪,cap-1 由原來的10000000 變成 01111111
//result大於cap-1 假如result11100001
//01111111 上下做位與操作 去除了result 大於cap-1的可能
//11100001 高位取的是cap -1 低位取的是 result
// 01100001
}
}
}