
Flink: Reading Data from Kafka

POM:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.10.2</version>
        </dependency>
    </dependencies>
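All three artifacts must share the same Scala binary version suffix (_2.11 here), and keeping them on the same Flink version (1.10.2) avoids version conflicts on the classpath at runtime.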

Source code:

    package com.kpwong.aiptest

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    object KafkaTest {

      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // Read data from Kafka
        val prob: Properties = new Properties()
        prob.setProperty("bootstrap.servers", "hadoop202:9092")
        // group.id is required by the Flink Kafka consumer; the group name is arbitrary
        prob.setProperty("group.id", "flink-kafka-test")

        // Command to send test data:
        // bin/kafka-console-producer.sh --broker-list hadoop202:9092 --topic two
        val kafkaDS: DataStream[String] = env.addSource(
          new FlinkKafkaConsumer011[String]("two", new SimpleStringSchema(), prob))

        kafkaDS.print()
        env.execute()
      }
    }
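By default the consumer starts from the committed offsets of its consumer group. For testing it is often convenient to pin the start position explicitly; the setStartFrom* methods used below come from the FlinkKafkaConsumerBase API that FlinkKafkaConsumer011 extends. A minimal sketch based on the program above (same broker and topic; the object name and group id are arbitrary choices for this example):

    package com.kpwong.aiptest

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    object KafkaStartPositionTest {

      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        val props = new Properties()
        props.setProperty("bootstrap.servers", "hadoop202:9092")
        props.setProperty("group.id", "flink-kafka-test")   // arbitrary group name for this sketch

        val consumer = new FlinkKafkaConsumer011[String]("two", new SimpleStringSchema(), props)
        // Re-read the topic from the beginning on every run -- convenient for testing.
        // Alternatives: setStartFromLatest(), setStartFromGroupOffsets() (the default).
        consumer.setStartFromEarliest()

        val kafkaDS: DataStream[String] = env.addSource(consumer)
        kafkaDS.print()
        env.execute("kafka start position test")
      }
    }

Note that setStartFromEarliest() only applies when the job starts fresh; when restoring from a checkpoint or savepoint, Flink resumes from the offsets stored in that state instead.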

Sending data to Kafka:

    bin/kafka-console-producer.sh --broker-list hadoop202:9092 --topic two

Execution result:
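With the job running, every line typed into the producer console appears on the job's standard output. print() prefixes each record with the index of the subtask that printed it, so the output looks like this (illustrative lines, not captured output):

    2> hello
    3> world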