Flink+kafka實現Wordcount實時計算
1. Flink
Flink介紹:
Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已。再換句話說,Flink 會把所有任務當成流來處理,這也是其最大的特點。Flink 可以支持本地的快速叠代,以及一些環形的叠代任務。
Flink的特性:
Flink是個分布式流處理開源框架:
1>. 即使數據源是無序的或者晚到達的數據,也能保持結果準確性
2>. 有狀態並且容錯,可以無縫的從失敗中恢復,並可以保持exactly-once
3>. 大規模分布式
4>. 實時計算場景的廣泛應用(阿裏雙十一實時交易額使用的Blink就是根據Flink改造而來)
Flink可以確保僅一次語義狀態計算;Flink有狀態意味著,程序可以保持已經處理過的數據;
Flink支持流處理和窗口事件時間語義,Flink支持靈活的基於時間窗口,計數,或會話數據驅動的窗戶;
Flink容錯是輕量級和在同一時間允許系統維持高吞吐率和提供僅一次的一致性保證,Flink從失敗中恢復,零數據丟失;
Flink能夠高吞吐量和低延遲;
Flink保存點提供版本控制機制,從而能夠更新應用程序或再加工歷史數據沒有丟失並在最小的停機時間。
2. Kafka
Kafka介紹
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
Kafka特性
Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性:
1>. 通過磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
2>. 高吞吐量即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息。
3>. 支持通過Kafka服務器和消費機集群來分區消息。
4>. 支持Hadoop並行數據加載。
Kafka的安裝配置及基礎使用
因為此篇博客是本地Flink消費Kafka的數據實現WordCount,所以Kafka不需要做過多配置,從Apache官網下載安裝包直接解壓即可使用
這裏我們創建一個名為test的topic
在producer輸入數據流:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在consumer監控從producer輸入的數據流:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
3. Flink Java API實現Flink消費Kafka的數據實現WordCount過程
1>. 創建maven project
2>. 配置flink和flink-kafka需要的依賴pom文件
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
3>. 引入Flink StreamExecutionEnvironment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
4>. 設置監控數據流時間間隔(官方叫狀態與檢查點)
env.enableCheckpointing(1000);
5>. 配置kafka和zookeeper的ip和端口
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
properties.setProperty("group.id", "test");
6>. 將kafka和zookeeper配置信息加載到Flink StreamExecutionEnvironment
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties);
7>. 將Kafka的數據轉成flink的DataStream類型
DataStream<String> stream = env.addSource(myConsumer);
8>. 實施計算模型並輸出結果
DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
counts.print();
計算模型具體邏輯代碼
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
4. 驗證
1>. Kafka producer輸入
2>. Flink客戶端立刻得出結果
完整代碼
package com.scn;
import java.util.Properties;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
public class FilnkCostKafka {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer);
DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
counts.print();
env.execute("WordCount from Kafka data");
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
Flink+kafka實現Wordcount實時計算