Flink消費Kafka到HDFS實現及詳解
阿新 • • 發佈:2020-03-15
1.概述
最近有同學留言諮詢,Flink消費Kafka的一些問題,今天筆者將用一個小案例來為大家介紹如何將Kafka中的資料,通過Flink任務來消費並存儲到HDFS上。
2.內容
這裡舉個消費Kafka的資料的場景。比如,電商平臺、遊戲平臺產生的使用者資料,入庫到Kafka中的Topic進行儲存,然後採用Flink去實時消費積累到HDFS上,積累後的資料可以構建資料倉庫(如Hive)做資料分析,或是用於資料訓練(演算法模型)。如下圖所示:
2.1 環境依賴
整個流程,需要依賴的元件有Kafka、Flink、Hadoop。由於Flink提交需要依賴Hadoop的計算資源和儲存資源,所以Hadoop的YARN和HDFS均需要啟動。各個元件版本如下:
元件 | 版本 |
Kafka | 2.4.0 |
Flink | 1.10.0 |
Hadoop | 2.10.0 |
2.2 程式碼實現
Flink消費Kafka叢集中的資料,需要依賴Flink包,依賴如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.12</artifactId> <version>${flink.connector.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>${flink.kafka.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.streaming.version}</version> </dependency>
編寫消費Topic的Flink程式碼,這裡不對Topic中的資料做邏輯處理,直接消費並存儲到HDFS上。程式碼如下:
/** * Flink consumer topic data and store into hdfs. * * @author smartloli. * * Created by Mar 15, 2020 */ public class Kafka2Hdfs { private static Logger LOG = LoggerFactory.getLogger(Kafka2Hdfs.class); public static void main(String[] args) { if (args.length != 3) { LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist."); return; } String bootStrapServer = args[0]; String hdfsPath = args[1]; int parallelism = Integer.parseInt(args[2]); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); env.setParallelism(parallelism); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer))); // Storage into hdfs BucketingSink<String> sink = new BucketingSink<>(hdfsPath); sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd")); sink.setBatchSize(1024 * 1024 * 1024); // this is 1GB sink.setBatchRolloverInterval(1000 * 60 * 60); // one hour producer a file into hdfs transction.addSink(sink); env.execute("Kafka2Hdfs"); } private static Object configByKafkaServer(String bootStrapServer) { Properties props = new Properties(); props.setProperty("bootstrap.servers", bootStrapServer); props.setProperty("group.id", "test_bll_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } }
2.3 注意事項
- 儲存到HDFS時,不用新增其他HDFS依賴,只需要Flink採用yarn-cluster模式提交即可;
- 採用FSDataOutputStream寫入時,會先寫入緩衝區,放在記憶體中;
- Flink每次做Checkpoint的時候,會Flush緩衝區的資料,以及將Pending(已經完成的檔案,但為被Checkpoint記錄,可以通過sink.setPendingSuffix("xxx")來設定)結尾的檔案記錄下來
- Flink每60秒(可以通過sink.setInactiveBucketCheckInterval(60 * 1000)來進行設定)檢測,如果一個檔案的FSDataOutputStream在60秒內(可以通過sink.setInactiveBucketThreshold(60 * 1000)來設定),都還沒有接收到資料,Flink就會認為該檔案是不活躍的Bucket,那麼就會被Flush後關閉該檔案;
- 我們再深入一點檢視程式碼,實際上只是在processingTimeService中註冊了當前的時間(currentProcessingTime)+ 60秒不寫入的時間(inactiveBucketCheckInterval)。接著通過onProcessIngTime方法去不停的判斷是否滿足60秒不寫入,同時也會判斷是否到了滾動時間。程式碼如下:
public void onProcessingTime(long timestamp) throws Exception { long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); closePartFilesByTime(currentProcessingTime); processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this); }
- 在Flink內部封裝了一個集合Map<String, BucketState<T>> bucketStates = new HashMap<>();用來記錄當前正在使用的檔案,key是檔案的路徑,BucketState內部封裝了該檔案的所有資訊,包括建立時間,最後一次寫入時間(這裡的寫入指的是寫入快取區的時間,不是Flush的時間)。當前檔案是開啟還是關閉,寫緩衝區的方法。都在這裡。每次Flink要對檔案進行操作的時候,都會從這裡拿到檔案的封裝物件;
- 當程式被取消的時候,當前正在操作的檔案,會被Flush,然後關閉。然後將檔案的字尾名從in-progress改為pending。這個前後綴都是可以設定,但如果沒有什麼特殊需求,預設即可。這裡拿檔案,用的就是上面說的bucketStates這個map。它在close方法中,會去遍歷這個map,去做上述的操作;程式碼如下:
public void close() throws Exception { if (state != null) { for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { closeCurrentPartFile(entry.getValue()); } } }
- 每次寫入的時候,都是會bucketStates這個map中獲取對應的物件,如果沒有,就會new一個該物件。然後先判斷是否需要滾動(通過當前檔案大小和滾動時間去判斷),然後才將資料寫入緩衝區,更新最後寫入時間,程式碼如下:
public void invoke(T value) throws Exception { Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value); long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); BucketState<T> bucketState = state.getBucketState(bucketPath); if (bucketState == null) { bucketState = new BucketState<>(currentProcessingTime); state.addBucketState(bucketPath, bucketState); } if (shouldRoll(bucketState, currentProcessingTime)) { openNewPartFile(bucketPath, bucketState); } bucketState.writer.write(value); bucketState.lastWrittenToTime = currentProcessingTime; }
- 寫入和關閉HDFS是通過非同步的方式的,非同步的超時時間預設是60秒,可以通過 sink.setAsyncTimeout(60 * 1000)去設定
3.總結
Flink消費Kafka資料並寫到HDFS的程式碼實現是比較簡短了,沒有太多複雜的邏輯。實現的時候,注意Kafka的地址、反序列化需要在屬性中配置、以及Flink任務提交的時候,設定yarn-cluster模式、設定好記憶體和CPU、HDFS儲存路徑等資訊。
4.結束語
這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或傳送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大資料探勘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點選購買連結購買博主的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學視