1. 程式人生 > 實用技巧 >Flink(一)【基礎入門,Flink on Yarn模式】

Flink(一)【基礎入門,Flink on Yarn模式】

目錄

一.介紹

Apache Flink是一個框架和分散式處理引擎,用於對無界和有界資料流進行有狀態計算。

  • spark

    處理方式:批處理

    延時性:高延遲(採集週期)

    缺點:精準一次性消費,錯亂延遲資料,延遲高

  • flink

    處理方式:流處理(有界,無界)

    延時性:低延遲

    優點:①靈活的視窗 ②Exactly Once語義保證

二.快速入門:WC案例

pom依賴

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>
    <!-- flink的依賴  開始 -->
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <!-- flink的依賴  結束 -->

    <!--打包外掛 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

批處理

Java程式碼

/**
 * @description: WordCount 批處理
 * @author: HaoWu
 * @create: 2020年09月15日
 */
public class WC_Batch {
    public static void main(String[] args) throws Exception {
        // 0.建立執行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取資料
        DataSource<String> fileDS = env.readTextFile("D:\\SoftWare\\idea-2019.2.3\\wordspace\\13_flinkdemo\\input");
        // 2.扁平化 ->(word,1)
        AggregateOperator<Tuple2<String, Integer>> reuslt = fileDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 2.1切分
                String[] words = s.split(" ");
                // 2.2轉為二元Tuple
                for (String word : words) {
                    Tuple2<String, Integer> tuple = Tuple2.of(word, 1);
                    collector.collect(tuple);
                }
            }
        })      // 3.分組
                .groupBy(0)
                // 4.求sum
                .sum(1);
        // 3.輸出儲存
        reuslt.print();
        // 4.啟動
    }
}

控制檯

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(flink,2)
(hello,4)
(sparksql,1)
(spark,1)

Process finished with exit code 0

流處理

有界流

Java程式碼

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * @description: WordCount 批處理
 * @author: HaoWu
 * @create: 2020年09月15日
 */
public class WC_Batch {
    public static void main(String[] args) throws Exception {
        // 0.建立執行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取資料
        DataSource<String> fileDS = env.readTextFile("D:\\SoftWare\\idea-2019.2.3\\wordspace\\13_flinkdemo\\input");
        // 2.扁平化 ->(word,1)
        AggregateOperator<Tuple2<String, Integer>> reuslt = fileDS.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
            // 2.1切分
            String[] words = s.split(" ");
            // 2.2轉為二元Tuple
            for (String word : words) {
                Tuple2<String, Integer> tuple = Tuple2.of(word, 1);
                collector.collect(tuple);
            }
        }).returns(new TypeHint<Tuple2<String, Integer>>(){})
                // 3.分組
                .groupBy(0)
                // 4.求sum
                .sum(1);
        // 5.輸出儲存
        reuslt.print();
        // 6.執行(批處理不需要啟動)
    }
}

控制檯

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
7> (flink,1)
7> (flink,2)
1> (spark,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
3> (hello,4)
3> (sparksql,1)

Process finished with exit code 0
無界流(重要)

Java程式碼

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @description: 無界流(有頭無尾)
 * @author: HaoWu
 * @create: 2020年09月15日
 */
public class Flink03_WC_UnBoundedStream {
    public static void main(String[] args) throws Exception {
        // 0.建立執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.讀取資料
        DataStreamSource<String> fileDS = env.socketTextStream("hadoop102", 9999);
        // 2.扁平化:轉換(word,1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = fileDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 2.1切分
                String[] words = s.split(" ");
                // 2.2收集寫出下游
                for (String word : words) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        })      // 3.分組
                .keyBy(0)
                // 4.求sum
                .sum(1);
        // 5.輸出
        result.print();
        // 6.執行
        env.execute();

    }
}

nc工具 Socket 輸入

[root@hadoop102 ~]$ nc -lk 9999
a
f
d
af
dafda
fafa
a
a
b
a

控制檯

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6> (a,1)
2> (f,1)
5> (d,1)
4> (af,1)
7> (dafda,1)
3> (fafa,1)
6> (a,2)
6> (a,3)
2> (b,1)
6> (a,4)

注意

①tuple的兩種寫法:Tuple2.of(word, 1)、new Tuple2<>(word,1)。

②匿名物件、lamda兩種寫法。

③不要導錯包,用java的,別選成scala的了。

三.Yarn模式部署

有多種部署模式,local,standalone,yarn,windows等,本文只介紹yarn。

安裝

前提已經部署hdfs,yarn,解壓即用

將flink-1.10.0-bin-scala_2.11.tgz檔案上傳到Linux並解壓縮,放置在指定位置,路徑中不要包含中文或空格

tar -zxvf flink-1.10.0-bin-scala_2.11.tgz -C /opt/module

打包測試,命令列(無界流)

執行無界流的job,使用nc工具測試,預設提交的模式是Per-job方式。

當前yarn模式不支援webUI方式提交,standalone模式可以用webUI提交。

