Flink-Based Integration of Real-Time and Offline Data
The goal is to integrate offline data with real-time data: offline data is made available to real-time applications so that offline and real-time records can be joined and computed on together.
The offline data is broadcast as a broadcast stream and held in memory as broadcast state. The connect method attaches this broadcast stream to the real-time stream, so the two can be joined efficiently, and the broadcast state is refreshed whenever the offline data changes.
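The essence of the pattern is a MapStateDescriptor, a broadcast of the offline stream, and a BroadcastProcessFunction. A minimal sketch (method and stream names are illustrative; TestBean and the imports are the ones used in the complete job listed below):

static DataStream<TestBean> enrich(DataStream<TestBean> realtime,
                                   DataStream<Tuple2<String, String>> offline) {
    MapStateDescriptor<String, String> desc =
            new MapStateDescriptor<>("code-name", Types.STRING, Types.STRING);
    BroadcastStream<Tuple2<String, String>> dim = offline.broadcast(desc);
    return realtime.connect(dim).process(
            new BroadcastProcessFunction<TestBean, Tuple2<String, String>, TestBean>() {
                @Override
                public void processElement(TestBean bean, ReadOnlyContext ctx,
                                           Collector<TestBean> out) throws Exception {
                    // Look up the name for this record's code in the broadcast state.
                    bean.setName(ctx.getBroadcastState(desc).get(bean.getCode()));
                    out.collect(bean);
                }

                @Override
                public void processBroadcastElement(Tuple2<String, String> t, Context ctx,
                                                    Collector<TestBean> out) throws Exception {
                    // Every offline record refreshes the in-memory copy held by each subtask.
                    ctx.getBroadcastState(desc).put(t.f0, t.f1);
                }
            });
}

Each parallel subtask keeps its own copy of the broadcast state, so the lookup in processElement is a local in-memory read.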
KafkaWithBroadcast
import bean.TestBean;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import sink.MysqlSink;
import utils.FlinkUtils;
import source.MysqlSource;

public class KafkaWithBroadcast {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = FlinkUtils.getEnv();

        // Offline data: (code, name) pairs polled from MySQL
        DataStreamSource<Tuple2<String, String>> tps = env.addSource(new MysqlSource());

        // Define the broadcast state descriptor: code -> name
        MapStateDescriptor<String, String> codeToNameDescriptor = new MapStateDescriptor<>(
                "code-name", Types.STRING, TypeInformation.of(new TypeHint<String>() {}));

        // Broadcast the offline data
        BroadcastStream<Tuple2<String, String>> broadcastDataStream = tps.broadcast(codeToNameDescriptor);

        // Read real-time data from Kafka; each message is "id,code"
        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream("KWBDemo", SimpleStringSchema.class);

        // Parse the real-time stream
        SingleOutputStreamOperator<TestBean> beanDataStream = kafkaStream.map(new MapFunction<String, TestBean>() {
            @Override
            public TestBean map(String line) throws Exception {
                // Split the line and assemble the bean
                String[] fields = line.split(",");
                String id = fields[0];
                String code = fields[1];
                return new TestBean(id, code);
            }
        });

        // Connect the Kafka stream with the broadcast state
        SingleOutputStreamOperator<TestBean> stream = beanDataStream.connect(broadcastDataStream)
                .process(new BroadcastProcessFunction<TestBean, Tuple2<String, String>, TestBean>() {
                    // Output: id, code, name
                    @Override
                    public void processElement(TestBean value, ReadOnlyContext ctx, Collector<TestBean> out) throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(codeToNameDescriptor);
                        String code = value.getCode();
                        // Look up the name for this code
                        String name = broadcastState.get(code);
                        value.setName(name);
                        out.collect(value);
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<TestBean> out) throws Exception {
                        // int index = getRuntimeContext().getIndexOfThisSubtask();
                        // System.out.println("broadcast state updated --> " + index + " " + value);
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(codeToNameDescriptor);
                        // Store or overwrite the dimension record
                        broadcastState.put(value.f0, value.f1);
                    }
                });

        // stream.print();
        stream.addSink(new MysqlSink());

        env.execute("KafkaWithBroadcast");
    }
}
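Note that the broadcast state is only populated once processBroadcastElement has seen the MySQL rows, so real-time records that arrive earlier may not find a matching code and name stays null. If that matters, a defensive fallback in processElement is one option; a sketch, where the placeholder value "UNKNOWN" is an assumption:

@Override
public void processElement(TestBean value, ReadOnlyContext ctx, Collector<TestBean> out) throws Exception {
    ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(codeToNameDescriptor);
    String name = broadcastState.get(value.getCode());
    // Fall back to a placeholder until the dimension row for this code has been broadcast.
    value.setName(name != null ? name : "UNKNOWN");
    out.collect(value);
}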
FlinkUtils
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FlinkUtils {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static <T> DataStream<T> createKafkaStream(String topic, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        // Enable checkpointing, which also enables the default restart strategy
        env.enableCheckpointing(5000);
        // Keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties props = new Properties();
        // Kafka broker addresses
        props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
        // Consumer group ID
        props.setProperty("group.id", "test");
        // Start from the earliest offset when no offset has been committed yet
        props.setProperty("auto.offset.reset", "earliest");
        // Do not let the Kafka consumer commit offsets automatically
        props.setProperty("enable.auto.commit", "false");

        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topic, clazz.newInstance(), props);
        return env.addSource(kafkaConsumer);
    }

    public static StreamExecutionEnvironment getEnv() {
        return env;
    }
}
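For a quick end-to-end test, messages in the "id,code" format can be pushed into the KWBDemo topic that the job reads. A minimal producer sketch, assuming the kafka-clients library (pulled in transitively by flink-connector-kafka) and the same broker list; the sample records are made up:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same broker list as in FlinkUtils
        props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Messages use the "id,code" format that KafkaWithBroadcast expects
            producer.send(new ProducerRecord<>("KWBDemo", "1001,A01"));
            producer.send(new ProducerRecord<>("KWBDemo", "1002,A02"));
            producer.flush();
        }
    }
}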
MysqlSource
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlSource extends RichSourceFunction<Tuple2<String, String>> {

    private boolean flag = true;
    private String lastCode = null;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.1.101:3306/test?characterEncoding=UTF-8", "root", "root");
    }

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        while (flag) {
            String sql = "SELECT code, name FROM dcde WHERE code > ? ORDER BY code DESC";
            PreparedStatement prepareStatement = connection.prepareStatement(sql);
            prepareStatement.setString(1, lastCode != null ? lastCode : "0");
            ResultSet resultSet = prepareStatement.executeQuery();
            int index = 0;
            while (resultSet.next()) {
                String code = resultSet.getString("code");
                String name = resultSet.getString("name");
                // The first row holds the largest code; use it as the lower bound for the next poll
                if (index == 0) {
                    lastCode = code;
                }
                index++;
                // System.out.println("new offline record detected --> " + lastCode);
                ctx.collect(Tuple2.of(code, name));
            }
            resultSet.close();
            prepareStatement.close();
            Thread.sleep(30000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}
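MysqlSource expects a dimension table dcde(code, name) in the test database, and MysqlSink below writes to dapp(id, code, name). A throwaway helper to create them could look like this; the column types and keys are assumptions, not taken from the original post:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTables {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.1.101:3306/test?characterEncoding=UTF-8", "root", "root");
             Statement stmt = conn.createStatement()) {
            // Dimension table polled by MysqlSource
            stmt.execute("CREATE TABLE IF NOT EXISTS dcde (code VARCHAR(32) PRIMARY KEY, name VARCHAR(64))");
            // Result table written by MysqlSink
            stmt.execute("CREATE TABLE IF NOT EXISTS dapp (id VARCHAR(32) PRIMARY KEY, code VARCHAR(32), name VARCHAR(64))");
        }
    }
}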
MysqlSink
import bean.TestBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MysqlSink extends RichSinkFunction<TestBean> {

    private transient Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Create the MySQL connection
        connection = DriverManager.getConnection(
                "jdbc:mysql://192.168.1.101:3306/test?characterEncoding=UTF-8", "root", "root");
    }

    @Override
    public void invoke(TestBean bean, Context context) throws Exception {
        PreparedStatement pstm = null;
        try {
            pstm = connection.prepareStatement(
                    "INSERT INTO dapp (id, code, name) VALUES (?, ?, ?)");
            pstm.setString(1, bean.getId());
            pstm.setString(2, bean.getCode());
            pstm.setString(3, bean.getName());
            pstm.executeUpdate();
        } finally {
            if (pstm != null) {
                pstm.close();
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the connection
        connection.close();
    }
}
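Because checkpointing is enabled, records processed after the last checkpoint are replayed on restart, so the plain INSERT above can produce duplicate rows in dapp. One common mitigation is an idempotent upsert; a sketch of the replacement statement, assuming id is a primary or unique key in dapp:

// Sketch: replace the INSERT in invoke() with an upsert so replayed records overwrite instead of duplicating.
pstm = connection.prepareStatement(
        "INSERT INTO dapp (id, code, name) VALUES (?, ?, ?) " +
        "ON DUPLICATE KEY UPDATE code = VALUES(code), name = VALUES(name)");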
TestBean
public class TestBean {

    private String id;
    private String code;
    private String name;

    public TestBean() {
    }

    public TestBean(String id, String code) {
        this.id = id;
        this.code = code;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "TestBean{" + "id='" + id + '\'' + ", code='" + code + '\'' + ", name='" + name + '\'' + '}';
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flinkdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.1</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- Redis connector -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <artifactId>hadoop-client</artifactId>-->
<!-- <version>2.7.7</version>-->
<!-- </dependency>-->
<!-- Traditional blocking HttpClient -->
<!-- <dependency>-->
<!-- <groupId>org.apache.httpcomponents</groupId>-->
<!-- <artifactId>httpclient</artifactId>-->
<!-- <version>4.5.6</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.57</version>
</dependency>
<!-- Efficient asynchronous HttpClient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.4</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>test.KafkaWithBroadcast</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>