1. 程式人生 > 實用技巧 >如何刪除win10系統開始選單多餘的資料夾(圖文)

如何刪除win10系統開始選單多餘的資料夾(圖文)

一、安裝 Scala 外掛

Flink 分別提供了基於 Java 語言和 Scala 語言的 API ,如果想要使用 Scala 語言來開發 Flink 程式,可以通過在 IDEA 中安裝 Scala 外掛來提供語法提示,程式碼高亮等功能。開啟 IDEA , 依次點選 File => settings => plugins 開啟外掛安裝頁面,搜尋 Scala 外掛並進行安裝,安裝完成後,重啟 IDEA 即可生效。

二、Flink 專案初始化

2.1 使用官方指令碼構建

Flink 官方支援使用 Maven 和 Gradle 兩種構建工具來構建基於 Java 語言的 Flink 專案;支援使用 SBT 和 Maven 兩種構建工具來構建基於 Scala 語言的 Flink 專案。 這裡以 Maven 為例進行說明,因為其可以同時支援 Java 語言和 Scala 語言專案的構建。需要注意的是 Flink 1.9 只支援 Maven 3.0.4 以上的版本,Maven 安裝完成後,可以通過以下兩種方式來構建專案:

1. 直接基於 Maven Archetype 構建

直接使用下面的 mvn 語句來進行構建,然後根據互動資訊的提示,依次輸入 groupId , artifactId 以及包名等資訊後等待初始化的完成:

$ mvn archetype:generate                               \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0

注:如果想要建立基於 Scala 語言的專案,只需要將 flink-quickstart-java 換成 flink-quickstart-scala 即可,後文亦同。

2. 使用官方指令碼快速構建

為了更方便的初始化專案,官方提供了快速構建指令碼,可以直接通過以下命令來進行呼叫:

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0

該方式其實也是通過執行 maven archetype 命令來進行初始化,其指令碼內容如下:

PACKAGE=quickstart

mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=${1:-1.8.0} \
-DgroupId=org.myorg.quickstart \
-DartifactId=$PACKAGE \
-Dversion=0.1 \
-Dpackage=org.myorg.quickstart \
-DinteractiveMode=false

可以看到相比於第一種方式,該種方式只是直接指定好了 groupId ,artifactId ,version 等資訊而已。

2.2 使用 IDEA 構建

如果你使用的是開發工具是 IDEA ,可以直接在專案建立頁面選擇 Maven Flink Archetype 進行專案初始化:

如果你的 IDEA 沒有上述 Archetype, 可以通過點選右上角的 ADD ARCHETYPE ,來進行新增,依次填入所需資訊,這些資訊都可以從上述的 archetype:generate 語句中獲取。點選 OK 儲存後,該 Archetype 就會一直存在於你的 IDEA 中,之後每次建立專案時,只需要直接選擇該 Archetype 即可:

選中 Flink Archetype ,然後點選 NEXT 按鈕,之後的所有步驟都和正常的 Maven 工程相同。

三、專案結構

3.1 專案結構

建立完成後的自動生成的專案結構如下:

其中 BatchJob 為批處理的樣例程式碼,原始碼如下:

import org.apache.flink.api.scala._

object BatchJob {
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
....
env.execute("Flink Batch Scala API Skeleton")
}
}

getExecutionEnvironment 代表獲取批處理的執行環境,如果是本地執行則獲取到的就是本地的執行環境;如果在叢集上執行,得到的就是叢集的執行環境。如果想要獲取流處理的執行環境,則只需要將 ExecutionEnvironment 替換為 StreamExecutionEnvironment, 對應的程式碼樣例在 StreamingJob 中:

import org.apache.flink.streaming.api.scala._

object StreamingJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
...
env.execute("Flink Streaming Scala API Skeleton")
}
}

需要注意的是對於流處理專案 env.execute() 這句程式碼是必須的,否則流處理程式就不會被執行,但是對於批處理專案則是可選的。

3.2 主要依賴

基於 Maven 骨架建立的專案主要提供了以下核心依賴:其中 flink-scala 用於支援開發批處理程式 ;flink-streaming-scala 用於支援開發流處理程式 ;scala-library 用於提供 Scala 語言所需要的類庫。如果在使用 Maven 骨架建立時選擇的是 Java 語言,則預設提供的則是 flink-javaflink-streaming-java 依賴。

