Flink (5): Consuming from Kafka
0. Purpose
Test the start-position strategies available when Flink consumes from Kafka (a sketch of the specific-offsets variant follows the list):
kafkaSource.setStartFromEarliest() // start from the earliest offset
kafkaSource.setStartFromLatest() // start from the latest offset
kafkaSource.setStartFromTimestamp(startTimeMillis) // start from a given epoch timestamp (milliseconds)
kafkaSource.setStartFromGroupOffsets() // the default: resume from the committed consumer-group offsets
kafkaSource.setStartFromSpecificOffsets(specificOffsets) // start from explicitly specified per-partition offsets
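Of these, setStartFromSpecificOffsets is the only one not exercised in the tests below. A minimal sketch, assuming the same topic as the examples and hypothetical partition numbers and offsets:

import java.{lang, util}
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import scala.collection.JavaConverters._

val props = new Properties() // bootstrap.servers, group.id etc., as in section 1
val kafkaSource = new FlinkKafkaConsumer011[String](
  List("xes_test_anwser_detail").asJava, new SimpleStringSchema(), props)

// Map each partition to the offset it should start reading from (values are hypothetical)
val specificOffsets = new util.HashMap[KafkaTopicPartition, lang.Long]()
specificOffsets.put(new KafkaTopicPartition("xes_test_anwser_detail", 0), 23L)
specificOffsets.put(new KafkaTopicPartition("xes_test_anwser_detail", 1), 31L)
kafkaSource.setStartFromSpecificOffsets(specificOffsets)
// Partitions missing from the map fall back to the default group-offsets behaviour.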
1. Local Test
package flink_01_connector.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.collection.JavaConverters._

/**
 * @description: kafka connector
 * @author: HaoWu
 * @create: 2020-12-16
 */
object KafkaConnectorTest {
  def main(args: Array[String]): Unit = {
    // 0. Initialize the environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Build the Kafka source
    val topics = List("xes_test_anwser_detail").asJava
    val props = new Properties()
    props.put("bootstrap.servers", "<kafka address>")
    props.put("group.id", "test5")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")   // key deserializer
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value deserializer
    /*
    props.put("enable.auto.commit", "true")      // auto-commit offsets
    props.put("auto.commit.interval.ms", "1500") // commit interval
    props.put("auto.offset.reset", "latest")     // read from the latest offset when no committed offset exists
    */
    val kafkaSource = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema(), props)
    val lag = System.currentTimeMillis() - 24 * 3600 * 1000
    kafkaSource.setStartFromTimestamp(lag) // start consuming from 24 hours ago

    // 2. Get the stream
    val kafkaStream: DataStream[String] = env.addSource(kafkaSource)

    // 3. Print
    kafkaStream.print()

    // 4. Execute
    env.execute()
  }
}
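Worth noting (not covered in the original post): when Flink checkpointing is enabled, the consumer commits offsets back to Kafka on checkpoint completion; the enable.auto.commit properties commented out above only apply when checkpointing is off. A minimal sketch, assuming a 5-second checkpoint interval, that could be added before env.addSource:

// Assumed 5000 ms interval; commit offsets to Kafka when a checkpoint completes
env.enableCheckpointing(5000)
kafkaSource.setCommitOffsetsOnCheckpoints(true) // this is already the default when checkpointing is on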
2. Online Test
package flink_01_connector.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.collection.JavaConverters._

/**
 * @description: read a Kafka stream
 * @author: HaoWu
 * @create: 2020-12-16
 */
object KafkaConnectorOnlineTest {
  def main(args: Array[String]): Unit = {
    // 0. Initialize the environment
    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val topic: String = parameterTool.get("topic")                      // topic(s): several may be given, comma-separated
    val bootStrapServer: String = parameterTool.get("bootstrap_server") // Kafka cluster URL
    val groupId: String = parameterTool.get("group_id")                 // consumer group
    val hours: Int = parameterTool.get("hours").toInt                   // how many hours back to start consuming from
    // val keyTabPath = parameterTool.get("keytab_path")                // security credentials
    // env.registerCachedFile(keyTabPath, "keytab")

    // Register the parameters globally for the whole job
    env.getConfig.setGlobalJobParameters(parameterTool)

    // 1. Build the Kafka source
    val topics = topic.split(",").toList.asJava
    val props = new Properties()
    props.put("bootstrap.servers", bootStrapServer)
    props.put("group.id", groupId)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")   // key deserializer
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value deserializer
    val kafkaSource = new FlinkKafkaConsumer011[String](topics, new SimpleStringSchema(), props)
    val lag = System.currentTimeMillis() - hours.toLong * 3600 * 1000 // Long arithmetic avoids Int overflow for large hour values
    kafkaSource.setStartFromTimestamp(lag) // start consuming from `hours` hours ago
    // kafkaSource.setStartFromEarliest()  // start from the earliest offset

    // 2. Get the stream
    val kafkaStream: DataStream[String] = env.addSource(kafkaSource)

    // 3. Print
    kafkaStream.print("| log |")

    // 4. Execute
    env.execute()
  }
}
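The job registers parameterTool via setGlobalJobParameters but never reads it back. For illustration, a minimal sketch of how a downstream operator could retrieve those parameters at runtime; TagWithGroupId is a hypothetical function, not part of the original job:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration

class TagWithGroupId extends RichMapFunction[String, String] {
  private var groupId: String = _

  override def open(parameters: Configuration): Unit = {
    // Retrieve the globally registered ParameterTool from the runtime context
    val params = getRuntimeContext.getExecutionConfig
      .getGlobalJobParameters.asInstanceOf[ParameterTool]
    groupId = params.get("group_id")
  }

  override def map(value: String): String = s"[$groupId] $value"
}

It would be wired in as kafkaStream.map(new TagWithGroupId).print() in place of the plain print above.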
Submit the job
#!/bin/bash
source ~/.bashrc
cd $(dirname $0)
day=$(date +%Y%m%d%H%M)

# flink
jobName=KafkaConnectorOnlineTest_wuhao
clazz=flink_01_connector.source.KafkaConnectorOnlineTest
jar_path=/home//wuhao/flink-learning/jar/02_flink_learning-1.0-SNAPSHOT-jar-with-dependencies.jar
parallelism=2
sourceParallelism=4

# kafka
bootstrap_server="kafka url" # placeholder: replace with the actual broker list
topic=xes_test_anwser_detail
group_id=KafkaConnectorOnlineTest_wuhao
hours=24

# kudu
kudu_instance=1v6_common_edc_online_answer
kudu_host=****:7051
kudu_flush_num=5

# NOTE: isSecurity and consumerStrategy are passed to the job below but are not set in this script.
#-----------------------run----------------------------------------------
/software/servers/flink1.9.1_wx_dp_hive/bin/flink run -m yarn-cluster \
-ynm ${jobName} \
-yqu root.wangxiao.dp \
-c ${clazz} ${jar_path} \
--jobName ${jobName} \
--keytab_path /home/wx_dp_hive/wx_dp_hive.keytab \
--bootstrap_server ${bootstrap_server} \
--topic ${topic} \
--group_id ${group_id} \
--isSecurity ${isSecurity} \
--consumerStrategy ${consumerStrategy} \
--hours ${hours} \
--parallelism ${parallelism} \
--sourceParallelism ${sourceParallelism} \
--kudu_instance ${kudu_instance} \
--kudu_host ${kudu_host} \
--kudu_flush_num ${kudu_flush_num} >../logs/${jobName}_${day}.log 2>&1 &