1. 程式人生 > 其它 >Flink流處理測試

Flink流處理測試

Flink流處理測試

package com.shujia.flink.core

import org.apache.flink.streaming.api.scala._

object Demo1WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * 1、建立flink的執行環境
     * 這是flink程式的入口
     */
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * 2、讀取資料
     * DataStream相當於spark中的DStream
     */
    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    /**
     * 3、開啟socket
     * 在虛擬機器中輸入  nc -lk 8888  回車
     */

    //先不做處理,直接列印處理
    //流處理不能使用foreach迴圈列印
    linesDS.print()

    /**
     * 4、啟動flink程式(執行該程式碼)
     */
    env.execute("wordcount")//給該程式起個名字
  }
}

步驟:

  • 1、建立flink的執行環境
  • 2、讀取資料
  • 3、返回虛擬機器中,輸入nc -lk 8888 回車
  • 4、編寫啟動flink程式的程式碼,然後執行整個程式碼

回到虛擬機器中,輸入一些資料,在IDEA中會對應生成;

因為我的電腦效能-邏輯處理器是4,所以在IDEA中的輸出結果並行度編號有4種

Flink處理WordCount時,想要列印日誌

(1)增加依賴

		<dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
  </dependency>

(2)在IDEA的resources目錄中增加一個配置檔案log4j2.properties

(3)重新執行程式碼

預設並行度是計算機核心數(邏輯處理器)有關,我們通過程式碼可以自定義並行度

//在讀取資料之前設定並行度
env.setParallelism(2)

重新執行程式碼