1. 程式人生 > >Flink消費Kafka到HDFS實現及詳解

Flink消費Kafka到HDFS實現及詳解

1.概述

最近有同學留言諮詢,Flink消費Kafka的一些問題,今天筆者將用一個小案例來為大家介紹如何將Kafka中的資料,通過Flink任務來消費並存儲到HDFS上。

2.內容

這裡舉個消費Kafka的資料的場景。比如,電商平臺、遊戲平臺產生的使用者資料,入庫到Kafka中的Topic進行儲存,然後採用Flink去實時消費積累到HDFS上,積累後的資料可以構建資料倉庫(如Hive)做資料分析,或是用於資料訓練(演算法模型)。如下圖所示:

 

2.1 環境依賴

整個流程,需要依賴的元件有Kafka、Flink、Hadoop。由於Flink提交需要依賴Hadoop的計算資源和儲存資源,所以Hadoop的YARN和HDFS均需要啟動。各個元件版本如下:

元件 版本
Kafka 2.4.0
Flink 1.10.0
Hadoop 2.10.0

 

2.2 程式碼實現

Flink消費Kafka叢集中的資料,需要依賴Flink包,依賴如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.12</artifactId>
    <version>${flink.connector.version}</version>
 </dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>${flink.kafka.version}</version>
 </dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>${flink.streaming.version}</version>
 </dependency>

編寫消費Topic的Flink程式碼,這裡不對Topic中的資料做邏輯處理,直接消費並存儲到HDFS上。程式碼如下:

/**
 * Flink consumer topic data and store into hdfs.
 * 
 * @author smartloli.
 *
 *         Created by Mar 15, 2020
 */
public class Kafka2Hdfs {

    private static Logger LOG = LoggerFactory.getLogger(Kafka2Hdfs.class);

    public static void main(String[] args) {
        if (args.length != 3) {
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
            return;
        }
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));

        // Storage into hdfs
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);

        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd"));

        sink.setBatchSize(1024 * 1024 * 1024); // this is 1GB
        sink.setBatchRolloverInterval(1000 * 60 * 60); // one hour producer a file into hdfs
        transction.addSink(sink);

        env.execute("Kafka2Hdfs");
    }

    private static Object configByKafkaServer(String bootStrapServer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

}

2.3 注意事項

  • 儲存到HDFS時,不用新增其他HDFS依賴,只需要Flink採用yarn-cluster模式提交即可;
  • 採用FSDataOutputStream寫入時,會先寫入緩衝區,放在記憶體中;
  • Flink每次做Checkpoint的時候,會Flush緩衝區的資料,以及將Pending(已經完成的檔案,但為被Checkpoint記錄,可以通過sink.setPendingSuffix("xxx")來設定)結尾的檔案記錄下來
  • Flink每60秒(可以通過sink.setInactiveBucketCheckInterval(60 * 1000)來進行設定)檢測,如果一個檔案的FSDataOutputStream在60秒內(可以通過sink.setInactiveBucketThreshold(60 * 1000)來設定),都還沒有接收到資料,Flink就會認為該檔案是不活躍的Bucket,那麼就會被Flush後關閉該檔案;
  • 我們再深入一點檢視程式碼,實際上只是在processingTimeService中註冊了當前的時間(currentProcessingTime)+ 60秒不寫入的時間(inactiveBucketCheckInterval)。接著通過onProcessIngTime方法去不停的判斷是否滿足60秒不寫入,同時也會判斷是否到了滾動時間。程式碼如下:
public void onProcessingTime(long timestamp) throws Exception {
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); 
        closePartFilesByTime(currentProcessingTime);
        processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
}        
  • 在Flink內部封裝了一個集合Map<String, BucketState<T>> bucketStates = new HashMap<>();用來記錄當前正在使用的檔案,key是檔案的路徑,BucketState內部封裝了該檔案的所有資訊,包括建立時間,最後一次寫入時間(這裡的寫入指的是寫入快取區的時間,不是Flush的時間)。當前檔案是開啟還是關閉,寫緩衝區的方法。都在這裡。每次Flink要對檔案進行操作的時候,都會從這裡拿到檔案的封裝物件;
  • 當程式被取消的時候,當前正在操作的檔案,會被Flush,然後關閉。然後將檔案的字尾名從in-progress改為pending。這個前後綴都是可以設定,但如果沒有什麼特殊需求,預設即可。這裡拿檔案,用的就是上面說的bucketStates這個map。它在close方法中,會去遍歷這個map,去做上述的操作;程式碼如下:
public void close() throws Exception {
        if (state != null) {
            for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
                closeCurrentPartFile(entry.getValue());
            }
        }
}
  • 每次寫入的時候,都是會bucketStates這個map中獲取對應的物件,如果沒有,就會new一個該物件。然後先判斷是否需要滾動(通過當前檔案大小和滾動時間去判斷),然後才將資料寫入緩衝區,更新最後寫入時間,程式碼如下:
public void invoke(T value) throws Exception {
        Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
 
        long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 
        BucketState<T> bucketState = state.getBucketState(bucketPath);
        if (bucketState == null) {
            bucketState = new BucketState<>(currentProcessingTime);
            state.addBucketState(bucketPath, bucketState);
        }
 
        if (shouldRoll(bucketState, currentProcessingTime)) {
            openNewPartFile(bucketPath, bucketState);
        }
 
        bucketState.writer.write(value);
        bucketState.lastWrittenToTime = currentProcessingTime;
}
  • 寫入和關閉HDFS是通過非同步的方式的,非同步的超時時間預設是60秒,可以通過 sink.setAsyncTimeout(60 * 1000)去設定

3.總結

Flink消費Kafka資料並寫到HDFS的程式碼實現是比較簡短了,沒有太多複雜的邏輯。實現的時候,注意Kafka的地址、反序列化需要在屬性中配置、以及Flink任務提交的時候,設定yarn-cluster模式、設定好記憶體和CPU、HDFS儲存路徑等資訊。

4.結束語

這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或傳送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》和《Hadoop大資料探勘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點選購買連結購買博主的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學視