1. 程式人生 > 程式設計 >Flink 系列(三)—— Flink Data Source

Flink 系列(三)—— Flink Data Source

一、內建 Data Source

Flink Data Source 用於定義 Flink 程式的資料來源,Flink 官方提供了多種資料獲取方法,用於幫助開發者簡單快速地構建輸入流,具體如下:

1.1 基於檔案構建

1. readTextFile(path):按照 TextInputFormat 格式讀取文字檔案,並將其內容以字串的形式返回。示例如下:

env.readTextFile(filePath).print();
複製程式碼

2. readFile(fileInputFormat,path) :按照指定格式讀取檔案。

3. readFile(inputFormat,filePath,watchType,interval,typeInformation)

:按照指定格式週期性的讀取檔案。其中各個引數的含義如下:

  • inputFormat:資料流的輸入格式。
  • filePath:檔案路徑,可以是本地檔案系統上的路徑,也可以是 HDFS 上的檔案路徑。
  • watchType:讀取方式,它有兩個可選值,分別是 FileProcessingMode.PROCESS_ONCEFileProcessingMode.PROCESS_CONTINUOUSLY:前者表示對指定路徑上的資料只讀取一次,然後退出;後者表示對路徑進行定期地掃描和讀取。需要注意的是如果 watchType 被設定為 PROCESS_CONTINUOUSLY,那麼當檔案被修改時,其所有的內容 (包含原有的內容和新增的內容) 都將被重新處理,因此這會打破 Flink 的 exactly-once
    語義。
  • interval:定期掃描的時間間隔。
  • typeInformation:輸入流中元素的型別。

使用示例如下:

final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),FileProcessingMode.PROCESS_ONCE,1,BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();
複製程式碼

1.2 基於集合構建

1. fromCollection(Collection):基於集合構建,集合中的所有元素必須是同一型別。示例如下:

env.fromCollection(Arrays.asList(1,2,3,4,5)).print();
複製程式碼

2. fromElements(T ...): 基於元素構建,所有元素必須是同一型別。示例如下:

env.fromElements(1,5).print();
複製程式碼

3. generateSequence(from,to):基於給定的序列區間進行構建。示例如下:

env.generateSequence(0,100);
複製程式碼

4. fromCollection(Iterator,Class):基於迭代器進行構建。第一個引數用於定義迭代器,第二個引數用於定義輸出元素的型別。使用示例如下:

env.fromCollection(new CustomIterator(),BasicTypeInfo.INT_TYPE_INFO).print();
複製程式碼

其中 CustomIterator 為自定義的迭代器,這裡以產生 1 到 100 區間內的資料為例,原始碼如下。需要注意的是自定義迭代器除了要實現 Iterator 介面外,還必須要實現序列化介面 Serializable ,否則會丟擲序列化失敗的異常:

import java.io.Serializable;
import java.util.Iterator;

public class CustomIterator implements Iterator<Integer>,Serializable {
    private Integer i = 0;

    @Override
    public boolean hasNext() {
        return i < 100;
    }

    @Override
    public Integer next() {
        i++;
        return i;
    }
}
複製程式碼

5. fromParallelCollection(SplittableIterator,Class):方法接收兩個引數,第二個引數用於定義輸出元素的型別,第一個引數 SplittableIterator 是迭代器的抽象基類,它用於將原始迭代器的值拆分到多個不相交的迭代器中。

1.3 基於 Socket 構建

Flink 提供了 socketTextStream 方法用於構建基於 Socket 的資料流,socketTextStream 方法有以下四個主要引數:

  • hostname:主機名;
  • port:埠號,設定為 0 時,表示埠號自動分配;
  • delimiter:用於分隔每條記錄的分隔符;
  • maxRetry:當 Socket 臨時關閉時,程式的最大重試間隔,單位為秒。設定為 0 時表示不進行重試;設定為負值則表示一直重試。示例如下:
 env.socketTextStream("192.168.0.229",9999,"\n",3).print();
複製程式碼

二、自定義 Data Source

2.1 SourceFunction

除了內建的資料來源外,使用者還可以使用 addSource 方法來新增自定義的資料來源。自定義的資料來源必須要實現 SourceFunction 介面,這裡以產生 [0,1000) 區間內的資料為例,程式碼如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new SourceFunction<Long>() {
    
    private long count = 0L;
    private volatile boolean isRunning = true;

    public void run(SourceContext<Long> ctx) {
        while (isRunning && count < 1000) {
            // 通過collect將輸入傳送出去 
            ctx.collect(count);
            count++;
        }
    }

    public void cancel() {
        isRunning = false;
    }

}).print();
env.execute();
複製程式碼

