Building a Flink program that reads from Kafka and writes to MySQL
Flink has become quite popular recently, so it is worth understanding and using. It is arguably the most popular real-time computing framework at the moment, and it offers both stream and batch processing. It can handle bounded as well as unbounded data, i.e. data that is produced continuously and never ends. We will not go into the internals here; instead we will build a working Flink job end to end. The overall flow is source -> transform -> sink: read records from a source, transform them from their raw, messy format into the shape we need, and then sink the result into a database or file for storage and presentation.
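As a minimal, self-contained sketch of that source -> transform -> sink shape (the class name PipelineSkeleton is just for illustration and uses a fixed in-memory source and a stdout sink; the rest of this article replaces those stand-ins with Kafka and MySQL):

package myflink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSkeleton {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "bb", "ccc")                 // source: a fixed in-memory collection
           .map(new MapFunction<String, Integer>() {       // transform: raw value -> the shape we need
               @Override
               public Integer map(String value) {
                   return value.length();
               }
           })
           .print();                                       // sink: stdout instead of a database

        env.execute("source-transform-sink skeleton");
    }
}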
Next we need to download Flink, Kafka, MySQL and ZooKeeper. I simply downloaded the tar/tgz packages and extracted them.
After downloading Flink, start it up. For example, I downloaded flink-1.9.1-bin-scala_2.11.tgz, extracted it and started the cluster:
tar -zxvf flink-1.9.1-bin-scala_2.11.tgz
cd flink-1.9.1
./bin/start-cluster.sh
Once it is up, open http://localhost:8081 and you should see the Flink web dashboard.
Download ZooKeeper and extract it, copy zoo_sample.cfg under zookeeper/conf to zoo.cfg, and then start it with the commands below. ZooKeeper is used together with Kafka, because Kafka registers and discovers all of its brokers through ZooKeeper.
cp zoo_sample.cfg zoo.cfg
cd ../
./bin/zkServer.sh start
Next, download and start Kafka, and create a topic named person with one partition and one replica.
./bin/kafka-server-start.sh config/server.properties
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person
For MySQL, install it however you like; once it is running, create a table in the database:
CREATE TABLE `Person` (
  `id` mediumint NOT NULL auto_increment,
  `name` varchar(255) NOT NULL,
  `age` int(11) DEFAULT NULL,
  `createDate` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
Next we create a Java project using Maven. Make sure Maven is installed first and the mvn command works. Running Maven directly may hang because the archetype catalog cannot be downloaded, so I first downloaded the archetype-catalog.xml file from http://repo.maven.apache.org/maven2/ and put it into the corresponding location in my local Maven repository; adjust the path to match your own setup:

/Users/huangqingshi/.m2/repository/org/apache/maven/archetype/archetype-catalog/2.4

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.2 \
    -DgroupId=flink-project \
    -DartifactId=flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false \
    -DarchetypeCatalog=local

-DinteractiveMode=false turns off interactive project creation; since the version, group, artifact and package are all specified above, no interaction is needed. -DarchetypeCatalog=local tells Maven to use the catalog file downloaded above from the local repository, because fetching it remotely is extremely slow and can keep the project from being generated at all.
I added a few extra dependencies to the project, such as the Kafka connector and the database drivers. The full pom.xml is:
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flink-project</groupId>
    <artifactId>flink-project</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.7.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>28.1-jre</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.20</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflink.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>
Next, create a POJO that will hold the data we move through the pipeline.
package myflink.pojo;

import java.util.Date;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class Person {

    private String name;
    private int age;
    private Date createDate;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Date getCreateDate() {
        return createDate;
    }

    public void setCreateDate(Date createDate) {
        this.createDate = createDate;
    }
}
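The producer below serializes this POJO with fastjson, so it can help to see what a message on the topic will look like. A minimal sketch (the class name JsonShapeDemo and the sample values are illustrative only, not part of the pipeline):

package myflink.pojo;

import com.alibaba.fastjson.JSON;

import java.util.Date;

// Quick check of the JSON shape that will be written to Kafka.
// The class name and the sample values are for illustration only.
public class JsonShapeDemo {

    public static void main(String[] args) {
        Person person = new Person();
        person.setName("hqs42");
        person.setAge(42);
        person.setCreateDate(new Date());

        // fastjson serializes the getters, producing the keys "name", "age" and "createDate";
        // by default java.util.Date is rendered as a millisecond timestamp.
        String json = JSON.toJSONString(person);
        System.out.println(json);
    }
}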
Create a small producer task that writes data into Kafka.
package myflink.kafka;

import com.alibaba.fastjson.JSON;
import myflink.pojo.Person;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class KafkaWriter {

    //local Kafka broker list
    public static final String BROKER_LIST = "localhost:9092";
    //Kafka topic (the one created above)
    public static final String TOPIC_PERSON = "person";
    //key serializer: plain strings
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    //value serializer: plain strings
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    public static void writeToKafka() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        //build a Person object; the name is "hqs" plus a random number
        int randomInt = RandomUtils.nextInt(1, 100000);
        Person person = new Person();
        person.setName("hqs" + randomInt);
        person.setAge(randomInt);
        person.setCreateDate(new Date());
        //convert to JSON
        String personJson = JSON.toJSONString(person);
        //wrap it in a Kafka record
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_PERSON, null, null, personJson);
        //send it (buffered)
        producer.send(record);
        System.out.println("Sent to Kafka: " + personJson);
        //flush immediately, then release the producer
        producer.flush();
        producer.close();
    }

    public static void main(String[] args) {
        while (true) {
            try {
                //write one record every three seconds
                TimeUnit.SECONDS.sleep(3);
                writeToKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
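Before wiring up Flink, it can be worth confirming that messages are actually arriving on the topic. One way is a throwaway consumer; this is a hedged sketch, not part of the original pipeline, using the kafka-clients API that the Flink Kafka connector already pulls in, with an arbitrary group id:

package myflink.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

// Throwaway check that the records written by KafkaWriter show up on the topic.
public class KafkaReadCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaWriter.BROKER_LIST);
        props.put("group.id", "check-person");   // arbitrary group id, illustration only
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(KafkaWriter.TOPIC_PERSON));
            //poll a few times and print whatever JSON records are found
            for (int i = 0; i < 5; i++) {
                ConsumerRecords<String, String> records = consumer.poll(2000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset=" + record.offset() + " value=" + record.value());
                }
            }
        }
    }
}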
Create a database utility class for obtaining connections. It uses Druid: set the driver, URL, username and password, plus the connection-pool settings (initial size, maximum active connections and minimum idle connections), and then hand out connections from the pool.
package myflink.db;

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class DbUtils {

    private static final DruidDataSource dataSource = new DruidDataSource();

    static {
        //build the pool once, not on every call
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/testdb");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        //initial size, maximum active connections and minimum idle connections
        dataSource.setInitialSize(10);
        dataSource.setMaxActive(50);
        dataSource.setMinIdle(5);
    }

    public static Connection getConnection() throws Exception {
        //hand out a connection from the pool
        return dataSource.getConnection();
    }
}
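To rule out configuration problems (URL, username, password) before running the Flink job, a quick connectivity check can be run through DbUtils. A minimal sketch, assuming the testdb database and the Person table created earlier; the class name DbUtilsCheck is just for illustration:

package myflink.db;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

// One-off check that the Druid pool can reach MySQL and see the Person table.
public class DbUtilsCheck {

    public static void main(String[] args) throws Exception {
        try (Connection connection = DbUtils.getConnection();
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery("SELECT COUNT(*) FROM Person")) {
            if (rs.next()) {
                System.out.println("Connected. Rows currently in Person: " + rs.getInt(1));
            }
        }
    }
}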
Next, create a MySqlSink that extends the RichSinkFunction class and overrides its open, invoke and close methods. Flink calls open once before the sink starts processing any data, so that is where we obtain the database connection and prepare the insert statement; invoke is then called for each incoming element and performs the actual database writes; close is called when the job stops or the sink is disposed, and that is where we close and release the connection and statement. The code is as follows.
package myflink.sink;

import myflink.db.DbUtils;
import myflink.pojo.Person;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.List;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class MySqlSink extends RichSinkFunction<List<Person>> {

    private PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //get a database connection and prepare the insert statement
        connection = DbUtils.getConnection();
        String sql = "insert into Person(name, age, createDate) values (?, ?, ?)";
        ps = connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //close and release the resources
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(List<Person> persons, Context context) throws Exception {
        for (Person person : persons) {
            ps.setString(1, person.getName());
            ps.setInt(2, person.getAge());
            ps.setTimestamp(3, new Timestamp(person.getCreateDate().getTime()));
            ps.addBatch();
        }
        //write the whole batch at once
        int[] count = ps.executeBatch();
        System.out.println("Number of rows written to MySQL: " + count.length);
    }
}
Now create the job that reads from Kafka and sinks to the database. Configure the properties needed to connect to Kafka, read the records from Kafka and transform each one into a Person object; this is the transform step mentioned above. Then collect everything received from Kafka in 5-second windows and finally sink each batch into MySQL.
package myflink;

import com.alibaba.fastjson.JSONObject;
import myflink.kafka.KafkaWriter;
import myflink.pojo.Person;
import myflink.sink.MySqlSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Properties;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class DataSourceFromKafka {

    public static void main(String[] args) throws Exception {
        //build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Kafka consumer properties
        Properties prop = new Properties();
        prop.put("bootstrap.servers", KafkaWriter.BROKER_LIST);
        prop.put("zookeeper.connect", "localhost:2181");
        prop.put("group.id", KafkaWriter.TOPIC_PERSON);
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("auto.offset.reset", "latest");

        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<String>(
                KafkaWriter.TOPIC_PERSON,
                new SimpleStringSchema(),
                prop
        )).
                //single-threaded, so console output is not interleaved; does not affect the result
                setParallelism(1);

        //read from Kafka and convert each record into a Person object
        DataStream<Person> dataStream = dataStreamSource.map(value -> JSONObject.parseObject(value, Person.class));

        //collect everything received within a 5-second window
        dataStream.timeWindowAll(Time.seconds(5L)).
                apply(new AllWindowFunction<Person, List<Person>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<Person> iterable, Collector<List<Person>> out) throws Exception {
                        List<Person> persons = Lists.newArrayList(iterable);
                        if (persons.size() > 0) {
                            System.out.println("Records received in the last 5 seconds: " + persons.size());
                            out.collect(persons);
                        }
                    }
                })
                //sink to the database
                .addSink(new MySqlSink());
                //or print to the console instead
                //.print();

        env.execute("kafka consumer job started");
    }
}
Everything is now in place. Run KafkaWriter's main method to write data into the person topic; the console prints each JSON record as it is sent, which shows the writes are succeeding.
Run DataSourceFromKafka's main method to read from Kafka and write to the database; the console prints how many records each 5-second window received and how many rows were written to MySQL.
Then query the database: the rows have been written, so the pipeline works.
That's it. If anything here is wrong, corrections are welcome.