1. 程式人生 > >開發第一個Flink應用

開發第一個Flink應用

《Flink1.7從安裝到體驗》一文中,我們安裝和體驗了Flink,今天就用java來一起開發一個簡單的Flink應用;

步驟列表

本次實戰經歷以下步驟:

  1. 建立應用;
  2. 編碼;
  3. 構建;
  4. 提交任務到Flink,驗證功能;

環境資訊

  1. Flink:1.7;
  2. Flink所在機器的作業系統:CentOS Linux release 7.5.1804;
  3. 開發環境JDK:1.8.0_181;
  4. 開發環境Maven:3.5.0;

應用功能簡介

《Flink1.7從安裝到體驗》一文中,我們在Flink執行SocketWindowWordCount.jar

,實現的功能是從socket讀取字串,將其中的每個單詞的數量統計出來,今天我們就來編碼開發這個應用,實現此功能;

建立應用

  1. 應用基本程式碼是通過mvn命令建立的,在命令列輸入以下命令:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
  1. 按控制檯的提示輸入groupId、artifactId、version、package等資訊,一路回車確認後,會生成一個和你輸入的artifactId同名的資料夾,裡面是個maven工程:
Define value for property 'groupId': com.bolingcavalry
Define value for property 'artifactId': socketwordcountdemo
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' com.bolingcavalry: :
Confirm properties configuration:
groupId: com.bolingcavalry
artifactId: socketwordcountdemo
version: 1.0-SNAPSHOT
package: com.bolingcavalry
  1. 用IEDA匯入這個maven工程,如下圖,已經有了兩個類:BatchJob和StreamingJob,BatchJob是用於批處理的,本次實戰用不上,因此可以刪除,只保留流處理的StreamingJob:
    在這裡插入圖片描述

應用建立成功,接下來可以開始編碼了;

編碼

您可以選擇直接從GitHub下載這個工程的原始碼,地址和連結資訊如下表所示:

名稱 連結 備註
專案主頁 https://github.com/zq2599/blog_demos 該專案在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該專案原始碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該專案原始碼的倉庫地址,ssh協議

這個git專案中有多個資料夾,本章原始碼在socketwordcountdemo這個資料夾下,如下圖紅框所示:
在這裡插入圖片描述

接下來開始編碼:

  1. 在StreamingJob類中新增靜態內部類WordWithCount,這是個PoJo,用來儲存一個具體的單詞及其出現頻率:
	 /**
	 * 記錄單詞及其出現頻率的Pojo
	 */
	public static class WordWithCount {
		/**
		 * 單詞內容
		 */
		public String word;

		/**
		 * 出現頻率
		 */
		public long count;

		public WordWithCount() {
			super();
		}

		public WordWithCount(String word, long count) {
			this.word = word;
			this.count = count;
		}

		/**
		 * 將單詞內容和頻率展示出來
		 * @return
		 */
		@Override
		public String toString() {
			return word + " : " + count;
		}
	}
  1. 把所有業務邏輯寫在StreamJob類的main方法中,如下所示,關鍵位置都加了中文註釋:
public static void main(String[] args) throws Exception {

		//環境資訊
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		//資料來源是本機9999埠,換行符分隔,您也可以考慮將hostname和port引數通過main方法的入參傳入
		DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");

		//通過text物件轉換得到新的DataStream物件,
		//轉換邏輯是分隔每個字串,取得的所有單詞都建立一個WordWithCount物件
		DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
			@Override
			public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
				for(String word : s.split("\\s")){
					collector.collect(new WordWithCount(word, 1L));
				}
			}
		})
		.keyBy("word")//key為word欄位
		.timeWindow(Time.seconds(5))	//五秒一次的翻滾時間視窗
		.reduce(new ReduceFunction<WordWithCount>() { //reduce策略
			@Override
			public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
				return new WordWithCount(a.word, a.count+b.count);
			}
		});


		//單執行緒輸出結果
		windowCounts.print().setParallelism(1);

		// 執行
		env.execute("Flink Streaming Java API Skeleton");
	}

構建

  1. 在pom.xml檔案所在目錄下執行命令:
mvn clean package -U
  1. 命令執行完畢後,在target目錄下的socketwordcountdemo-1.0-SNAPSHOT.jar檔案就是構建成功的jar包;

在Flink驗證

  1. Flink的安裝和啟動請參考《Flink1.7從安裝到體驗》
  2. 登入到Flink所在機器,執行以下命令:
nc -l 9999
  1. 我這邊Flink所在機器的IP地址是192.168.1.103,因此用瀏覽器訪問的Flink的web地址為:http://192.168.1.103:8081
  2. 選擇剛剛生成的jar檔案作為一個新的任務,如下圖:
    在這裡插入圖片描述
  3. 點選下圖紅框中的"upload",將檔案提交:
    在這裡插入圖片描述
  4. 目前還只是將jar檔案上傳了而已,接下來就是手工設定執行類並啟動任務,操作如下圖,紅框2中填寫的前面編寫的StreamingJob類的完整名稱:
    在這裡插入圖片描述
  5. 提交後的頁面效果如下圖所示,可見一個job已經在執行中了:
    在這裡插入圖片描述
  6. 回到Flink所在機器的控制檯,在之前輸入了nc -l 9999的視窗輸入一些英文句子,然後按下回車鍵,例如:
[[email protected] flink-1.7.0]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host vostro.
Starting taskexecutor daemon on host vostro.
[[email protected] flink-1.7.0]# nc -l 9999
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

  1. 接下來看看我們的job的執行效果,如下圖,點選左側的"Task Managers",在右邊的列表中只有一個Task,點選它:
    在這裡插入圖片描述
  2. 出現的頁面有三個tab頁,點選"Stdout"這個tab,就能見到我們的任務對之前句子中的單詞的統計結果,如下圖:
    在這裡插入圖片描述

至此,第一個最簡單Flink就完成了。