1. 程式人生 > >基於Flink的視頻直播案例(上)

基於Flink的視頻直播案例(上)

tst 不能 line acc exactly 日期 () 避免 class

目錄

  • 數據產生
  • Logstash部分
  • Kafka部分
  • Flink部分
    • 配置/準備代碼
    • 視頻核心指標監控

本案例參考自阿裏雲的視頻直播解決方案之視頻核心指標監控和視頻直播解決方案之直播數字化運營。

基於Kafka + Flink + ELK + Redis實現視頻直播數據的實時處理和可視化。

選型僅僅出於練習考慮,Logstash一般會換成flume或者直接用kafka。

模擬的總體流程:通過http請求發送json到Logstash,後者將數據轉發到Kafka,然後Flink拉取數據進行處理,結果寫入Elasticsearch,最後利用Kibana搭建實時dashboard。

實現功能:

  • 視頻核心指標監控:房間故障指標、分地域數據延遲情況、網站整體卡頓率、人均卡頓次數
  • 直播數字化運營:全站觀看直播總人數以及走勢、各房間觀看人數以及走勢、熱門直播房間及主播Top10,分類目主播Top10
  • 上述功能的dashboard展示

數據產生

數據結構

阿裏雲上有一些樣本數據,可以用於測試。另外由於數據量較少,並不方便後面的kibana可視化,所以我編寫了一個python程序模擬產生10W+條數據用於後續的可視化實現。下面只選取了與將要實現的功能相關的指標。

字段            含義
0.roomid       房間號
1.userid       用戶id
2.adrop        音頻丟幀數量
3.alat         音頻幀端到端延遲
4.vdrop        視頻丟幀數量
5.vlat         視頻幀端到端延遲
6.ublock       上行卡頓次數
7.dblock       下行卡頓次數
8.timestamp    打點時間戳
9.region       地域

我用python對這些csv數據進行了預處理,把原數據轉換為json格式,用來模擬一些生產情況.另外,在日期和時間之間的空格換成字母"T",不然下面用curl發送數據會報錯。一條數據如下所示:

{"roomid":"4","userid":"74262","adrop":"3","alat":"196","vdrop":"5","vlat":"209","ublock":"37","dblock":"39","region":"shenzhen","timestamp":"2018-12-08T00:00:00"}

通過下面腳本模擬發送http請求。

#!/bin/bash
FILE=$1
while read LINE; do
#   curl -XPOST -u live_data:live_data --header "Content-Type: application/json" "http://localhost:8080/" -d ''$LINE''
   sleep 0.03s # 如果實現涉及ontime,可以加上這條代碼。當然,生產環境下是不需要減慢的,詳細看後面flink實現總結。
   echo $LINE
done < $FILE

Logstash部分

安裝kafkaoutput插件bin/logstash-plugin install logstash-output-kafka

# conf文件
input {
  http {
    host => "localhost"
    port => 8080
    id => "live_data_http_input"
    user => "live_data"
    password => "live_data"
  }
}

filter{
  mutate {
    remove_field => ["@timestamp", "host", "headers", "@version","tags"]
  }
}

output {
  kafka { 
    topic_id => "core_metric" 
    codec => json
  }
}

# 測試
output {stdout {} }

Kafka部分

創建兩個topic分別代表後面需要處理的兩個部分。

# 啟動
zkServer.sh start
kafka-server-start.sh $KAFKA_HOME/config/server.properties

# 創建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic core_metric

# 測試
kafka-console-consumer.sh --zookeeper localhost:2181 --topic core_metric

Flink部分

配置/準備代碼

下面是兩個業務都需要配置的一些準備代碼。

private static Logger logger = LoggerFactory.getLogger(CoreMetricMain.class);
private static FastDateFormat TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss", Locale.ENGLISH);
private static FastDateFormat INDEX_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd", Locale.ENGLISH);

// Main
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 設置並行度,設置為1方便調試。
env.setParallelism(1);

// 設置使用eventtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// checkpoint配置
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 設置statebackend
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

