Consuming Data from Kafka with Flink -- Parsing the Messages
阿新 · Published: 2018-11-25
1. Background
A Scala-based Flink job consumes data from Kafka and parses it with protobuf. The Kafka connector deserializes messages as String by default, so here we have to tell the consumer the actual format of the incoming data and hand over the raw bytes instead.
package cetc.kakfa2flink
import java.io.IOException
import java.util.Properties
import com.hxy.protobuf.DSFusion
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.util.serialization.{AbstractDeserializationSchema, SimpleStringSchema}
/**
* @description: consume protobuf-encoded messages from Kafka and parse them
* @author: fangchangtan
* @create: 2018-11-14 15:22
**/
object Kafka2FlinkTest {
  private val ZOOKEEPER_HOST = "192.168.xx.xx1:2181,192.168.xx.xx2:2181,192.168.xx.xx3:2181"
  private val KAFKA_BROKER = "192.168.xx.xx1:9092,192.168.xx.xx2:9092,192.168.xx.xx3:9092"
  private val TRANSACTION_GROUP = "group_id_xx"  // consumer group
  private val TOPIC = "TOPIC_MQ2KAFKA_DyFusion2" // topic to consume
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)

    // configure Kafka consumer
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", KAFKA_BROKER)
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    properties.setProperty("group.id", TRANSACTION_GROUP)

    // instead of the default SimpleStringSchema(), use a byte-array schema
    // so the protobuf payload arrives untouched
    val myConsumer: FlinkKafkaConsumer08[Array[Byte]] =
      new FlinkKafkaConsumer08[Array[Byte]](TOPIC, new ByteArrayDeserializationSchema[Array[Byte]](), properties)
    val transaction: DataStream[Array[Byte]] = env.addSource(myConsumer)

    transaction.map { arrbyte =>
      // parse the raw bytes into the generated protobuf message
      val dFusionMessage = DSFusion.DSFusionMessage.parseFrom(arrbyte)
      val basicBean: BasicBean = BasicBean(dFusionMessage.getFusionNum, dFusionMessage.getMmsi)
      println("*****[basicBean] fusionNum:" + basicBean.fusionNum + "; mmsi:" + basicBean.mmsi)
    }
    // transaction.setParallelism(4).writeAsText("E:\\123\\kafka2flink_test\\aa.txt")
    env.execute()
  }
  class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]] {
    @throws[IOException]
    override def deserialize(message: Array[Byte]): Array[Byte] = message
  }

  // abstract base class
  abstract class OriBean

  // subclass 1: case class BasicBean
  case class BasicBean(fusionNum: Int, mmsi: Int) extends OriBean

  // subclass 2: case class DyStaticBean
  case class DyStaticBean(fusionNum: Int, mmsi: Int, utc: Int,
                          lon: Double, lat: Double, cSpeed: Double, cCourse: Int) extends OriBean
}
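In the listing above the result of map is discarded and the parsed fields are only printed inside the operator; the commented-out writeAsText line hints at persisting the output instead. A minimal sketch of that variant, reusing the same names from the listing (the output path is only an example):

    val beans: DataStream[BasicBean] = transaction.map { arrbyte =>
      val msg = DSFusion.DSFusionMessage.parseFrom(arrbyte)
      BasicBean(msg.getFusionNum, msg.getMmsi) // return the bean instead of Unit
    }
    beans.print() // print to the TaskManager logs, or write to a file as above:
    // beans.writeAsText("E:\\123\\kafka2flink_test\\aa.txt").setParallelism(1)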
2. When the messages in Kafka are not String but protobuf
When Flink reads an input stream from Kafka, the schema it provides by default is String-typed:
val myConsumer = new FlinkKafkaConsumer08[String]("topic name", new SimpleStringSchema(), properties)
If the data stored in Kafka is not JSON (String) but protobuf, it has to be received with a binary schema. You can implement one yourself; it is very simple, essentially a single line of code:
class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]] {
  @throws[IOException]
  override def deserialize(message: Array[Byte]): Array[Byte] = message
}
Then use it as follows (note that the consumer's type parameter must be Array[Byte] to match the schema):
val myConsumer = new FlinkKafkaConsumer08[Array[Byte]]("topic name", new ByteArrayDeserializationSchema[Array[Byte]](), properties)
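A further variant, not from the original post but a natural extension: do the protobuf parsing inside the deserialization schema itself, so the source already emits DSFusionMessage objects and no extra map is needed. A sketch built on the same DSFusion classes and the TOPIC/properties/env values from section 1 (note that Flink treats the generated protobuf class as a generic type and falls back to Kryo serialization unless a dedicated serializer is registered):

class DSFusionDeserializationSchema extends AbstractDeserializationSchema[DSFusion.DSFusionMessage] {
  @throws[IOException]
  override def deserialize(message: Array[Byte]): DSFusion.DSFusionMessage =
    DSFusion.DSFusionMessage.parseFrom(message)
}

// usage: the consumer's type parameter changes accordingly
val protoConsumer = new FlinkKafkaConsumer08[DSFusion.DSFusionMessage](
  TOPIC, new DSFusionDeserializationSchema(), properties)
val messages: DataStream[DSFusion.DSFusionMessage] = env.addSource(protoConsumer)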