
Consuming Data from Kafka in Flink -- Parsing

1. Overview:

A Scala-based Flink job consumes data from Kafka and then parses it with Protobuf. By default the Kafka consumer deserializes records as Strings, so here we have to tell it explicitly what format the incoming data has.

package cetc.kakfa2flink

import java.io.IOException
import java.util.Properties

import com.hxy.protobuf.DSFusion
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.{AbstractDeserializationSchema, SimpleStringSchema}

/**
  * @description: consume Protobuf messages from Kafka and parse them
  * @author: fangchangtan
  * @create: 2018-11-14 15:22
  **/
object Kafka2FlinkTest {

  private val ZOOKEEPER_HOST = "192.168.xx.xx1:2181,192.168.xx.xx2:2181,192.168.xx.xx3:2181"
  private val KAFKA_BROKER = "192.168.xx.xx1:9092,192.168.xx.xx2:9092,192.168.xx.xx3:9092"
  private val TRANSACTION_GROUP = "group_id_xx"  // consumer group id
  private val TOPIC = "TOPIC_MQ2KAFKA_DyFusion2" // topic to consume

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)

    // configure the Kafka consumer
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", KAFKA_BROKER)
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    properties.setProperty("group.id", TRANSACTION_GROUP)

    // for plain-text topics the default SimpleStringSchema() would be enough;
    // the records here are Protobuf, so receive them as raw byte arrays
    val myConsumer: FlinkKafkaConsumer08[Array[Byte]] =
      new FlinkKafkaConsumer08[Array[Byte]](TOPIC, new ByteArrayDeserializationSchema[Array[Byte]](), properties)

    val transaction: DataStream[Array[Byte]] = env.addSource(myConsumer)

    transaction.map { arrbyte =>
      // parse the raw bytes with the generated Protobuf class
      val dFusionMessage = DSFusion.DSFusionMessage.parseFrom(arrbyte)
      val basicBean: BasicBean = BasicBean(dFusionMessage.getFusionNum, dFusionMessage.getMmsi)
      println("*****[basicBean] fusionNum:" + basicBean.fusionNum + ";mmsi:" + basicBean.mmsi)
      basicBean
    }
    // transaction.setParallelism(4).writeAsText("E:\\123\\kafka2flink_test\\aa.txt")

    env.execute()
  }

  // pass the raw Kafka record through unchanged as a byte array
  class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]] {
    @throws[IOException]
    override def deserialize(message: Array[Byte]): Array[Byte] = message
  }

  // abstract parent class for the parsed beans
  abstract class OriBean

  // subclass 1: basic bean
  case class BasicBean(fusionNum: Int, mmsi: Int) extends OriBean

  // subclass 2: dynamic/static bean
  case class DyStaticBean(fusionNum: Int, mmsi: Int, utc: Int,
                          lon: Double, lat: Double, cSpeed: Double, cCourse: Int) extends OriBean
}
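For a quick end-to-end test, a plain Kafka producer can push serialized Protobuf messages into the topic before the Flink job starts. The sketch below is only an illustration: the builder setters setFusionNum/setMmsi are inferred from the getters used above, and the broker address and values are placeholders.

import java.util.Properties

import com.hxy.protobuf.DSFusion
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProtobufProducerTest {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.xx.xx1:9092") // placeholder broker
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

    // build a message carrying the two fields the Flink job reads;
    // setFusionNum/setMmsi are assumed from the generated Protobuf builder
    val message = DSFusion.DSFusionMessage.newBuilder()
      .setFusionNum(1)
      .setMmsi(123456789)
      .build()

    // send the serialized Protobuf bytes to the topic the Flink job consumes
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("TOPIC_MQ2KAFKA_DyFusion2", message.toByteArray))
    producer.flush()
    producer.close()
  }
}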

2. When the data in Kafka is Protobuf rather than String

When Flink reads an input stream from Kafka, the schema provided by default handles the String type:

val myConsumer = new FlinkKafkaConsumer08[String]("topic-name", new SimpleStringSchema(), properties)

If the data stored in Kafka is not JSON but Protobuf, it has to be received with a binary schema. You can implement one yourself; it is very simple, the core is a single line of code:

class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]] {
  @throws[IOException]
  override def deserialize(message: Array[Byte]): Array[Byte] = message
}
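Because the class extends AbstractDeserializationSchema[Array[Byte]], Flink derives the produced type information from the type argument, so no getProducedType override is needed; deserialize simply hands the raw Kafka record back unchanged.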

Then use it as follows:

val myConsumer = new FlinkKafkaConsumer08[Array[Byte]]("topic-name", new ByteArrayDeserializationSchema[Array[Byte]](), properties)
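Note that the consumer's type parameter has to match the schema's output type (Array[Byte] here, not String). As a variation, the Protobuf parsing can also be pushed into the schema itself, so the stream directly carries message objects; the following is only a sketch that reuses the DSFusion class from the example above:

import java.io.IOException

import com.hxy.protobuf.DSFusion
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema

// deserialize the raw Kafka bytes directly into the generated Protobuf class
class DSFusionDeserializationSchema extends AbstractDeserializationSchema[DSFusion.DSFusionMessage] {
  @throws[IOException]
  override def deserialize(message: Array[Byte]): DSFusion.DSFusionMessage =
    DSFusion.DSFusionMessage.parseFrom(message)
}

// the consumer's type parameter now matches the schema's output type
val myConsumer = new FlinkKafkaConsumer08[DSFusion.DSFusionMessage](
  "topic-name", new DSFusionDeserializationSchema(), properties)

One thing to keep in mind with this variant: Flink usually falls back to Kryo serialization for Protobuf-generated classes, so depending on the Flink version you may want to register a dedicated serializer (for example chill-protobuf's ProtobufSerializer) if that becomes a bottleneck.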