<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency> <!-- Scala Library, provided by Flink as well. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>

需要特別注意的以上依賴的 scope 標籤全部被標識為 provided ,這意味著這些依賴都不會被打入最終的 JAR 包。因為 Flink 的安裝包中已經提供了這些依賴,位於其 lib 目錄下,名為 flink-dist_*.jar ,它包含了 Flink 的所有核心類和依賴:

scope 標籤被標識為 provided 會導致你在 IDEA 中啟動專案時會丟擲 ClassNotFoundException 異常。基於這個原因,在使用 IDEA 建立專案時還自動生成了以下 profile 配置:

<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- Its adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id> <activation>
<property>
<name>idea.version</name>
</property>
</activation> <dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>

在 id 為 add-dependencies-for-IDEA 的 profile 中,所有的核心依賴都被標識為 compile,此時你可以無需改動任何程式碼,只需要在 IDEA 的 Maven 面板中勾選該 profile,即可直接在 IDEA 中執行 Flink 專案:

四、詞頻統計案例

專案建立完成後,可以先書寫一個簡單的詞頻統計的案例來嘗試執行 Flink 專案,以下以 Scala 語言為例,分別介紹流處理程式和批處理程式的程式設計示例:

4.1 批處理示例

import org.apache.flink.api.scala._

object WordCountBatch {

  def main(args: Array[String]): Unit = {
val benv = ExecutionEnvironment.getExecutionEnvironment
val dataSet = benv.readTextFile("D:\\wordcount.txt")
dataSet.flatMap { _.toLowerCase.split(",")}
.filter (_.nonEmpty)
.map { (_, 1) }
.groupBy(0)
.sum(1)
.print()
}
}

其中 wordcount.txt 中的內容如下:

a,a,a,a,a
b,b,b
c,c
d,d

本機不需要配置其他任何的 Flink 環境,直接執行 Main 方法即可,結果如下:

4.2 流處理示例

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time object WordCountStreaming { def main(args: Array[String]): Unit = { val senv = StreamExecutionEnvironment.getExecutionEnvironment val dataStream: DataStream[String] = senv.socketTextStream("192.168.0.229", 9999, '\n')
dataStream.flatMap { line => line.toLowerCase.split(",") }
.filter(_.nonEmpty)
.map { word => (word, 1) }
.keyBy(0)
.timeWindow(Time.seconds(3))
.sum(1)
.print()
senv.execute("Streaming WordCount")
}
}

這裡以監聽指定埠號上的內容為例,使用以下命令來開啟埠服務:

nc -lk 9999

之後輸入測試資料即可觀察到流處理程式的處理情況。

五、使用 Scala Shell

對於日常的 Demo 專案,如果你不想頻繁地啟動 IDEA 來觀察測試結果,可以像 Spark 一樣,直接使用 Scala Shell 來執行程式,這對於日常的學習來說,效果更加直觀,也更省時。Flink 安裝包的下載地址如下:

https://flink.apache.org/downloads.html

Flink 大多數版本都提供有 Scala 2.11 和 Scala 2.12 兩個版本的安裝包可供下載:

下載完成後進行解壓即可,Scala Shell 位於安裝目錄的 bin 目錄下,直接使用以下命令即可以本地模式啟動:

./start-scala-shell.sh local

命令列啟動完成後,其已經提供了批處理 (benv 和 btenv)和流處理(senv 和 stenv)的執行環境,可以直接執行 Scala Flink 程式,示例如下:

最後解釋一個常見的異常:這裡我使用的 Flink 版本為 1.9.1,啟動時會丟擲如下異常。這裡因為按照官方的說明,目前所有 Scala 2.12 版本的安裝包暫時都不支援 Scala Shell,所以如果想要使用 Scala Shell,只能選擇 Scala 2.11 版本的安裝包。

[[email protected] bin]# ./start-scala-shell.sh local
錯誤: 找不到或無法載入主類 org.apache.flink.api.scala.FlinkShell

系列傳送門