1. 程式人生 > 其它 >Flink 環境的搭建、獨立叢集、Flink on Yarn、訪問Flink web介面、Flink提交任務的三種方式、Flink讀取HDFS上的資料

Flink 環境的搭建、獨立叢集、Flink on Yarn、訪問Flink web介面、Flink提交任務的三種方式、Flink讀取HDFS上的資料

Flink 執行方式

三種執行方式(與spark相似):

1、local 本地測試

2、Standallone Cluster 獨立叢集(做實時計算,不需要hadoop,該獨立叢集可能用的上)

3、Flink on Yarn 推薦

Standallone Cluster 獨立叢集

獨立叢集是不依賴hadoop的,所以可以先停掉 Hadoop

注意:獨立叢集的搭建需要配置 JAVA_HOME 和 免密登入

1、上傳、解壓、配置環境變數

#進入壓縮包所在目錄
cd /usr/local/soft
#解壓
tar -zxvf /usr/local/soft/flink-1.11.2-bin-scala_2.11.tgz -C /usr/local/soft/

#配置環境變數
vim /etc/profile
#新增
export FLINK_HOME=/usr/local/soft/flink-1.11.2

export PATH=$PATH:$FLINK_HOME/bin

#重新整理
source /etc/profile

2、修改配置檔案

在此之前我們先來簡單瞭解一下Flink叢集的架構

vim /usr/local/soft/flink-1.11.2/conf/flink-conf.yaml
# (修改)指定主節點ip地址
jobmanager.rpc.address: master

vim workers
# (修改)指定從節點
node1  
node2

vim masters 
# 改成主節點master
master:8081

3、同步到所有節點

scp -r /usr/local/soft/flink-1.11.2/ node1:`pwd`
scp -r /usr/local/soft/flink-1.11.2/ node2:`pwd`

4、啟動(停止)叢集

Flink叢集的命令都在 bin 目錄下,不記得可以去找

所有主從架構的叢集都是在主節點操作命令

# 啟動
start-cluster.sh
# 停止
stop-cluster.sh
http://master:8081

Flink提交任務的三種方式

1、在web頁面中提交

以昨天WordCount程式碼為例(程式碼不需要改),將之打成jar包

2、同flink命令提交任務

將jar包上傳至叢集,提交任務

flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar 
com.shujia.flink.soure.Demo4ReadKafka -- 主類名
flink-1.0.jar -- jar包名

3、rpc方式提交任務 --- 遠端提交

用的較少

程式碼寫完之後需要先打包,再執行

Flink框架報錯有一個特點:前幾條報錯原因都是廢話,要從後面看

package com.shujia.flink.core

import org.apache.flink.streaming.api.scala._

object Demo2RpcSubmit {
  def main(args: Array[String]): Unit = {

    /**
      * 建立遠端環境,遠端提交flink任務
      *
      */
      
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
      //主機名
      "master",
      //埠號
      8081,
      //指定jar包的路徑
      "C:\\Users\\qx\\IdeaProjects\\bigdata14\\flink\\target\\flink-1.0.jar" 
    )

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
    //1、將資料展開
    val wordsDS: DataStream[String] = linesDS.flatMap(line => line.split(","))
    //2、轉換成kv格式
    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
    //3、按照單詞進行分組
    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
    //4、統計數量,對value進行求和, 指定下標進行聚合
    val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
    //列印結果
    countDS.print()

    env.execute("rpc")

  }
}

只需要部署一個節點 -- 在master中部署即可

需要先將獨立叢集停掉

JAVA_HOME、免密登入、上傳、解壓、配置環境變數 -- 這些配置都要做

1、配置HADOOP_CONF_DIR

export -- 全域性生效

vim /etc/profile
#新增
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/

source /etc/profile
cd /usr/local/soft/flink-1.11.2/lib

#jar包名
flink-shaded-hadoop-2-uber-2.6.5-10.0.jar

flink和spark一樣都是粗粒度資源申請

啟動方式

