
Flink (5): Consuming from Kafka


0. Purpose

Test the several start-position strategies Flink offers when consuming from Kafka:

kafkaSource.setStartFromEarliest()                        // start from the earliest offset
kafkaSource.setStartFromLatest()                          // start from the latest offset
kafkaSource.setStartFromTimestamp(startMillis)            // start from an epoch-millisecond timestamp (takes a Long)
kafkaSource.setStartFromGroupOffsets()                    // default: resume from the group's committed offsets
kafkaSource.setStartFromSpecificOffsets(specificOffsets)  // start from explicit per-partition offsets (see the sketch below)
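
The first four strategies are single calls, while setStartFromSpecificOffsets() needs a map from partition to starting offset. A minimal sketch of that last strategy (the partition numbers and offsets are made-up values for illustration):

import java.{util => ju}
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

// Pin each partition of the topic to an explicit starting offset (hypothetical values)
val specificOffsets = new ju.HashMap[KafkaTopicPartition, java.lang.Long]()
specificOffsets.put(new KafkaTopicPartition("xes_test_anwser_detail", 0), 23L)
specificOffsets.put(new KafkaTopicPartition("xes_test_anwser_detail", 1), 31L)
kafkaSource.setStartFromSpecificOffsets(specificOffsets)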

1. Local Test

package flink_01_connector.source

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.collection.JavaConverters._

/**
 * @description: Kafka connector
 * @author: HaoWu
 * @create: 2020-12-16
 */
object KafkaConnectorTest {
  def main(args: Array[String]): Unit = {
    // 0 Initialize the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1 Build the Kafka source
    val topics = List("xes_test_anwser_detail").asJava
    val props = new Properties()
    props.put("bootstrap.servers", "kafka地址")
    props.put("group.id", "test5")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") //key 反序列化
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") //value 反序列化

    /*  props.put("enable.auto.commit", "true") //自動提交
        props.put("auto.commit.interval.ms", "1500") //提交
        props.put("auto.offset.reset", "lastest") //offset從最新的位置開始讀取*/
    val kafkaSource = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema(), props)

    val startMillis = System.currentTimeMillis() - 24 * 3600 * 1000L // 24 hours ago, in epoch millis
    kafkaSource.setStartFromTimestamp(startMillis) // start consuming from 24 hours back

    // 2 Get the stream
    val kafkaStream: DataStream[String] = env.addSource(kafkaSource)

    // 3 Print
    kafkaStream.print()

    // 4 Execute
    env.execute()
  }
}
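
A caveat on the default setStartFromGroupOffsets() strategy: it can only resume from offsets that were actually committed. When checkpointing is enabled, the connector commits offsets back to Kafka on each completed checkpoint; without checkpointing it falls back to the auto-commit properties shown in the commented-out block above. A minimal sketch (the 5-second interval is an assumption):

// Commit offsets on checkpoint completion so a later run can resume from group offsets
env.enableCheckpointing(5000L)                  // assumed interval: checkpoint every 5 s
kafkaSource.setCommitOffsetsOnCheckpoints(true) // already the default once checkpointing is on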

2. Online Test

package flink_01_connector.source

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.collection.JavaConverters._

/**
 * @description: Read a Kafka stream
 * @author: HaoWu
 * @create: 2020-12-16
 */
object KafkaConnectorOnlineTest {
  def main(args: Array[String]): Unit = {
    // 0 Initialize the execution environment
    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val topic: String = parameterTool.get("topic") // one or more topics, comma-separated
    val bootStrapServer: String = parameterTool.get("bootstrap_server") // Kafka cluster bootstrap servers
    val groupId: String = parameterTool.get("group_id") // consumer group
    val hours: Int = parameterTool.get("hours").toInt // start consuming this many hours back

    //    val keyTabPath = parameterTool.get("keytab_path") // security authentication
    //    env.registerCachedFile(keyTabPath, "keytab")
    // Register the parameters globally so rich functions can read them (see the sketch after this class)
    env.getConfig.setGlobalJobParameters(parameterTool)
    // 1 Build the Kafka source
    val topics = topic.split(",").toList.asJava
    val props = new Properties()
    props.put("bootstrap.servers", bootStrapServer)
    props.put("group.id", groupId)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") //key 反序列化
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") //value 反序列化
    val kafkaSource = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema(), props)


    val startMillis = System.currentTimeMillis() - hours * 3600 * 1000L // N hours ago (1000L avoids Int overflow)
    kafkaSource.setStartFromTimestamp(startMillis) // start consuming `hours` hours back
    //    kafkaSource.setStartFromEarliest() // or: start from the earliest offset
    // 2 Get the stream
    val kafkaStream: DataStream[String] = env.addSource(kafkaSource)

    // 3 Print
    kafkaStream.print("| log |")

    // 4 Execute
    env.execute()
  }
}
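
The parameterTool registered via setGlobalJobParameters() above is never read back in this demo; the usual pattern is to recover it inside a rich function. A minimal sketch, assuming a hypothetical RichMapFunction that tags each record with the consumer group:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration

// Hypothetical helper: read back the globally registered parameters at runtime
class TagWithGroupId extends RichMapFunction[String, String] {
  @transient private var groupId: String = _

  override def open(parameters: Configuration): Unit = {
    val params = getRuntimeContext.getExecutionConfig
      .getGlobalJobParameters.asInstanceOf[ParameterTool]
    groupId = params.get("group_id")
  }

  override def map(value: String): String = s"[$groupId] $value"
}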

Submit the job

#!/bin/bash

source  ~/.bashrc

cd "$(dirname "$0")"
day=$(date +%Y%m%d%H%M)

# Flink job settings
jobName=KafkaConnectorOnlineTest_wuhao
clazz=flink_01_connector.source.KafkaConnectorOnlineTest
jar_path=/home/wuhao/flink-learning/jar/02_flink_learning-1.0-SNAPSHOT-jar-with-dependencies.jar
parallelism=2
sourceParallelism=4

# Kafka settings
bootstrap_server="<kafka bootstrap servers>"  # placeholder
topic=xes_test_anwser_detail
group_id=KafkaConnectorOnlineTest_wuhao
hours=24

# Kudu settings
kudu_instance=1v6_common_edc_online_answer
kudu_host=****:7051
kudu_flush_num=5

#-----------------------run----------------------------------------------
/software/servers/flink1.9.1_wx_dp_hive/bin/flink run -m yarn-cluster \
-ynm ${jobName} \
-yqu root.wangxiao.dp \
-c ${clazz} ${jar_path} \
--jobName ${jobName} \
--keytab_path /home/wx_dp_hive/wx_dp_hive.keytab \
--bootstrap_server ${bootstrap_server} \
--topic ${topic} \
--group_id ${group_id} \
--hours ${hours} \
--parallelism ${parallelism} \
--sourceParallelism ${sourceParallelism} \
--kudu_instance ${kudu_instance} \
--kudu_host ${kudu_host} \
--kudu_flush_num ${kudu_flush_num} >../logs/${jobName}_${day}.log 2>&1 &
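
The submission runs in the background with its output redirected to a log file, so nothing prints to the console. A quick way to confirm the job is up (standard YARN CLI; the log path follows the script's own naming scheme):

yarn application -list | grep KafkaConnectorOnlineTest_wuhao
tail -f ../logs/KafkaConnectorOnlineTest_wuhao_*.log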