Flink讀寫系列之-讀Kafka並寫入Kafka
阿新 • • 發佈:2018-12-16
讀寫Kafka比較簡單,官方提供了connector,也提供了例子可以參看,官網例子的GitHub地址:
下面只做簡單的說明:
val myConsumer = new FlinkKafkaConsumer010[String](KAFKA_CONSUMER_TOPIC, new SimpleStringSchema(), KAFKA_PROP)//引數分別是:指定取資料的topic,序列化器,kafka引數 env.addSource[String](myConsumer)//讀入kafka資料產生dataStream,可針對dataStream進行各種邏輯處理,比如map,filter等等。 //處理結果寫入kafka dataStream.addSink(new FlinkKafkaProducer010[String](KAFKA_PRODUCER_TOPIC,new SimpleStringSchema(),KAFKA_PROP))//引數分別是:寫入topic,序列化器,kafka配置慘 //kafka配置引數 lazy val KAFKA_PROP: Properties = new Properties() { setProperty("bootstrap.servers", KAFKA_BROKER)//broker地址 setProperty("zookeeper.connect", KAFKA_ZOOKEEPER_HOST)//zookeeper配置 setProperty("group.id", KAFKA_GROUP_ID)//組id }