Flink 環境的搭建、獨立叢集、Flink on Yarn、訪問Flink web介面、Flink提交任務的三種方式、Flink讀取HDFS上的資料
Flink 執行方式
三種執行方式(與spark相似):
1、local 本地測試
2、Standallone Cluster 獨立叢集(做實時計算,不需要hadoop,該獨立叢集可能用的上)
3、Flink on Yarn 推薦
Standallone Cluster 獨立叢集
獨立叢集是不依賴hadoop的,所以可以先停掉 Hadoop
注意:獨立叢集的搭建需要配置 JAVA_HOME 和 免密登入
1、上傳、解壓、配置環境變數
#進入壓縮包所在目錄 cd /usr/local/soft #解壓 tar -zxvf /usr/local/soft/flink-1.11.2-bin-scala_2.11.tgz -C /usr/local/soft/ #配置環境變數 vim /etc/profile #新增 export FLINK_HOME=/usr/local/soft/flink-1.11.2 export PATH=$PATH:$FLINK_HOME/bin #重新整理 source /etc/profile
2、修改配置檔案
在此之前我們先來簡單瞭解一下Flink叢集的架構
vim /usr/local/soft/flink-1.11.2/conf/flink-conf.yaml
# (修改)指定主節點ip地址
jobmanager.rpc.address: master
vim workers
# (修改)指定從節點
node1
node2
vim masters
# 改成主節點master
master:8081
3、同步到所有節點
scp -r /usr/local/soft/flink-1.11.2/ node1:`pwd` scp -r /usr/local/soft/flink-1.11.2/ node2:`pwd`
4、啟動(停止)叢集
Flink叢集的命令都在 bin 目錄下,不記得可以去找
所有主從架構的叢集都是在主節點操作命令
# 啟動
start-cluster.sh
# 停止
stop-cluster.sh
訪問Flink web介面
http://master:8081
Flink提交任務的三種方式
1、在web頁面中提交
以昨天WordCount程式碼為例(程式碼不需要改),將之打成jar包
2、同flink命令提交任務
將jar包上傳至叢集,提交任務
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar com.shujia.flink.soure.Demo4ReadKafka -- 主類名 flink-1.0.jar -- jar包名
3、rpc方式提交任務 --- 遠端提交
用的較少
程式碼寫完之後需要先打包,再執行
Flink框架報錯有一個特點:前幾條報錯原因都是廢話,要從後面看
package com.shujia.flink.core
import org.apache.flink.streaming.api.scala._
object Demo2RpcSubmit {
def main(args: Array[String]): Unit = {
/**
* 建立遠端環境,遠端提交flink任務
*
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
//主機名
"master",
//埠號
8081,
//指定jar包的路徑
"C:\\Users\\qx\\IdeaProjects\\bigdata14\\flink\\target\\flink-1.0.jar"
)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//1、將資料展開
val wordsDS: DataStream[String] = linesDS.flatMap(line => line.split(","))
//2、轉換成kv格式
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
//3、按照單詞進行分組
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
//4、統計數量,對value進行求和, 指定下標進行聚合
val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
//列印結果
countDS.print()
env.execute("rpc")
}
}
Flink on Yarn
只需要部署一個節點 -- 在master中部署即可
需要先將獨立叢集停掉
JAVA_HOME、免密登入、上傳、解壓、配置環境變數 -- 這些配置都要做
1、配置HADOOP_CONF_DIR
export -- 全域性生效
vim /etc/profile
#新增
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/
source /etc/profile
2、將hadoop依賴jar上傳到flink lib目錄
cd /usr/local/soft/flink-1.11.2/lib
#jar包名
flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
flink和spark一樣都是粗粒度資源申請
啟動方式
1、yarn-session 在yarn裡面啟動一個flink叢集---- 所有任務共享同一個jobManager(ApplicationMaster)
需要先啟動Hadoop:
start-all.sh
yarn-session是所有任務共享同一個jobmanager
# 在master中
# 開啟
yarn-session.sh -jm 1024m -tm 1096m
# -jm 1024m -- 指定JobManager的記憶體
# -tm 1096m -- 指定TaskManager的記憶體
# 啟動完畢之後會返回一個地址給我們--->Flink叢集的web介面
# http://note01:46386
提交任務的三種方式
在master中輸入
nc -lk 8888
的時候,可能會出現 端口占用檢視埠是否佔用:
ps -aux | grep 8888
如果佔用,將其殺死:
kill -9 編號
# 提交任務
# 任務提交的時候是根據並行度動態申請taskmanager
1、在web頁面提交任務
# 將程式碼程式碼打包好的jar包新增到web頁面
2、同flink命令提交任務
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar
3、rpc方式提交任務(遠端提交)
# 遠端提交,需要結合啟動叢集返回的地址---http://node1:46386
# 修改程式碼,建立環境的時候,將master改成node1,將8888改成46386
# 然後將程式碼打包,然後再執行程式碼就行了(不需要我們手動向叢集中新增jar包了)
# 任務結束之後,資源被動態回收
# 關閉/停止 yarn-session
yarn application -kill application_1647657435495_0001
# application_1647657435495_0001 -- yarn-session的yarn上程序號
stop 也能退出
2、直接提交任務到yarn----每一個任務單獨使用一個jobManager
直接提交任務就不需要啟動
yarn-session
如果之前已啟動,需要將其殺死
yarn application -kill
直接提交任務到yarn,一個任務需要執行一次命令,一個任務會有一個 單獨的web頁面
flink run -m yarn-cluster -yjm 1024m -ytm 1096m -c com.shujia.flink.core.Demo1WordCount flink-1.0.jar
# -m yarn-cluster -- 指定提交模式
# -yjm 1024m -ytm 1096m -- 指定JobManager和TaskManager的記憶體
# -c com.shujia.flink.core.Demo1WordCount -- 指定主類名
# flink-1.0.jar -- 指定jar包名
# 殺掉yarn上的任務
yarn application -kill application_1599820991153_0005
# 檢視日誌
yarn logs -applicationId application_1647657435495_0002
yarn-session先在yarn中啟動一個jobMansager ,所有的任務共享一個jobmanager (提交任務更快,任務之間共享jobmanager , 相互有影響)
直接提交任務模型,為每一個任務啟動一個joibmanager (每一個任務獨立jobmanager , 任務執行穩定)
Flink讀取HDFS上的資料
package com.shujia.flink.core
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
object Demo3FlinkOnHdfs {
def main(args: Array[String]): Unit = {
//建立flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
/**
* 讀取hdfs中的資料 -- 有界流
*/
val studentDS: DataStream[String] = env.readTextFile("hdfs://master:9000/data/student")
val clazzNumDS: DataStream[(String, Int)] = studentDS
.map(stu => (stu.split(",")(4), 1))
.keyBy(_._1)
.sum(1)
/**
* 將資料儲存到hdfs(改程式碼可以通過官網檢視,不需要死記)
*/
val sink: StreamingFileSink[(String, Int)] = StreamingFileSink
//指定儲存路徑和資料的編碼格式
.forRowFormat(new Path("hdfs://master:9000/data/flink_clazz"), new SimpleStringEncoder[(String, Int)]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toSeconds(15))
.withInactivityInterval(TimeUnit.MINUTES.toSeconds(5))
.withMaxPartSize(1024)
.build())
.build()
clazzNumDS.addSink(sink)
env.execute()
}
}
# 將程式碼打包,然後提交到yarn上執行
# 執行結果可以通過日誌檢視,在web頁面只能檢視無界流的任務執行結果