Flink Installation, Deployment, Kafka Source, and a MySQL Sink
Flink installation, deployment, and testing
Download the Flink package
Flink download address:
https://archive.apache.org/dist/flink/flink-1.5.0/
Since the examples do not need Hadoop, flink-1.5.0-bin-scala_2.11.tgz is sufficient.
Upload it to the /opt directory on the machine.
Extract it:
tar -zxf flink-1.5.0-bin-scala_2.11.tgz -C /opt/
Configure the master node
Pick a master node (the JobManager) and set the jobmanager.rpc.address entry in conf/flink-conf.yaml to that node's IP or hostname. Make sure every node has the same jobmanager.rpc.address value.
jobmanager.rpc.address: node1
(If a configured port is already occupied, change it as well. Here Flink's default web port 8081 was already taken by Spark, so it was changed to 8088.)
rest.port: 8088
In this installation the master node is node1; since it is a single machine, the slave node is also node1.
Configure slaves
Put the IP or hostname of every worker node (TaskManager) into the conf/slaves file, one per line.
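For this single-node setup, conf/slaves therefore contains a single line:
node1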
Start the Flink cluster
bin/start-cluster.sh
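The matching stop script shuts the cluster down again when needed:
bin/stop-cluster.sh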
Open http://node1:8088 to view the web UI.
The Task Managers view shows that this Flink cluster currently has a single node, and that the TaskManager provides two task slots.
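The slot count per TaskManager is controlled by taskmanager.numberOfTaskSlots in conf/flink-conf.yaml; seeing two slots suggests a setting along these lines (an assumption; the default is 1):
taskmanager.numberOfTaskSlots: 2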
Test
Dependencies
<groupId>com.rz.flinkdemo</groupId>
<artifactId>Flink-programe</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.binary.version>2.11</scala.binary.version>
    <flink.version>1.5.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Test code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// the host and port to connect to
final int port;
final String hostName;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
hostName = params.get("hostname");
} catch (Exception e) {
System.err.println("No port or hostname specified. Please run 'SocketWindowWordCount --port <port> --hostname <hostname>'");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostName, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
Package with mvn clean install. (If packaging fails with java.lang.OutOfMemoryError,
set MAVEN_OPTS on the command line: set MAVEN_OPTS=-Xms128m -Xmx512m
then run mvn clean install again.)
This produces FlinkTest.jar.
Find the built jar and upload it through the web UI.
The submit form lets you set the run parameters (entry class, program arguments, parallelism).
After submitting, check the Overview page: the available slots drop to one, because our socket job now occupies one, and the number of running jobs becomes one.
Send data
[root@node1 flink-1.5.0]# nc -l 8099
aaa bbb
aaa ccc
aaa bbb
bbb ccc
Click the running job to see information such as the number of bytes received.
Under the log directory the output is clearly visible. (Each word shows up in several successive counts because the job uses a 5-second window that slides every second, so one input line is counted in up to five overlapping windows.)
[root@node1 log]# tail -f flink-root-taskexecutor-2-localhost.out
aaa : 1
ccc : 1
ccc : 1
bbb : 1
ccc : 1
bbb : 1
bbb : 1
ccc : 1
bbb : 1
ccc : 1
Besides submitting through the web UI, you can also upload the jar to the Linux machine and submit the job there.
Run the uploaded jar with the flink CLI:
bin/flink run -c com.rz.flinkdemo.SocketWindowWordCount jars/FlinkTest.jar --port 8099 --hostname node1
The remaining steps are the same.
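The standard Flink CLI can also list and cancel jobs, which is handy when working from the shell:
bin/flink list
bin/flink cancel <jobID>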
Using Kafka as a source
Add the dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.5.0</version>
</dependency>
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaSource010 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","node1:9092");
properties.setProperty("group.id","test");
//DataStream<String> test = env.addSource(new FlinkKafkaConsumer010<String>("topic", new SimpleStringSchema(), properties));
// a regular expression can be used to match the desired topics
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>(java.util.regex.Pattern.compile("test-[0-9]"), new SimpleStringSchema(), properties);
// start consuming from the latest offsets
kafkaSource.setStartFromLatest();
// addSource turns the Kafka input into a DataStream
DataStream<String> consume = env.addSource(kafkaSource);
// ... process and sink
env.execute("KafkaSource010");
}
}
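To try this out you need matching topics and some input. With a standard Kafka 0.10 installation the commands would look roughly like this (node1 and the topic name test-1, which matches the pattern above, are assumptions):
bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic test-1
bin/kafka-console-producer.sh --broker-list node1:9092 --topic test-1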
Using MySQL as a sink
Flink itself does not ship a DataStream sink for MySQL, so we have to implement one ourselves.
First, add the dependency:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.30</version>
</dependency>
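The insert statements below assume a target table with name and nickname columns. A matching table could be created with DDL along these lines (the table name t_user is hypothetical; the snippets below use the placeholder 'table'):
CREATE TABLE t_user (name VARCHAR(64), nickname VARCHAR(64));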
For a custom sink, the obvious first approach is to implement Flink's built-in SinkFunction interface and provide its method, as follows:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MysqlSink implements
SinkFunction<Tuple2<String,String>> {
private static final long serialVersionUID = 1L;
private Connection connection;
private PreparedStatement preparedStatement;
// placeholder connection settings; replace with real values
String username = "mysql.user";
String password = "mysql.password";
String drivername = "mysql.driver";
String dburl = "mysql.url";
@Override
public void invoke(Tuple2<String,String> value) throws Exception {
Class.forName(drivername);
connection = DriverManager.getConnection(dburl, username, password);
String sql = "insert into table(name,nickname) values(?,?)"; // 'table' is a placeholder table name
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, value.f0);
preparedStatement.setString(2, value.f1);
preparedStatement.executeUpdate();
if (preparedStatement != null) {
preparedStatement.close();
}
if (connection != null) {
connection.close();
}
}
}
This implementation has a problem: every single record opens a MySQL connection and then closes it again, which is expensive. Flink's Rich functions offer a better way; the code is as follows:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MysqlSink extends RichSinkFunction<Tuple2<String,String>> {
private Connection connection = null;
private PreparedStatement preparedStatement = null;
private String userName = null;
private String password = null;
private String driverName = null;
private String DBUrl = null;
public MysqlSink() {
// placeholder settings; replace with real values or make them constructor parameters
userName = "mysql.username";
password = "mysql.password";
driverName = "mysql.driverName";
DBUrl = "mysql.DBUrl";
}
@Override
public void invoke(Tuple2<String,String> value) throws Exception {
preparedStatement.setString(1, value.f0);
preparedStatement.setString(2, value.f1);
preparedStatement.executeUpdate(); // returns 1 on success, 0 otherwise
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName(driverName);
connection = DriverManager.getConnection(DBUrl, userName, password);
// prepare the statement once instead of once per record; 'table' is a placeholder table name
String sql = "insert into table(name,nickname) values(?,?)";
preparedStatement = connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
if(preparedStatement!=null){
preparedStatement.close();
}
if(connection!=null){
connection.close();
}
}
}
The advantage of the Rich variant is its open and close methods: the connection is created once at initialization and then reused, which saves the per-record cost of opening and closing connections. A connection pool could also be used; this is just meant to illustrate the idea.
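As a minimal sketch of the pooled variant (assuming the HikariCP dependency com.zaxxer:HikariCP is added; the class name PooledMysqlSink, the table t_user, the credentials, and the pool size are all illustrative):

import java.sql.Connection;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class PooledMysqlSink extends RichSinkFunction<Tuple2<String, String>> {
    private transient HikariDataSource dataSource;

    @Override
    public void open(Configuration parameters) throws Exception {
        // build the pool once per parallel sink instance
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://node1:3306/test"); // illustrative URL
        config.setUsername("root");                        // illustrative credentials
        config.setPassword("password");
        config.setMaximumPoolSize(4);                      // illustrative pool size
        dataSource = new HikariDataSource(config);
    }

    @Override
    public void invoke(Tuple2<String, String> value) throws Exception {
        // borrow a connection per record; try-with-resources returns it to the pool
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                     "insert into t_user(name,nickname) values(?,?)")) {
            ps.setString(1, value.f0);
            ps.setString(2, value.f1);
            ps.executeUpdate();
        }
    }

    @Override
    public void close() throws Exception {
        if (dataSource != null) {
            dataSource.close();
        }
    }
}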
Using this MysqlSink is also very simple:
// just call addSink to write to the custom MySQL sink; the MySQL table and fields could also be made configurable, which is more convenient and generic
proceDataStream.addSink(new MysqlSink());
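Putting the pieces together, a minimal sketch that wires the Kafka source from above into the MysqlSink might look like this (the class name KafkaToMysql and the assumption that each Kafka record is a "name,nickname" CSV line are purely for illustration):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaToMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node1:9092");
        properties.setProperty("group.id", "test");
        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));
        // assumed record format: "name,nickname"
        DataStream<Tuple2<String, String>> parsed = source.map(
                new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String line) {
                        String[] parts = line.split(",", 2);
                        return Tuple2.of(parts[0], parts.length > 1 ? parts[1] : "");
                    }
                });
        parsed.addSink(new MysqlSink());
        env.execute("KafkaToMysql");
    }
}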
Summary
These notes covered a simple deployment, a test job, a Kafka demo, and a custom MySQL sink implementation. The most important part is the use of Rich functions; I hope you found it useful.