Flink 讀取 Kafka資料
阿新 • • 發佈:2020-12-05
POM
<!-- Flink dependencies for a Scala 2.11 streaming job that reads from Kafka 0.11.
     All Flink modules are kept on the same release (1.10.2) so that the connector
     is built against the same core/streaming classes that are on the classpath. -->
<dependencies>
    <!-- Flink Scala API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
    <!-- Kafka 0.11 connector (provides FlinkKafkaConsumer011); version aligned with the modules above -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
</dependencies>
原始碼:
package com.kpwong.aiptest
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
/** Minimal Flink streaming job that reads string records from a Kafka topic and prints them. */
object KafkaTest {

  /**
   * Builds and runs the job: attaches a Kafka 0.11 consumer for topic "two" on the broker
   * at hadoop202:9092, decodes each record with [[SimpleStringSchema]], and prints the stream.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration.
    val props: Properties = new Properties()
    props.setProperty("bootstrap.servers", "hadoop202:9092")
    // The Flink Kafka connector documentation lists group.id as a required consumer
    // property; set it explicitly instead of relying on the client's default group.
    props.setProperty("group.id", "kafka-test")

    // To send test data to the topic:
    //   bin/kafka-console-producer.sh --broker-list hadoop202:9092 --topic two
    val kafkaDS: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer011[String]("two", new SimpleStringSchema(), props))

    kafkaDS.print()

    // Nothing runs until execute() is called on the environment.
    env.execute()
  }
}
Kafka傳送資料:
bin/kafka-console-producer.sh --broker-list hadoop202:9092 --topic two
執行結果: