【FLINK】Reading data from Kafka and writing it into a Hive table with Flink
阿新 • Published: 2021-05-11
Background: by persisting real-time data to HDFS as history, Hive SQL can be used to trace historical data and provide the business with multi-dimensional metrics for reference.
Versions: Flink 1.11.2, Scala 2.11, Kafka 2.12, Hive 2.3.7
1. pom.xml
Note that every Scala-suffixed Flink artifact below uses the _2.11 suffix, matching the Scala 2.11 build above; mixing _2.11 and _2.12 Flink artifacts in one job leads to classpath conflicts.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zen.test</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.11.2</flink.version>
        <hadoop.version>2.6.0</hadoop.version>
        <hbase.version>1.4.9</hbase.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId></exclusion>
                <exclusion><groupId>org.scala-lang.modules</groupId><artifactId>scala-xml_2.11</artifactId></exclusion>
                <exclusion><groupId>org.scala-lang.modules</groupId><artifactId>scala-parser-combinators_2.11</artifactId></exclusion>
                <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId></exclusion>
                <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId></exclusion>
                <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId></exclusion>
                <exclusion><groupId>org.apache.calcite.avatica</groupId><artifactId>avatica-core</artifactId></exclusion>
                <exclusion><groupId>org.javassist</groupId><artifactId>javassist</artifactId></exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.3.6 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-2.3.6_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.4</version>
            <exclusions>
                <exclusion><groupId>com.github.joshelser</groupId><artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId></exclusion>
                <exclusion><groupId>org.apache.calcite</groupId><artifactId>calcite-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.thrift</groupId><artifactId>libthrift</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-vector-code-gen</artifactId></exclusion>
                <exclusion><groupId>io.dropwizard.metrics</groupId><artifactId>metrics-jvm</artifactId></exclusion>
                <exclusion><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId></exclusion>
                <exclusion><groupId>org.apache.orc</groupId><artifactId>orc-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
                <exclusion><groupId>io.dropwizard.metrics</groupId><artifactId>metrics-json</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-storage-api</artifactId></exclusion>
                <exclusion><groupId>org.apache.calcite</groupId><artifactId>calcite-druid</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-common</artifactId></exclusion>
                <exclusion><groupId>org.apache.calcite.avatica</groupId><artifactId>avatica-metrics</artifactId></exclusion>
                <exclusion><groupId>org.apache.calcite.avatica</groupId><artifactId>avatica</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-service-rpc</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-serde</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-llap-common</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-llap-client</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-llap-tez</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive.shims</groupId><artifactId>hive-shims-common</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-resourcemanager</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive.shims</groupId><artifactId>hive-shims-0.23</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive.shims</groupId><artifactId>hive-shims-scheduler</artifactId></exclusion>
                <exclusion><groupId>org.apache.hive</groupId><artifactId>hive-shims</artifactId></exclusion>
                <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
                <exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion>
                <exclusion><groupId>junit</groupId><artifactId>junit</artifactId></exclusion>
                <exclusion><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId></exclusion>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-runtime_2.11</artifactId></exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
                <exclusion><groupId>org.apache.directory.server</groupId><artifactId>apacheds-i18n</artifactId></exclusion>
                <exclusion><groupId>org.apache.directory.api</groupId><artifactId>api-asn1-api</artifactId></exclusion>
                <exclusion><groupId>org.apache.directory.api</groupId><artifactId>api-util</artifactId></exclusion>
                <exclusion><groupId>org.apache.directory.server</groupId><artifactId>apacheds-kerberos-codec</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></exclusion>
                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId></exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></exclusion>
                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId></exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
                <exclusion><groupId>io.dropwizard.metrics</groupId><artifactId>metrics-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion>
                <exclusion><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId></exclusion>
                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
                <exclusion><groupId>commons-httpclient</groupId><artifactId>commons-httpclient</artifactId></exclusion>
                <exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId></exclusion>
                <exclusion><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-client</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-common</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-common</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-api</artifactId></exclusion>
                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-common</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-app</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.1</version>
            <exclusions>
                <exclusion><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId></exclusion>
                <exclusion><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.74</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId></exclusion>
                <exclusion><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId></exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion><groupId>org.clapper</groupId><artifactId>grizzled-slf4j_2.11</artifactId></exclusion>
                <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId></exclusion>
                <exclusion><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId></exclusion>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-filesystem -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.7.0</version>
            <exclusions>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-fs</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></exclusion>
                <exclusion><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId></exclusion>
                <exclusion><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></exclusion>
                <exclusion><groupId>org.htrace</groupId><artifactId>htrace-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
                <exclusion><groupId>commons-httpclient</groupId><artifactId>commons-httpclient</artifactId></exclusion>
                <exclusion><groupId>net.java.dev.jets3t</groupId><artifactId>jets3t</artifactId></exclusion>
                <exclusion><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId></exclusion>
                <exclusion><groupId>commons-configuration</groupId><artifactId>commons-configuration</artifactId></exclusion>
                <exclusion><groupId>commons-el</groupId><artifactId>commons-el</artifactId></exclusion>
                <exclusion><groupId>commons-beanutils</groupId><artifactId>commons-beanutils</artifactId></exclusion>
                <exclusion><groupId>commons-digester</groupId><artifactId>commons-digester</artifactId></exclusion>
                <exclusion><groupId>commons-beanutils</groupId><artifactId>commons-beanutils-core</artifactId></exclusion>
                <exclusion><groupId>tomcat</groupId><artifactId>jasper-runtime</artifactId></exclusion>
                <exclusion><groupId>com.sun.jersey</groupId><artifactId>jersey-json</artifactId></exclusion>
                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-jaxrs</artifactId></exclusion>
                <exclusion><groupId>org.codehaus.jackson</groupId><artifactId>jackson-xc</artifactId></exclusion>
                <exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion>
                <exclusion><groupId>org.apache.directory.server</groupId><artifactId>apacheds-i18n</artifactId></exclusion>
                <exclusion><groupId>org.apache.directory.server</groupId><artifactId>apacheds-kerberos-codec</artifactId></exclusion>
                <exclusion><groupId>org.apache.directory.api</groupId><artifactId>api-util</artifactId></exclusion>
                <exclusion><groupId>org.apache.directory.api</groupId><artifactId>api-asn1-api</artifactId></exclusion>
                <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
                <exclusion><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId></exclusion>
                <exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion>
                <exclusion><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId></exclusion>
                <exclusion><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId></exclusion>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion><groupId>org.htrace</groupId><artifactId>htrace-core</artifactId></exclusion>
                <exclusion><groupId>tomcat</groupId><artifactId>jasper-runtime</artifactId></exclusion>
                <exclusion><groupId>commons-el</groupId><artifactId>commons-el</artifactId></exclusion>
                <exclusion><groupId>com.google.guava</groupId><artifactId>guava</artifactId></exclusion>
                <exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.stumbleupon/async -->
        <dependency>
            <groupId>com.stumbleupon</groupId>
            <artifactId>async</artifactId>
            <version>1.4.1</version>
            <exclusions>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.35</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <!-- include all resources in this folder and its subfolders -->
                    <include>**/*.*</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>UTF-8</encoding>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!--<arg>-make:transitive</arg>-->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*com.xiaoe.Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. Set up the Table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
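Checkpointing is not optional here: the streaming filesystem/Hive sink only finalizes in-progress files and commits partitions when a checkpoint completes, so with checkpointing disabled no data would ever become visible in Hive.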
3. Using HiveCatalog
The constructor takes the catalog name, the default database, and the directory containing hive-site.xml.
HiveCatalog catalog = new HiveCatalog(
        "myhive",                      // catalog name
        "db_test_tmp",                 // default database
        "/usr/local/service/hive/conf" // directory containing hive-site.xml
);
tableEnv.registerCatalog("myhive", catalog);
tableEnv.useCatalog("myhive");
4. Complete code
package com.source.kafka;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
public class HiveTest {
private static final Logger logger = LoggerFactory.getLogger(HiveTest.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
HiveCatalog catalog = new HiveCatalog(
"myhive",
"db_test_tmp",
"/usr/local/service/hive/conf"
);
tableEnv.registerCatalog("myhive", catalog);
tableEnv.useCatalog("myhive");
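// Source table: the Kafka DDL below is parsed with the default (Flink SQL) dialect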
tableEnv.executeSql("DROP TABLE IF EXISTS db_test_tmp.test_kafka_source");
tableEnv.executeSql("CREATE TABLE db_test_tmp.test_kafka_source (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT\n" +
") WITH (\n" +
" 'connector' = 'kafka'," +
" 'topic' = 'hive_test',\n" +
" 'properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
" 'properties.group.id' = 'group_test_01',\n" +
" 'format' = 'json',\n"+
" 'is_generic' = 'false',\n"+
" 'scan.startup.mode' = 'latest-offset')");
// Switch to the Hive dialect so the sink DDL below is parsed as Hive SQL
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("DROP TABLE IF EXISTS db_test_tmp.test_hive_sink");
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test_tmp.test_hive_sink (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT\n" +
") STORED AS PARQUET\n" +
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.policy.kind' = 'metastore,success-file'\n" +
")");
// Back to the default dialect for the streaming INSERT
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("INSERT INTO db_test_tmp.test_hive_sink SELECT id,name,age FROM test_kafka_source");
}
}
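Note that the main method never calls env.execute(): executeSql submits the INSERT job on its own as soon as it is invoked.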
5. Package, run, verify
5.1 Package
mvn clean install
5.2 Run
maven-shade-plugin keeps the unshaded jar under the original- prefix; that thin jar is what gets submitted below, with the Flink and Hadoop dependencies provided by the cluster:
flink run -m yarn-cluster -ynm test -ys 1 -ytm 1024m -yD state.checkpoints.dir=hdfs://127.0.0.1:4007/flink/checkpoints -c com.source.kafka.HiveTest /tmp/original-flink_test-1.0-SNAPSHOT.jar
5.3 Verify
5.3.1 Kafka verification
Create the topic and produce some test data:
# create the topic
./kafka-topics.sh --zookeeper localhost:2181 --topic hive_test --partitions 1 --replication-factor 1 --create
# produce data
./kafka-console-producer.sh --broker-list localhost:9092 --topic hive_test
> {"id":1,"name":"zhangsan","age":100}
> {"id":10,"name":"lisi","age":10}
> {"id":9,"name":"wangwu","age":9}
> {"id":8,"name":"maliu","age":8}
> {"id":18,"name":"shi","age":18}
# verify the data was written by consuming it
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hive_test
{"id":1,"name":"zhangsan","age":100}
{"id":10,"name":"lisi","age":10}
{"id":9,"name":"wangwu","age":9}
{"id":8,"name":"maliu","age":8}
{"id":18,"name":"shi","age":18}
5.3.2 Hive verification
hive> select * from db_test_tmp.test_hive_sink;
Note: test_kafka_source cannot be queried from Hive; a SELECT on it fails with FAILED: SemanticException Line 0:-1 Invalid column reference 'TOK_ALLCOLREF'.
If needed, see https://blog.csdn.net/Zsigner/article/details/111285065 for background.
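The Kafka-backed table is still readable through Flink SQL. A minimal sketch for inspecting it, reusing the environment setup from section 4 (TableResult comes from org.apache.flink.table.api; the unbounded SELECT keeps printing until the job is cancelled):
// Query the Kafka table through Flink, since Hive cannot read connector tables
TableResult result = tableEnv.executeSql(
        "SELECT id, name, age FROM db_test_tmp.test_kafka_source");
result.print(); // prints arriving rows until cancelled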
6. Extension: writing to a partitioned Hive table
6.1 Modify the source and sink tables
6.1.1 Source
tableEnv.executeSql("CREATE TABLE db_test_tmp.test_kafka_source (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT,\n" +
" statdate STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka'," +
" 'topic' = 'kafka_test',\n" +
" 'properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
" 'properties.group.id' = 'hive_test_09',\n" +
" 'format' = 'json',\n"+
" 'scan.startup.mode' = 'latest-offset')");
6.1.2 Sink table
partition.time-extractor.timestamp-pattern tells Flink how to map the statdate partition value to a timestamp. It only drives partition commits when sink.partition-commit.trigger is set to partition-time; the default trigger is process-time. A variant that commits on partition time is shown after the complete code in 6.2. The stream-source.* properties configure later streaming reads of this Hive table and do not affect the sink path.
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test_tmp.test_hive_sink (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT\n" +
") PARTITIONED BY (statdate String) STORED AS PARQUET\n" +
"TBLPROPERTIES (\n" +
" 'partition.time-extractor.timestamp-pattern'='$statdate 00:00:00',\n" +
" 'stream-source.consume-order' = 'partition-time',\n" +
" 'stream-source.enable' = 'true',\n" +
" 'sink.partition-commit.policy.kind' = 'metastore,success-file'\n" +
")");
6.2 Complete code
package com.source.kafka;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
public class HivePartitionTest {
private final static Logger logger = LoggerFactory.getLogger(HivePartitionTest.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
HiveCatalog catalog = new HiveCatalog(
"myhive",
"db_test_tmp",
"/usr/local/service/hive/conf"
);
tableEnv.registerCatalog("myhive", catalog);
tableEnv.useCatalog("myhive");
tableEnv.executeSql("DROP TABLE IF EXISTS db_test_tmp.test_kafka_source");
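// The source schema now carries a statdate column that will feed the Hive partition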
tableEnv.executeSql("CREATE TABLE db_test_tmp.test_kafka_source (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT,\n" +
" statdate STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka'," +
" 'topic' = 'kafka_test',\n" +
" 'properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
" 'properties.group.id' = 'hive_test_09',\n" +
" 'format' = 'json',\n"+
" 'scan.startup.mode' = 'latest-offset')");
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("DROP TABLE IF EXISTS db_test_tmp.test_hive_sink");
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test_tmp.test_hive_sink (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT\n" +
") PARTITIONED BY (statdate String) STORED AS PARQUET\n" +
"TBLPROPERTIES (\n" +
" 'partition.time-extractor.timestamp-pattern'='$statdate 00:00:00',\n" +
" 'stream-source.consume-order' = 'partition-time',\n" +
" 'stream-source.enable' = 'true',\n" +
" 'sink.partition-commit.policy.kind' = 'metastore,success-file'\n" +
")");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
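// DATE_FORMAT reduces the statdate event time string to a day, which becomes the partition value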
tableEnv.executeSql("INSERT INTO db_test_tmp.test_hive_sink SELECT id,name,age,DATE_FORMAT(statdate,'yyyy-MM-dd') FROM test_kafka_source");
}
}
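As mentioned in 6.1.2, the DDL above commits partitions on the default process-time trigger. A hedged variant of the sink DDL for committing on the extracted partition time instead (the added trigger and delay properties are from the Flink 1.11 filesystem/Hive sink; the one-hour delay is an arbitrary example):
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test_tmp.test_hive_sink (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT\n" +
") PARTITIONED BY (statdate STRING) STORED AS PARQUET\n" +
"TBLPROPERTIES (\n" +
" 'partition.time-extractor.timestamp-pattern' = '$statdate 00:00:00',\n" +
" 'sink.partition-commit.trigger' = 'partition-time',\n" +
" 'sink.partition-commit.delay' = '1 h',\n" +
" 'sink.partition-commit.policy.kind' = 'metastore,success-file'\n" +
")");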
6.3 Verify
6.3.1 Kafka verification
Create the topic and produce data:
# create the topic
./kafka-topics.sh --zookeeper localhost:2181 --topic kafka_test --partitions 1 --replication-factor 1 --create
# produce data
./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_test
>{"id":1,"name":"zhangsan","age":100,"statdate":"2021-05-09 15:42:34"}
>{"id":10,"name":"lisi","age":10,"statdate":"2021-05-10 15:46:20"}
# verify the data was written by consuming it
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_test
{"id":1,"name":"zhangsan","age":100,"statdate":"2021-05-09 15:42:34"}
{"id":10,"name":"lisi","age":10,"statdate":"2021-05-10 15:46:20"}
6.3.2 Hive verification
hive> show partitions db_test_tmp.test_hive_sink;
hive> select * from db_test_tmp.test_hive_sink;