bin/flink run -m yarn-cluster -c com.flink.chapt01.Flink03_WC_UnBoundedStream /opt/module/testdata/flink-wc.jar

FAQ報錯

解決方案

錯誤的原因是Flink1.8版本之後,預設情況下類庫中是不包含hadoop相關依賴的,所以提交時會發生錯誤,,引入hadoop相關依賴jar包即可:flink-shaded-hadoop-2-uber-3.1.3-9.0.jar

上傳後,重新執行上面的指令即可。執行過程可以通過Yarn的應用服務頁面檢視

cp /opt/software/flink/flink-shaded-hadoop-2-uber-3.1.3-9.0.jar /opt/module/flink-1.10.0/lib/

重新執行任務提交

啟動nc工具進行測試

​ Flink提供了兩種在yarn上的執行模式,分別是Session-Cluster和Per-Job-Cluster模式。

Per-Job-Cluster

在上面的應用程式提交時,一個Job會對應一個yarn-session叢集,每提交一個作業會根據自身的情況,都會單獨向yarn申請資源,直到作業執行完成,一個作業的失敗與否並不會影響下一個作業的正常提交和執行。獨享Dispatcher和ResourceManager,按需接受資源申請;適合規模大長時間執行的作業

通過-m yarn-cluster 引數來指定執行模式

bin/flink run -m yarn-cluster -c com.flink.chapt01.Flink03_WC_UnBoundedStream /opt/module/testdata/flink-wc.jar

這種方式每次提交都會建立一個新的flink叢集,任務之間互相獨立,互不影響,方便管理。任務執行完成之後建立的叢集也會消失。

Session-Cluster

​ 在規模小執行時間短的作業執行時,頻繁的申請資源並不是一個好的選擇,所以Flink還提供了一種可以事先申請一定資源,然後在這個資源中並行執行多個作業的叢集方式。

![image-20200915184337978](image/

在yarn中初始化一個flink叢集,開闢指定的資源,以後提交任務都向這裡提交。這個flink叢集會常駐在yarn叢集中,除非手工停止。

Session-Cluster叢集模式和Per-Job-Cluster不一樣的是需要事先建立Yarn應用後再提交Flink應用程式

①建立Yarn應用

bin/yarn-session.sh -d -n 2 -s 2 -jm 1024 -tm 1024 -nm test

注意:flink新的版本 -n,-s 引數將不再有效,Yarn會按需動態分配資源 。以後不要加這兩個引數了。

webUI可以觀察,當前flink版本1.10

沒有任務,也沒有資源分配

相關引數

引數 含義
守護模式,daemon
-n(--container) TaskManager的數量
-s(--slots) 每個TaskManager的slot數量,預設一個slot一個core,預設每個taskmanager的slot的個數為1,有時可以多一些taskmanager,做冗餘
-jm JobManager的記憶體(單位MB)
-tm 每個Taskmanager的記憶體(單位MB)
-nm yarn 的appName(現在yarn的ui上的名字)
-d 後臺執行,需要放在前面,否則不生效

②再提交任務

bin/flink run -c com.flink.chapt01.Flink03_WC_UnBoundedStream /opt/module/testdata/flink-wc.jar 

可以發現yarn分配了資源

HA高可用

任何時候都有一個 主 JobManager 和多個備用 JobManagers,以便在主節點失敗時有備用 JobManagers 來接管叢集。這保證了沒有單點故障,一旦備 JobManager 接管叢集,作業就可以正常執行。主備 JobManager 例項之間沒有明顯的區別。每個 JobManager 都可以充當主備節點。

  1. 配置yarn最大重試次數%HADOOP_HOME%/etc/hadoop/yarn-site.xml,分發檔案
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
</property>
  1. 修改conf/flink-conf.yaml配置檔案

配置引數中冒號後面的引數值都需要增加空格

yarn.application-attempts: 4
# Line79
high-availability: zookeeper

# Line88
high-availability.storageDir: hdfs://hadoop102:9000/flink/ha/

# Line94
high-availability.zookeeper.quorum: hadoop102:2282,hadoop103:2282,hadoop104:2282
  1. 修改conf/master配置檔案
Hadoop102:8081
Hadoop103:8081
Hadoop104:8081
  1. 修改zoo.cfg配置檔案

也可以用外部的zk叢集

#Line 32 防止和外部ZK衝突
clientPort=2282
#Line 35
server.88=hadoop102:2888:3888
server.89=hadoop103:2888:3888
server.90=hadoop104:2888:3888

5)分發flink

xsync flink

6)啟動Flink Zookeeper叢集

bin/start-zookeeper-quorum.sh
  1. 啟動Flink Session應用
bin/yarn-session.sh -d -jm 1024 -tm 1024 -nm test

如果此時將YarnSessionClusterEntrypoint程序關閉,WebUI介面會訪問不了

那麼稍等後,Yarn會自動重新啟動Cluster程序,就可以重新訪問了。