1. 程式人生 > 實用技巧 >Fink 寫資料到kafka (kafka sink)

Fink 寫資料到kafka (kafka sink)

POM 檔案依賴:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala 
--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.10.2</version> </dependency> <dependency> <groupId>org.apache.flink</
groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.10.1</version> </dependency> </dependencies>

Kafka sink程式碼(從 netcat中讀取資料)

package com.kpwong.sink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 object KafkaSinkTest { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val socketDS: DataStream[String] = env.socketTextStream("hadoop202",9999) socketDS.addSink(new FlinkKafkaProducer011[String]("hadoop202:9092","two",new SimpleStringSchema())) //消費者 讀取kafka資料命令 //bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --from-beginning --topic two env.execute("Kafka Sink Test") } }

執行結果:

輸入資料:

kafka 接受到的資料命令:

bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --topic two