Flink(一)【基礎入門,Flink on Yarn模式】
目錄
一.介紹
Apache Flink是一個框架和分散式處理引擎,用於對無界和有界資料流進行有狀態計算。
Spark | Flink
-
spark
處理方式:批處理
延時性:高延遲(採集週期)
缺點:精準一次性消費,錯亂延遲資料,延遲高
-
flink
處理方式:流處理(有界,無界)
延時性:低延遲
優點:①靈活的視窗 ②Exactly Once語義保證
二.快速入門:WC案例
pom依賴
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.10.0</flink.version> <java.version>1.8</java.version> <scala.binary.version>2.11</scala.binary.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> <log4j.version>2.12.1</log4j.version> </properties> <!-- flink的依賴 開始 --> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies> <!-- flink的依賴 結束 --> <!--打包外掛 --> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
批處理
Java程式碼
/** * @description: WordCount 批處理 * @author: HaoWu * @create: 2020年09月15日 */ public class WC_Batch { public static void main(String[] args) throws Exception { // 0.建立執行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 1.讀取資料 DataSource<String> fileDS = env.readTextFile("D:\\SoftWare\\idea-2019.2.3\\wordspace\\13_flinkdemo\\input"); // 2.扁平化 ->(word,1) AggregateOperator<Tuple2<String, Integer>> reuslt = fileDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { // 2.1切分 String[] words = s.split(" "); // 2.2轉為二元Tuple for (String word : words) { Tuple2<String, Integer> tuple = Tuple2.of(word, 1); collector.collect(tuple); } } }) // 3.分組 .groupBy(0) // 4.求sum .sum(1); // 3.輸出儲存 reuslt.print(); // 4.啟動 } }
控制檯
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(flink,2)
(hello,4)
(sparksql,1)
(spark,1)
Process finished with exit code 0
流處理
有界流
Java程式碼
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* @description: WordCount 批處理
* @author: HaoWu
* @create: 2020年09月15日
*/
public class WC_Batch {
public static void main(String[] args) throws Exception {
// 0.建立執行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1.讀取資料
DataSource<String> fileDS = env.readTextFile("D:\\SoftWare\\idea-2019.2.3\\wordspace\\13_flinkdemo\\input");
// 2.扁平化 ->(word,1)
AggregateOperator<Tuple2<String, Integer>> reuslt = fileDS.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
// 2.1切分
String[] words = s.split(" ");
// 2.2轉為二元Tuple
for (String word : words) {
Tuple2<String, Integer> tuple = Tuple2.of(word, 1);
collector.collect(tuple);
}
}).returns(new TypeHint<Tuple2<String, Integer>>(){})
// 3.分組
.groupBy(0)
// 4.求sum
.sum(1);
// 5.輸出儲存
reuslt.print();
// 6.執行(批處理不需要啟動)
}
}
控制檯
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
7> (flink,1)
7> (flink,2)
1> (spark,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
3> (hello,4)
3> (sparksql,1)
Process finished with exit code 0
無界流(重要)
Java程式碼
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @description: 無界流(有頭無尾)
* @author: HaoWu
* @create: 2020年09月15日
*/
public class Flink03_WC_UnBoundedStream {
public static void main(String[] args) throws Exception {
// 0.建立執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.讀取資料
DataStreamSource<String> fileDS = env.socketTextStream("hadoop102", 9999);
// 2.扁平化:轉換(word,1)
SingleOutputStreamOperator<Tuple2<String, Integer>> result = fileDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
// 2.1切分
String[] words = s.split(" ");
// 2.2收集寫出下游
for (String word : words) {
collector.collect(Tuple2.of(word, 1));
}
}
}) // 3.分組
.keyBy(0)
// 4.求sum
.sum(1);
// 5.輸出
result.print();
// 6.執行
env.execute();
}
}
nc工具 Socket 輸入
[root@hadoop102 ~]$ nc -lk 9999
a
f
d
af
dafda
fafa
a
a
b
a
控制檯
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6> (a,1)
2> (f,1)
5> (d,1)
4> (af,1)
7> (dafda,1)
3> (fafa,1)
6> (a,2)
6> (a,3)
2> (b,1)
6> (a,4)
注意
①tuple的兩種寫法:Tuple2.of(word, 1)、new Tuple2<>(word,1)。
②匿名物件、lamda兩種寫法。
③不要導錯包,用java的,別選成scala的了。
三.Yarn模式部署
有多種部署模式,local,standalone,yarn,windows等,本文只介紹yarn。
安裝
前提已經部署hdfs,yarn,解壓即用
將flink-1.10.0-bin-scala_2.11.tgz檔案上傳到Linux並解壓縮,放置在指定位置,路徑中不要包含中文或空格
tar -zxvf flink-1.10.0-bin-scala_2.11.tgz -C /opt/module
打包測試,命令列(無界流)
執行無界流的job,使用nc工具測試,預設提交的模式是Per-job方式。
當前yarn模式不支援webUI方式提交,standalone模式可以用webUI提交。
bin/flink run -m yarn-cluster -c com.flink.chapt01.Flink03_WC_UnBoundedStream /opt/module/testdata/flink-wc.jar
FAQ報錯
解決方案
錯誤的原因是Flink1.8版本之後,預設情況下類庫中是不包含hadoop相關依賴的,所以提交時會發生錯誤,,引入hadoop相關依賴jar包即可:flink-shaded-hadoop-2-uber-3.1.3-9.0.jar
上傳後,重新執行上面的指令即可。執行過程可以通過Yarn的應用服務頁面檢視
cp /opt/software/flink/flink-shaded-hadoop-2-uber-3.1.3-9.0.jar /opt/module/flink-1.10.0/lib/
重新執行任務提交
啟動nc工具進行測試
Flink on Yarn
Flink提供了兩種在yarn上的執行模式,分別是Session-Cluster和Per-Job-Cluster模式。
Per-Job-Cluster
在上面的應用程式提交時,一個Job會對應一個yarn-session叢集,每提交一個作業會根據自身的情況,都會單獨向yarn申請資源,直到作業執行完成,一個作業的失敗與否並不會影響下一個作業的正常提交和執行。獨享Dispatcher和ResourceManager,按需接受資源申請;適合規模大長時間執行的作業
通過-m yarn-cluster 引數來指定執行模式
bin/flink run -m yarn-cluster -c com.flink.chapt01.Flink03_WC_UnBoundedStream /opt/module/testdata/flink-wc.jar
這種方式每次提交都會建立一個新的flink叢集,任務之間互相獨立,互不影響,方便管理。任務執行完成之後建立的叢集也會消失。
Session-Cluster
在規模小執行時間短的作業執行時,頻繁的申請資源並不是一個好的選擇,所以Flink還提供了一種可以事先申請一定資源,然後在這個資源中並行執行多個作業的叢集方式。
![image-20200915184337978](image/
在yarn中初始化一個flink叢集,開闢指定的資源,以後提交任務都向這裡提交。這個flink叢集會常駐在yarn叢集中,除非手工停止。
Session-Cluster叢集模式和Per-Job-Cluster不一樣的是需要事先建立Yarn應用後再提交Flink應用程式
①建立Yarn應用
bin/yarn-session.sh -d -n 2 -s 2 -jm 1024 -tm 1024 -nm test
注意:flink新的版本 -n
,-s
引數將不再有效,Yarn會按需動態分配資源 。以後不要加這兩個引數了。
webUI可以觀察,當前flink版本1.10
沒有任務,也沒有資源分配
相關引數
引數 | 含義 |
---|---|
守護模式,daemon | |
-n(--container) | TaskManager的數量 |
-s(--slots) | 每個TaskManager的slot數量,預設一個slot一個core,預設每個taskmanager的slot的個數為1,有時可以多一些taskmanager,做冗餘 |
-jm | JobManager的記憶體(單位MB) |
-tm | 每個Taskmanager的記憶體(單位MB) |
-nm | yarn 的appName(現在yarn的ui上的名字) |
-d | 後臺執行,需要放在前面,否則不生效 |
②再提交任務
bin/flink run -c com.flink.chapt01.Flink03_WC_UnBoundedStream /opt/module/testdata/flink-wc.jar
可以發現yarn分配了資源
HA高可用
任何時候都有一個 主 JobManager 和多個備用 JobManagers,以便在主節點失敗時有備用 JobManagers 來接管叢集。這保證了沒有單點故障,一旦備 JobManager 接管叢集,作業就可以正常執行。主備 JobManager 例項之間沒有明顯的區別。每個 JobManager 都可以充當主備節點。
- 配置yarn最大重試次數%HADOOP_HOME%/etc/hadoop/yarn-site.xml,分發檔案
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
- 修改conf/flink-conf.yaml配置檔案
配置引數中冒號後面的引數值都需要增加空格
yarn.application-attempts: 4
# Line79
high-availability: zookeeper
# Line88
high-availability.storageDir: hdfs://hadoop102:9000/flink/ha/
# Line94
high-availability.zookeeper.quorum: hadoop102:2282,hadoop103:2282,hadoop104:2282
- 修改conf/master配置檔案
Hadoop102:8081
Hadoop103:8081
Hadoop104:8081
- 修改zoo.cfg配置檔案
也可以用外部的zk叢集
#Line 32 防止和外部ZK衝突
clientPort=2282
#Line 35
server.88=hadoop102:2888:3888
server.89=hadoop103:2888:3888
server.90=hadoop104:2888:3888
5)分發flink
xsync flink
6)啟動Flink Zookeeper叢集
bin/start-zookeeper-quorum.sh
- 啟動Flink Session應用
bin/yarn-session.sh -d -jm 1024 -tm 1024 -nm test
如果此時將YarnSessionClusterEntrypoint程序關閉,WebUI介面會訪問不了
那麼稍等後,Yarn會自動重新啟動Cluster程序,就可以重新訪問了。