/**
 * 配置KafkaSource,這裏原本使用flink內置的json schema,但由於數據的日期有“-”而無法轉換
 * 所以就用了SimpleStringSchema,後面再利用fastjson進行轉換。當然,也可以自定義schema來省
 * 去一次轉化
 */
String topic = "core_metric";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "localhost:9092");
prop.setProperty("group.id", "con1");
prop.setProperty("auto.offset.reset", "latest");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new
        SimpleStringSchema(), prop);
DataStreamSource<String> stream = env.addSource(myConsumer);
stream.name("KafkaSource");

// 業務代碼...

env.execute("CoreMetric");

視頻核心指標監控

業務目標

針對客戶端APP的監控,獲取以下指標:

  • 房間故障,故障包括卡頓、丟幀、音視頻不同步等
  • 分地域統計數據端到端延遲平均情況
  • 統計實時整體卡頓率(出現卡頓的在線用戶數/在線總用戶數*100%,通過此指標可以衡量當前卡頓影響的人群範圍)
  • 統計人均卡頓次數(在線卡頓總次數/在線用戶數,通過此指標可以從卡頓頻次上衡量整體的卡頓嚴重程度)
// 對從 kafka 中獲取的 string 數據進行轉換。這裏我通過繼承 flink 的 Tuple10 來實現一個 MetricRecord 的 DTO 類,
// 在裏面增加getter方法,這樣在取值時就不需要通過idx來取了,例如 public Integer getRoomid() {return this.f0;}。
// 但後面的代碼還要對數據進行各種轉換,所以每次都要重新實現一個類也是挺繁瑣的事情,看自己的取舍吧。
MapFunction<String, MetricRecord> cleanMapFun = str -> {
    JSONObject jsonObject = JSONObject.parseObject(str);
    Date parse = TIME_FORMAT.parse(jsonObject.getString("timestamp"));
    long time = parse.getTime();
    return new MetricRecord(
            jsonObject.getInteger("roomid"), jsonObject.getLong("userid"),
            jsonObject.getInteger("adrop"), jsonObject.getInteger("alat"),
            jsonObject.getInteger("vdrop"), jsonObject.getInteger("vlat"),
            jsonObject.getInteger("ublock"), jsonObject.getInteger("dblock"),
            time, jsonObject.getString("region"));
};

SingleOutputStreamOperator<MetricRecord> cleanStream = stream.map(cleanMapFun)
    // 轉換完後就可以抽取時間戳和分配 watermark 了。通過extractTimestamp,每條數據就被flink內部打上時間戳了。
    // 下面是 flink 內置的類,也可以自己通過繼承AssignerWithPunctuatedWatermarks或AssignerWithPeriodicWatermarks來自定義,區別看後面總結。如果是離線環境,且涉及 ontime 操作,則建議繼承AssignerWithPunctuatedWatermarks來抽取時間戳和分配 watermark,因為其他實現可能不能及時推進watermark
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MetricRecord>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MetricRecord record) {
            return record.getTimestamp();
        }
    });

// 第一個功能:每隔10分鐘計算一次這10分鐘裏面每個房間的卡頓、丟幀、音視頻不同步等故障指標的和。
// 這裏采用aggregate + ProcessWindowFunction 來完成,這樣可以一有數據就進行聚合,避免窗口存儲完10分鐘的數據後才開始計算,但實現也比原來復雜。最後的ProcessWindowFunction僅僅只是補充時間戳作為輸出。當然,也可以在aggregate的計算中一直保持時間戳。RoomErrorAggFunc的實現比較簡單,可以參考第二個功能的RegionLatAggFunc實現。
SingleOutputStreamOperator<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Long>> roomErrorStat = cleanStream
        .keyBy(MetricRecord::getRoomid)
        .window(TumblingEventTimeWindows.of(Time.minutes(10)))
        .aggregate(new RoomErrorAggFunc(),
                new ProcessWindowFunction<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Long>,
                        Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Long>, Integer, TimeWindow>
                        () {
                    // 補充 watermark
                    @Override
                    public void process(Integer key, Context context,
                                        Iterable<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Long>> elements,
                                        Collector<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Long>> out) {
                        Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Long> res = elements.iterator().next();
                        res.setField(context.window().getStart(), 7);
                        out.collect(res);
                    }
                });

