Flink流處理測試
阿新 • • 發佈:2022-03-20
Flink流處理測試
package com.shujia.flink.core import org.apache.flink.streaming.api.scala._ object Demo1WordCount { def main(args: Array[String]): Unit = { /** * 1、建立flink的執行環境 * 這是flink程式的入口 */ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment /** * 2、讀取資料 * DataStream相當於spark中的DStream */ val linesDS: DataStream[String] = env.socketTextStream("master", 8888) /** * 3、開啟socket * 在虛擬機器中輸入 nc -lk 8888 回車 */ //先不做處理,直接列印處理 //流處理不能使用foreach迴圈列印 linesDS.print() /** * 4、啟動flink程式(執行該程式碼) */ env.execute("wordcount")//給該程式起個名字 } }
步驟:
- 1、建立flink的執行環境
- 2、讀取資料
- 3、返回虛擬機器中,輸入
nc -lk 8888
回車 - 4、編寫啟動flink程式的程式碼,然後執行整個程式碼
回到虛擬機器中,輸入一些資料,在IDEA中會對應生成;
因為我的電腦效能-邏輯處理器是4,所以在IDEA中的輸出結果並行度
編號有4種
Flink處理WordCount時,想要列印日誌
(1)增加依賴
<dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> </dependency>
(2)在IDEA的
resources
目錄中增加一個配置檔案log4j2.properties
(3)重新執行程式碼
預設並行度
是計算機核心數(邏輯處理器)有關,我們通過程式碼可以自定義並行度
//在讀取資料之前設定並行度
env.setParallelism(2)
重新執行程式碼