Writing data from Flink to Kafka (Kafka sink)
阿新 • Published: 2020-12-06
POM file dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
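For readers who build with sbt rather than Maven, the same three dependencies could be declared roughly as follows. This is only a sketch: the scalaVersion value is an assumption (any 2.11.x release should resolve the _2.11 artifacts), while the artifact names and versions are copied from the POM above.

```scala
// build.sbt (sketch): assumes a Scala 2.11 project so that %% appends the _2.11 suffix
scalaVersion := "2.11.12" // assumed; not stated in the original post

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % "1.10.2",
  "org.apache.flink" %% "flink-streaming-scala"      % "1.10.2",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.10.1"
)
```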
Kafka sink code (reading data from netcat)
package com.kpwong.sink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read lines of text from a socket (for example, netcat listening on hadoop202:9999)
    val socketDS: DataStream[String] = env.socketTextStream("hadoop202", 9999)

    // Write each line as a string to the Kafka topic "two" via the broker at hadoop202:9092
    socketDS.addSink(new FlinkKafkaProducer011[String]("hadoop202:9092", "two", new SimpleStringSchema()))

    // Consumer command for reading the data back from Kafka:
    // bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --from-beginning --topic two

    env.execute("Kafka Sink Test")
  }
}
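The job above passes the broker list directly to the producer. When further producer settings are needed, FlinkKafkaProducer011 also offers a constructor that takes the topic, a serialization schema, and a java.util.Properties object. The following is a minimal sketch of that variant, not the original author's code; the object name KafkaSinkWithPropsSketch and the helper producerProps are hypothetical names introduced for illustration, and the host, port, and topic are reused from the example above.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

// Hypothetical object name; a sketch of the Properties-based constructor.
object KafkaSinkWithPropsSketch {

  // Hypothetical helper: builds the producer configuration.
  // Only bootstrap.servers is set here; other Kafka producer keys could be added.
  def producerProps(brokers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props
  }

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Same socket source as in the example above
    val socketDS: DataStream[String] = env.socketTextStream("hadoop202", 9999)

    // Constructor variant: (topicId, serializationSchema, producerConfig)
    val kafkaSink = new FlinkKafkaProducer011[String](
      "two",
      new SimpleStringSchema(),
      producerProps("hadoop202:9092")
    )

    socketDS.addSink(kafkaSink)
    env.execute("Kafka Sink With Properties")
  }
}
```

Keeping the configuration in a small helper makes it easy to add more producer settings later without touching the job wiring.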
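Execution result:
Input data (typed into netcat on port 9999):
Command to view the data received by Kafka (on Kafka 2.0 and later, where the --zookeeper option was removed, use --bootstrap-server hadoop202:9092 instead):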
bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --topic two