1. 程式人生 > >Flink讀寫系列之-讀Kafka並寫入Kafka

Flink讀寫系列之-讀Kafka並寫入Kafka

讀寫Kafka比較簡單,官方提供了connector,也提供了例子可以參看,官網例子的GitHub地址:

下面只做簡單的說明:

    val myConsumer = new FlinkKafkaConsumer010[String](KAFKA_CONSUMER_TOPIC, new SimpleStringSchema(), KAFKA_PROP)//引數分別是:指定取資料的topic,序列化器,kafka引數
    env.addSource[String](myConsumer)//讀入kafka資料產生dataStream,可針對dataStream進行各種邏輯處理,比如map,filter等等。

//處理結果寫入kafka
 dataStream.addSink(new FlinkKafkaProducer010[String](KAFKA_PRODUCER_TOPIC,new SimpleStringSchema(),KAFKA_PROP))//引數分別是:寫入topic,序列化器,kafka配置慘

//kafka配置引數
  lazy val KAFKA_PROP: Properties = new Properties() {
    setProperty("bootstrap.servers", KAFKA_BROKER)//broker地址
    setProperty("zookeeper.connect", KAFKA_ZOOKEEPER_HOST)//zookeeper配置
    setProperty("group.id", KAFKA_GROUP_ID)//組id
  }