1、yarn-session 在yarn裡面啟動一個flink叢集---- 所有任務共享同一個jobManager(ApplicationMaster)

需要先啟動Hadoop:start-all.sh

yarn-session是所有任務共享同一個jobmanager

# 在master中
# 開啟
yarn-session.sh -jm 1024m -tm 1096m
# -jm 1024m -- 指定JobManager的記憶體
# -tm 1096m -- 指定TaskManager的記憶體

# 啟動完畢之後會返回一個地址給我們--->Flink叢集的web介面
# http://note01:46386

提交任務的三種方式

在master中輸入nc -lk 8888 的時候,可能會出現 端口占用

檢視埠是否佔用:ps -aux | grep 8888

如果佔用,將其殺死:kill -9 編號

# 提交任務  
# 任務提交的時候是根據並行度動態申請taskmanager

1、在web頁面提交任務
# 將程式碼程式碼打包好的jar包新增到web頁面

2、同flink命令提交任務
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar 

3、rpc方式提交任務(遠端提交)
# 遠端提交,需要結合啟動叢集返回的地址---http://node1:46386
# 修改程式碼,建立環境的時候,將master改成node1,將8888改成46386
# 然後將程式碼打包,然後再執行程式碼就行了(不需要我們手動向叢集中新增jar包了)

# 任務結束之後,資源被動態回收

# 關閉/停止 yarn-session
yarn application -kill application_1647657435495_0001
# application_1647657435495_0001 -- yarn-session的yarn上程序號

stop 也能退出

2、直接提交任務到yarn----每一個任務單獨使用一個jobManager

直接提交任務就不需要啟動yarn-session

如果之前已啟動,需要將其殺死yarn application -kill

直接提交任務到yarn,一個任務需要執行一次命令,一個任務會有一個 單獨的web頁面

flink run -m yarn-cluster  -yjm 1024m -ytm 1096m -c com.shujia.flink.core.Demo1WordCount flink-1.0.jar
# -m yarn-cluster -- 指定提交模式
# -yjm 1024m -ytm 1096m -- 指定JobManager和TaskManager的記憶體
# -c com.shujia.flink.core.Demo1WordCount -- 指定主類名
# flink-1.0.jar -- 指定jar包名

# 殺掉yarn上的任務
yarn application -kill application_1599820991153_0005

# 檢視日誌
yarn logs -applicationId application_1647657435495_0002

yarn-session先在yarn中啟動一個jobMansager ,所有的任務共享一個jobmanager (提交任務更快,任務之間共享jobmanager , 相互有影響)
直接提交任務模型,為每一個任務啟動一個joibmanager (每一個任務獨立jobmanager , 任務執行穩定)

Flink讀取HDFS上的資料

package com.shujia.flink.core

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

object Demo3FlinkOnHdfs {
  def main(args: Array[String]): Unit = {
      //建立flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     /**
      * 讀取hdfs中的資料  -- 有界流
      */
    val studentDS: DataStream[String] = env.readTextFile("hdfs://master:9000/data/student")

    val clazzNumDS: DataStream[(String, Int)] = studentDS
      .map(stu => (stu.split(",")(4), 1))
      .keyBy(_._1)
      .sum(1)

     /**
      * 將資料儲存到hdfs(改程式碼可以通過官網檢視,不需要死記)
      */
    val sink: StreamingFileSink[(String, Int)] = StreamingFileSink
      //指定儲存路徑和資料的編碼格式
      .forRowFormat(new Path("hdfs://master:9000/data/flink_clazz"), new SimpleStringEncoder[(String, Int)]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(TimeUnit.MINUTES.toSeconds(15))
          .withInactivityInterval(TimeUnit.MINUTES.toSeconds(5))
          .withMaxPartSize(1024)
          .build())
      .build()

    clazzNumDS.addSink(sink)

    env.execute()
  }
}

# 將程式碼打包,然後提交到yarn上執行
# 執行結果可以通過日誌檢視,在web頁面只能檢視無界流的任務執行結果