flink專案開發-flink的scala shell命令列互動模式開發
flink的 scala shell命令列互動模式開發
flink帶有一個整合的scala shell命令列。它可以以本地方式啟動來模擬叢集叢集。執行下面的命令就可以通過shell命令列和flink叢集互動(這種方式方便於程式碼除錯):
bin/start-scala-shell.sh local
如果想在叢集上面執行scala shell,請檢視本節後面的內容。
flink scala shell 用法
shell方式支援流處理和批處理。當啟動shell命令列之後,兩個不同的ExecutionEnvironments會被自動建立。使用benv和senv分別去處理批處理和流處理程式。(類似於spark-shell中sc變數)
DataSet API
下面的例子將會在scala shell中執行wordcount程式
Scala-Flink> val text = benv.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,") Scala-Flink> val counts = text .flatMap { _.toLowerCase.split("\\W+") } .map { (_, 1) }.groupBy(0).sum(1) Scala-Flink> counts.print()
print()命令會自定傳送指定的任務到jobmanager去執行,並且會將結果顯示在控制檯。
也可以吧結果寫到一個檔案中,然而,在這種情況下,你需要執行execute方法,去執行你的程式
Scala-Flink> benv.execute("MyProgram")
DataStream API
類似於上面的批處理程式,我們可以通過DataStream API執行一個流處理程式。
Scala-Flink> val textStreaming = senv.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,") Scala-Flink> val countsStreaming = textStreaming .flatMap { _.toLowerCase.split("\\W+") } .map { (_, 1) }.keyBy(0).sum(1) Scala-Flink> countsStreaming.print() Scala-Flink> senv.execute("Streaming Wordcount")
注意:在流處理情況下,print方法不會觸發執行。需要呼叫execute方法才會真正執行。
flink shell會自動帶有命令執行歷史。
在shell命令列模式下新增外部依賴
可以將外部類路徑新增到scala-shell中,當程式被呼叫的時候,這些外部依賴會自動的被髮送到jobmanager上。
使用這個引數 -a <path/to/jar.jar> 或者 --addclasspath <path/to/jar.jar> 新增額外的依賴。
bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
flink scala shell設定
檢視scala shell模式提供的選型,可以執行這個命令:
bin/start-scala-shell.sh --help
本地模式local
使用shell連線一個本地整合的flink 叢集 使用下面命令
bin/start-scala-shell.sh local
遠端模式remote
使用scala shell 連線一個遠端叢集,使用host和port引數去連線指定的jobmanager
bin/start-scala-shell.sh remote <hostname> <portnumber>
叢集模式 yarn scala shell cluster
可以通過scala shell在yarn上啟動一個專有的flink叢集,yarn containers的數量可以通過引數-n 指定。shell在yarn上部署了一個新的叢集並且連線到這個叢集。你也可以指定叢集的引數,例如:指定jobmanager的記憶體,yarn application的名稱 等等。
例如:針對scala shell啟動一個yarn叢集包含兩個taskmanager,使用下面的引數:
bin/start-scala-shell.sh yarn -n 2
針對所有的引數選項,可以在本節的最後檢視完整的說明
yarn session模式
如果你之前已經使用flink yarn session模式啟動了一個flink叢集,scala shell可以使用下面的命令進行連線:
bin/start-scala-shell.sh yarn
完整的引數選項
Flink Scala Shell
用法: start-scala-shell.sh [local|remote|yarn] [options] <args>...
命令: local [options]
使用scala shell連線一個本地flink叢集
-a <path/to/jar> | --addclasspath <path/to/jar>
指定flink使用的第三方依賴
命令: remote [options] <host> <port>
啟動scala shell連線一個遠端叢集
<host>
主機名
<port>
埠號
-a <path/to/jar> | --addclasspath <path/to/jar>
指定flink使用的第三方依賴
命令: yarn [options]
使用flink連線一個yarn叢集
-n arg | --container arg
分配的yarn container的數量 (等於TaskManagers的數量)
-jm arg | --jobManagerMemory arg
JobManager container 的記憶體[in MB]
-nm <value> | --name <value>
在YARN上給應用設定一個名字
-qu <arg> | --queue <arg>
指定YARN佇列
-s <arg> | --slots <arg>
指定每個TaskManager的slot數量
-tm <arg> | --taskManagerMemory <arg>
TaskManager container的記憶體 [in MB]
-a <path/to/jar> | --addclasspath <path/to/jar>
指定flink使用的第三方jar
--configDir <value>
配置檔案目錄.
-h | --help
列印幫助資訊
獲取更多大資料資料,視訊以及技術交流請加群:
QQ群號1:295505811(已滿)
QQ群號2:54902210
QQ群號3:555684318