
Writing data from Kafka to MySQL with Flink streaming

The end of the year is in sight and I have not updated this blog in quite a while. Things have been fairly quiet lately, so I took the chance to learn the big-data framework Flink. It is similar to Spark and easy to pick up; after two or three days you can start writing code. In some areas it is stronger than Spark, stream processing in particular. Below we use Flink's DataStream API to read data from Kafka and write it to MySQL. Without further ado, on to the code.

First, the configuration file:

#mysql
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://localhost:3306/mybatis
mysql.user=root
#kafka
kafka.topic=mytopic
kafka.hosts=localhost:9092
kafka.zookper=localhost:2181
kafka.group=group
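
The classes below read these values through a com.wanda.common.Config helper that the post never shows. A minimal sketch of what it might look like, assuming the properties above sit in a config.properties file on the classpath (the file name and loading strategy are assumptions, not the original author's code):

package com.wanda.common;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Config {

	private static final Properties PROPS = new Properties();

	static {
		// "config.properties" is an assumed file name on the classpath.
		try (InputStream in = Config.class.getClassLoader()
				.getResourceAsStream("config.properties")) {
			PROPS.load(in);
		} catch (IOException e) {
			throw new ExceptionInInitializerError(e);
		}
	}

	public static String getString(String key) {
		return PROPS.getProperty(key);
	}
}

Also note that the file has no mysql.password entry; the sink further down simply passes an empty password.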

Maven dependencies:
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.10</artifactId>
			<version>1.1.3</version>
		</dependency>
		<!-- Use this dependency if you are using the DataSet API -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>1.1.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.10</artifactId>
			<version>1.1.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table_2.10</artifactId>
			<version>1.1.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
			<version>1.1.3</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.17</version>
		</dependency>

The class that sends test data to Kafka:
package com.wanda.flink;
import java.util.Properties;
import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
import com.wanda.common.Config;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducte {
	public static void main(String[] args) throws InterruptedException {
		String broker = Config.getString("kafka.hosts");
		System.out.println("broker:" + broker);
		String topic = Config.getString("kafka.topic");
		int count = 10;
		Random random=new Random();
		Properties props = new Properties();
		props.put("metadata.broker.list", broker);
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		props.put("request.required.acks", "1");
		ProducerConfig pConfig = new ProducerConfig(props);
		Producer<String, String> producer = new Producer<String, String>(
				pConfig);
		// Send `count` colon-separated messages of the form id:name:price, one per second.
		for (int i = 0; i < count; ++i) {
			String message = random.nextInt(10) + ":" + RandomStringUtils.randomAlphabetic(3) + ":" + random.nextInt(1000);
			producer.send(new KeyedMessage<String, String>(topic, message));
			Thread.sleep(1000);
			System.out.println("Message " + i + " has been sent");
		}
		producer.close();
	}
}
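
Each message is a plain colon-separated string of the form order_id:order_no:order_price, which the streaming job later splits on ":". A run produces lines like the following (the values are random, so these are only illustrative):

3:xQa:457
7:KbT:91
1:mZw:803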
Next, extend RichSinkFunction to write the records into MySQL:
package com.wanda.flink;

import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.wanda.common.Config;

public class MySQLSink extends
		RichSinkFunction<Tuple3<Integer, String, Integer>> {

	private static final long serialVersionUID = 1L;
	private Connection connection;
	private PreparedStatement preparedStatement;
	String username = Config.getString("mysql.user");
	String password = "";
	String drivername = Config.getString("mysql.driver");
	String dburl = Config.getString("mysql.url");

	@Override
	public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
		// A new JDBC connection is opened for every record here; see the leaner sketch after this class.
		Class.forName(drivername);
		connection = DriverManager.getConnection(dburl, username, password);
		String sql = "replace into orders(order_id,order_no,order_price) values(?,?,?)";
		preparedStatement = connection.prepareStatement(sql);
		preparedStatement.setInt(1, value.f0);
		preparedStatement.setString(2, value.f1);
		preparedStatement.setInt(3, value.f2);
		preparedStatement.executeUpdate();
		if (preparedStatement != null) {
			preparedStatement.close();
		}
		if (connection != null) {
			connection.close();
		}

	}

}
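
The sink above opens and closes a JDBC connection for every single record, which keeps the example short but is slow under any real load. A leaner variant (a sketch, not the original post's code; the class name MySQLSinkLean is only illustrative) moves the connection handling into the open()/close() lifecycle methods that RichSinkFunction inherits:

package com.wanda.flink;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.wanda.common.Config;

public class MySQLSinkLean extends
		RichSinkFunction<Tuple3<Integer, String, Integer>> {

	private static final long serialVersionUID = 1L;
	private transient Connection connection;
	private transient PreparedStatement preparedStatement;

	@Override
	public void open(Configuration parameters) throws Exception {
		// Open the connection once per parallel task instead of once per record.
		Class.forName(Config.getString("mysql.driver"));
		// The config file has no mysql.password entry, so an empty password is assumed.
		connection = DriverManager.getConnection(Config.getString("mysql.url"),
				Config.getString("mysql.user"), "");
		preparedStatement = connection.prepareStatement(
				"replace into orders(order_id,order_no,order_price) values(?,?,?)");
	}

	@Override
	public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
		preparedStatement.setInt(1, value.f0);
		preparedStatement.setString(2, value.f1);
		preparedStatement.setInt(3, value.f2);
		preparedStatement.executeUpdate();
	}

	@Override
	public void close() throws Exception {
		// Release JDBC resources when the task shuts down.
		if (preparedStatement != null) {
			preparedStatement.close();
		}
		if (connection != null) {
			connection.close();
		}
	}
}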


Finally, the class that launches the Flink streaming job and writes the data into MySQL:

package com.wanda.flink;

import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.wanda.common.Config;
public class KafkaToDB {
	public static void main(String[] args) throws Exception {
		Properties pro = new Properties();
		pro.put("bootstrap.servers", Config.getString("kafka.hosts"));
		pro.put("zookeeper.connect", Config.getString("kafka.zookper"));
		pro.put("group.id", Config.getString("kafka.group"));
		StreamExecutionEnvironment env = StreamExecutionEnvironment
				.getExecutionEnvironment();
		env.getConfig().disableSysoutLogging();  // suppress the runtime's sysout log output
		env.getConfig().setRestartStrategy(
				RestartStrategies.fixedDelayRestart(4, 10000));
		env.enableCheckpointing(5000);
		DataStream<String> sourceStream = env
				.addSource(new FlinkKafkaConsumer08<String>(Config
						.getString("kafka.topic"), new SimpleStringSchema(),
						pro));

		// Drop blank lines, then parse each "id:name:price" message into a Tuple3.
		DataStream<Tuple3<Integer, String, Integer>> sourceStreamTra = sourceStream
				.filter(new FilterFunction<String>() {
					private static final long serialVersionUID = 1L;

					@Override
					public boolean filter(String value) throws Exception {
						return StringUtils.isNotBlank(value);
					}
				})
				.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple3<Integer, String, Integer> map(String value) throws Exception {
						String[] args = value.split(":");
						return new Tuple3<Integer, String, Integer>(
								Integer.valueOf(args[0]), args[1], Integer.valueOf(args[2]));
					}
				});
		
		sourceStreamTra.addSink(new MySQLSink());
		env.execute("data to mysql start");
	}
}
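
Before running the job, the orders table has to exist in the mybatis database. The post does not show its schema; a possible definition, inferred from the column names used in the sink (the column types and the key are assumptions; REPLACE INTO only behaves as an upsert if order_id has a primary key or unique index):

CREATE TABLE orders (
  order_id    INT NOT NULL PRIMARY KEY,
  order_no    VARCHAR(32),
  order_price INT
);

With the table in place, start KafkaToDB first so it is already consuming, then run KafkaProducte; the rows should show up in orders within a few seconds.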