1. 程式人生 > >flink專案開發-flink的scala shell命令列互動模式開發

flink專案開發-flink的scala shell命令列互動模式開發

flink的 scala shell命令列互動模式開發

flink帶有一個整合的scala shell命令列。它可以以本地方式啟動來模擬叢集叢集。執行下面的命令就可以通過shell命令列和flink叢集互動(這種方式方便於程式碼除錯):

bin/start-scala-shell.sh local

如果想在叢集上面執行scala shell,請檢視本節後面的內容。

flink scala shell 用法

shell方式支援流處理和批處理。當啟動shell命令列之後,兩個不同的ExecutionEnvironments會被自動建立。使用benv和senv分別去處理批處理和流處理程式。(類似於spark-shell中sc變數)

DataSet API

下面的例子將會在scala shell中執行wordcount程式

Scala-Flink> val text = benv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()

print()命令會自定傳送指定的任務到jobmanager去執行,並且會將結果顯示在控制檯。

也可以吧結果寫到一個檔案中,然而,在這種情況下,你需要執行execute方法,去執行你的程式

Scala-Flink> benv.execute("MyProgram")

DataStream API

類似於上面的批處理程式,我們可以通過DataStream API執行一個流處理程式。

Scala-Flink> val textStreaming = senv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")

注意:在流處理情況下,print方法不會觸發執行。需要呼叫execute方法才會真正執行。

flink shell會自動帶有命令執行歷史。

在shell命令列模式下新增外部依賴

可以將外部類路徑新增到scala-shell中,當程式被呼叫的時候,這些外部依賴會自動的被髮送到jobmanager上。

使用這個引數 -a <path/to/jar.jar> 或者 --addclasspath <path/to/jar.jar> 新增額外的依賴。

bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>

flink scala shell設定

檢視scala shell模式提供的選型,可以執行這個命令:

bin/start-scala-shell.sh --help

本地模式local

使用shell連線一個本地整合的flink 叢集 使用下面命令

bin/start-scala-shell.sh local

遠端模式remote

使用scala shell 連線一個遠端叢集,使用host和port引數去連線指定的jobmanager

bin/start-scala-shell.sh remote <hostname> <portnumber>

叢集模式 yarn scala shell cluster

可以通過scala shell在yarn上啟動一個專有的flink叢集,yarn containers的數量可以通過引數-n 指定。shell在yarn上部署了一個新的叢集並且連線到這個叢集。你也可以指定叢集的引數,例如:指定jobmanager的記憶體,yarn application的名稱 等等。

例如:針對scala shell啟動一個yarn叢集包含兩個taskmanager,使用下面的引數:

bin/start-scala-shell.sh yarn -n 2

針對所有的引數選項,可以在本節的最後檢視完整的說明

yarn session模式

如果你之前已經使用flink yarn session模式啟動了一個flink叢集,scala shell可以使用下面的命令進行連線:

bin/start-scala-shell.sh yarn

完整的引數選項

Flink Scala Shell
用法: start-scala-shell.sh [local|remote|yarn] [options] <args>...

命令: local [options]
使用scala shell連線一個本地flink叢集
  -a <path/to/jar> | --addclasspath <path/to/jar>
        指定flink使用的第三方依賴
命令: remote [options] <host> <port>
啟動scala shell連線一個遠端叢集
  <host>
        主機名
  <port>
        埠號

  -a <path/to/jar> | --addclasspath <path/to/jar>
        指定flink使用的第三方依賴
命令: yarn [options]
使用flink連線一個yarn叢集
  -n arg | --container arg
        分配的yarn container的數量 (等於TaskManagers的數量)
  -jm arg | --jobManagerMemory arg
        JobManager container 的記憶體[in MB]
  -nm <value> | --name <value>
        在YARN上給應用設定一個名字
  -qu <arg> | --queue <arg>
        指定YARN佇列
  -s <arg> | --slots <arg>
        指定每個TaskManager的slot數量
  -tm <arg> | --taskManagerMemory <arg>
        TaskManager container的記憶體 [in MB]
  -a <path/to/jar> | --addclasspath <path/to/jar>
        指定flink使用的第三方jar
  --configDir <value>
        配置檔案目錄.
  -h | --help
        列印幫助資訊

獲取更多大資料資料,視訊以及技術交流請加群:

QQ群號1:295505811(已滿)

QQ群號2:54902210

QQ群號3:555684318