2.2 ParallelSourceFunction 和 RichParallelSourceFunction

上面通過 SourceFunction 實現的資料來源是不具有並行度的,即不支援在得到的 DataStream 上呼叫 setParallelism(n) 方法,此時會丟擲如下的異常:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
複製程式碼

如果你想要實現具有並行度的輸入流,則需要實現 ParallelSourceFunction 或 RichParallelSourceFunction 介面,其與 SourceFunction 的關係如下圖:

https://github.com/heibaiying
ParallelSourceFunction 直接繼承自 ParallelSourceFunction,具有並行度的功能。RichParallelSourceFunction 則繼承自 AbstractRichFunction,同時實現了 ParallelSourceFunction 介面,所以其除了具有並行度的功能外,還提供了額外的與生命週期相關的方法,如 open() ,closen() 。

三、Streaming Connectors

3.1 內建聯結器

除了自定義資料來源外, Flink 還內建了多種聯結器,用於滿足大多數的資料收集場景。當前內建聯結器的支援情況如下:

  • Apache Kafka (支援 source 和 sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

除了上述的聯結器外,你還可以通過 Apache Bahir 的聯結器擴充套件 Flink。Apache Bahir 旨在為分散式資料分析系統 (如 Spark,Flink) 等提供功能上的擴充套件,當前其支援的與 Flink 相關的聯結器如下:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

隨著 Flink 的不斷髮展,可以預見到其會支援越來越多型別的聯結器,關於聯結器的後續發展情況,可以檢視其官方檔案:Streaming Connectors 。在所有 DataSource 聯結器中,使用的廣泛的就是 Kafka,所以這裡我們以其為例,來介紹 Connectors 的整合步驟。

3.2 整合 Kakfa

1. 匯入依賴

整合 Kafka 時,一定要注意所使用的 Kafka 的版本,不同版本間所需的 Maven 依賴和開發時所呼叫的類均不相同,具體如下:

Maven 依賴 Flink 版本 Consumer and Producer 類的名稱 Kafka 版本
flink-connector-kafka-0.8_2.11 1.0.0 + FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x
flink-connector-kafka-0.9_2.11 1.0.0 + FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x
flink-connector-kafka-0.10_2.11 1.2.0 + FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x
flink-connector-kafka-0.11_2.11 1.4.0 + FlinkKafkaConsumer011
FlinkKafkaProducer011
0.11.x
flink-connector-kafka_2.11 1.7.0 + FlinkKafkaConsumer
FlinkKafkaProducer
>= 1.0.0

這裡我使用的 Kafka 版本為 kafka_2.12-2.2.0,新增的依賴如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
複製程式碼

2. 程式碼開發

這裡以最簡單的場景為例,接收 Kafka 上的資料並列印,程式碼如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 指定Kafka的連線位置
properties.setProperty("bootstrap.servers","hadoop001:9092");
// 指定監聽的主題,並定義Kafka位元組訊息到Flink物件之間的轉換規則
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic",new SimpleStringSchema(),properties));
stream.print();
env.execute("Flink Streaming");
複製程式碼

3.3 整合測試

1. 啟動 Kakfa

Kafka 的執行依賴於 zookeeper,需要預先啟動,可以啟動 Kafka 內建的 zookeeper,也可以啟動自己安裝的:

# zookeeper啟動命令
bin/zkServer.sh start

# 內建zookeeper啟動命令
bin/zookeeper-server-start.sh config/zookeeper.properties
複製程式碼

啟動單節點 kafka 用於測試:

# bin/kafka-server-start.sh config/server.properties
複製程式碼

2. 建立 Topic

# 建立用於測試主題
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic flink-stream-in-topic

# 檢視所有主題
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
複製程式碼

3. 啟動 Producer

這裡 啟動一個 Kafka 生產者,用於傳送測試資料:

bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic
複製程式碼

4. 測試結果

在 Producer 上輸入任意測試資料,之後觀察程式控制臺的輸出:

https://github.com/heibaiying
程式控制臺的輸出如下:

https://github.com/heibaiying
可以看到已經成功接收並打印出相關的資料。

參考資料

  1. data-sources:ci.apache.org/projects/fl…
  2. Streaming Connectors:ci.apache.org/projects/fl…
  3. Apache Kafka Connector: ci.apache.org/projects/fl…

更多大資料系列文章可以參見 GitHub 開源專案大資料入門指南