1. 程式人生 > 其它 >idea搭建flink環境

idea搭建flink環境

1.開啟idea,選擇new project,跳轉至如下介面:

2.單擊next 下一步,輸入專案名稱,單擊finish完成

3.在專案main專案下新建scala目錄,並新建檔案,檔名稱為 hello.scala

4.新建檔案之後,進入helllo.scala檔案,會彈出新增sdk,新增自己版本的scala就可以。完成之後可以新建新增如下程式碼,測試Java和scala之間的互相呼叫。

5.在maven專案下引入如下配置:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source
> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.12</scala.version> <scala.binary.version>2.11</scala.binary.version> <flink.version>1.6.1</flink.version> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> </dependencies>

6.新建一個flink測試:

package it.bigdata.flink.study

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.streaming.api.scala._

//流處理 word count
object SteamWordCount {
  def main(args: Array[String]): Unit = {
    //建立流處理的執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(8)

    //接受一個socket文字流
    val inputDataStream: DataStream[String] = env.socketTextStream("10.18.35.155", 777)
//        進行轉換處理統計
        inputDataStream
          .flatMap(_.split(" "))
          .filter(_.nonEmpty)
          .map((_,1))
          .keyBy(0)
          .sum(1)
          .print()
    env.execute("word count")
  }
}

6.1問題一

若出現StreamExecutionEnvironment 類沒有發現時,將此處,改為compile

6.2問題二

若還報錯,java.lang.ClassNotFoundException: org.apache.flink.runtime.state.StateBackend

開啟edit configuration ,在此處勾選即可

7. 提前進入伺服器,使用nc-lk 777,然後執行第一個flink,執行效果如下