// 第二個功能:每隔10分鐘計算一次這10分鐘各地域的平均延遲情況。
SingleOutputStreamOperator<Tuple4<String, Long, Long, Long>> regionLatStat = cleanStream
        .keyBy(MetricRecord::getRegion)
        .window(TumblingEventTimeWindows.of(Time.minutes(10)))
        .aggregate(new RegionLatAggFunc(), new ProcessWindowFunction
        // 補充watermark同上
            }
        });
// RegionLatAggFunc的實現如下
public class RegionLatAggFunc implements AggregateFunction<MetricRecord,
        Tuple4<String, Long, Long, Long>, Tuple4<String, Long, Long, Long>> {

    /**
     * 中間結果
     * 0 => region;
     * 1 => alat;
     * 2 => vlat;
     * 3 => count
     *
     * 最終結果
     * 0 => region
     * 1 => alat;
     * 2 => vlat;
     * 3 => location for timestamp
     */

    @Override
    public Tuple4<String, Long, Long, Long> createAccumulator() {
        return new Tuple4<>("", 0L, 0L, 0L);
    }

    @Override
    public Tuple4<String, Long, Long, Long> add(MetricRecord metricRecord, Tuple4<String, Long, Long, Long> accumulator) {
        accumulator.setField(metricRecord.getRegion(), 0);
        accumulator.setField(metricRecord.getAlat() + accumulator.f1, 1);
        accumulator.setField(metricRecord.getVlat() + accumulator.f2, 2);
        accumulator.setField(accumulator.f3 + 1, 3);
        return accumulator;
    }

    @Override
    public Tuple4<String, Long, Long, Long> merge(Tuple4<String, Long, Long, Long> acc1,
                                                  Tuple4<String, Long, Long, Long> acc2) {
        acc1.setField(acc1.f0, 0);
        acc1.setField(acc1.f1 + acc2.f1, 1);
        acc1.setField(acc1.f2 + acc2.f2, 2);
        acc1.setField(acc1.f3 + acc2.f3, 3);
        return acc1;
    }

    @Override
    public Tuple4<String, Long, Long, Long> getResult(Tuple4<String, Long, Long, Long> accumulator) {
        return new Tuple4<>(accumulator.f0, accumulator.f1 / accumulator.f3, accumulator.f2 / accumulator.f3, 0L);
    }
}

// 第三和第四個功能可以合在一起實現,每隔10分鐘計算一次這10分鐘裏網站總體的卡頓情況。具體看代碼。
SingleOutputStreamOperator<Tuple3<Long, Double, Double>> blockStat = cleanStream
        .map(elem -> {
            long[] res = new long[3];
            res[0] = elem.getUserid();
            res[1] = elem.getUblock();
            res[2] = elem.getDblock();
            return res;
        })
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(10)))
        .process(new ProcessAllWindowFunction<long[], Tuple3<Long, Double, Double>,
                TimeWindow>() {
            @Override
            public void process(Context context, Iterable<long[]> elements,
                                Collector<Tuple3<Long, Double, Double>> out) throws Exception {
                long blockTimes = 0L;
                long blockTotal = 0L;
                HashSet<Long> visitorSet = new HashSet<>();
                for (long[] elem : elements) {
                    visitorSet.add(elem[0]);
                    // 功能三
                    blockTimes += (elem[1] != 0 || elem[2] != 0 ? 1L : 0L);
                    // 功能四
                    blockTotal += elem[1] + elem[2];
                }
                long time = context.window().getStart();
                double totalBlockRate = (double) blockTimes / visitorSet.size();
                double blockPeruser = (double) blockTotal / visitorSet.size();
                out.collect(new Tuple3<>(time, totalBlockRate, blockPeruser));
            }
        });

目前的實現如下,sink後面再說。這裏可以看到第三和第四的實現采用ProcessAllWindowFunction後並行度變為了1,因為它是直接把所有的數據聚合到一起計算的。這樣分布式計算就沒什麽意義了。在第二個業務部分會介紹更可行的方法。

技術分享圖片

基於Flink的視頻直播案例(上)