1. 程式人生 > 其它 >dedecms清空所有文章怎麼操作?sql語句如何寫?

dedecms清空所有文章怎麼操作?sql語句如何寫?

Flink安裝部署與快速入門

學習目標

l 瞭解Flink誕生背景

瞭解Flink的應用場景

掌握Flink環境的搭建

掌握Flink的入門案例

l 瞭解Flink的基本原理

 

1.  課程說明

 

1.1  課程安排

整個Flink框架課程分為如下九個章節:

第一部分-Flink-安裝部署與快速入門.docx

第二部分-Flink-批處理API.docx

第三部分-Flink-流處理API.docx

第四部分-Flink-高階API.docx

第五部分-Flink-Table與SQL.docx

第六部分-Flink-Action綜合案例.docx

第七部分-Flink高階特性和新特性.docx

第八部分-Flink多語言開發.docx

第九部分-Flink效能調優.docx

注意:Flink框架基礎課主要講解Flink的基本原理和基礎使用,更多的高階的用法,如和其他框架的整合,專案中的使用技巧、FlinkCEP等知識點會在後續的大專案中進行更多的加強學習!

 

 

1.2  框架版本

https://flink.apache.org/blog/

 

 

 

 

 

https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC

 

 

 

本課程基於2020年12月10最新發布的Flink1.12.0版本進行講解

Flink1.12.0可以稱得上是一個里程碑版本,由近 300 位開發者參與貢獻者,提交了超過 1000 多個修復或優化。這些修改極大地提高了 Flink 的可用性,並且簡化(且統一)了 Flink 的整個 API 棧。

ü Flink1.12.0其中一些比較重要的修改包括:

ü  DataStream API 上添加了高效的批執行模式的支援。這是批處理和流處理實現真正統一的執行時的一個重要里程碑。

ü 實現了基於Kubernetes的高可用性(HA)方案,作為生產環境中,ZooKeeper方案之外的另外一種選擇。

ü 擴充套件了 Kafka SQL connector,使其可以在 upsert 模式下工作,並且支援在 SQL DDL 中處理 connector 的 metadata。現在,時態表 Join 可以完全用 SQL 來表示,不再依賴於 Table API 了。

ü PyFlink 中添加了對於 DataStream API 的支援,將 PyFlink 擴充套件到了更復雜的場景,比如需要對狀態或者定時器 timer 進行細粒度控制的場景。除此之外,現在原生支援將 PyFlink 作業部署到 Kubernetes上。

 

 

1.3  程式語言

Flink官方提供了Java、Scala、Python語言介面用以開發Flink應用程式,但是Flink的原始碼是使用Java語言進行開發的,且Flink被阿里收購後,未來的主要程式語言都一直會是Java(因為阿里是Java重度使用者!),且GitHub上關於Flink的專案,大多數是使用Java語言編寫的。所以課程中以Java語言為主進行Flink的學習講解,但會擴充套件講解使用其他語言進行Flink開發

https://ci.apache.org/projects/flink/flink-docs-release-1.12/

 

 

 

 

https://github.com/search?q=Flink

 

 

 

 

2.  乘風破浪的Flink-Flink概述

2.1  實時即未來

 

 

 

如今的我們正生活在新一次的資訊革命浪潮中,5G、物聯網、智慧城市、工業4.0、新基建……等新名詞層出不窮,唯一不變的就是變化!對於我們所學習的大資料來說更是這樣:資料產生的越來越快、資料量越來越大,資料的來源越來越千變萬化,資料中隱藏的價值規律更是越來越被重視!數字化時代的未來正在被我們創造!

歷史的發展從來不會一帆風順,隨著大資料時代的發展,海量資料和多種業務的實時處理需求激增,比如:實時監控報警系統、實時風控系統、實時推薦系統等,傳統的批處理方式和早期的流式處理框架因其自身的侷限性,難以在延遲性、吞吐量、容錯能力,以及使用便捷性等方面滿足業務日益苛刻的要求。在這種形勢下,Flink 以其獨特的天然流式計算特性和更為先進的架構設計,極大地改善了以前的流式處理框架所存在的問題。

 

擴充套件閱讀:為什麼說流處理即未來?

https://news.qudong.com/article/562521.shtml

 

 

 

 

 

2.2  一切從Apache開始

 

 

 

Flink 誕生於歐洲的一個大資料研究專案 StratoSphere。該專案是柏林工業大學的一個研究性專案。早期, Flink 是做 Batch 計算的,但是在 2014 年, StratoSphere 裡面的核心成員孵化出 Flink,同年將 Flink 捐贈 Apache,並在後來成為 Apache 的頂級大資料專案,同時 Flink 計算的主流方向被定位為 Streaming 即用流式計算來做所有大資料的計算,這就是 Flink 技術誕生的背景。

2014 年 Flink 作為主攻流計算的大資料引擎開始在開源大資料行業內嶄露頭角。區別於 Storm、Spark Streaming 以及其他流式計算引擎的是:它不僅是一個高吞吐、低延遲的計算引擎,同時還提供很多高階的功能。比如它提供了有狀態的計算,支援狀態管理,支援強一致性的資料語義以及支援 基於Event Time的WaterMark對延遲或亂序的資料進行處理等

 

2.3  富二代Flink

 

 

 

https://blog.csdn.net/dQCFKyQDXYm3F8rB0/article/details/86117374

 

隨著人工智慧時代的降臨,資料量的爆發,在典型的大資料的業務場景下資料業務最通用的做

法是:選用批處理的技術處理全量資料,採用流式計算處理實時增量資料。在絕大多數的業務場景之下,使用者的業務邏輯在批處理和流處理之中往往是相同的。但是,使用者用於批處理和流處理的兩套計算引擎是不同的。因此,使用者通常需要寫兩套程式碼。毫無疑問,這帶來了一些額外的負擔和成本。阿里巴巴的商品資料處理就經常需要面對增量和全量兩套不同的業務流程問題,所以阿里就在想,我們能不能有一套統一的大資料引擎技術,使用者只需要根據自己的業務邏輯開發一套程式碼。這樣在各種不同的場景下,不管是全量資料還是增量資料,亦或者實時處理,一套方案即可全部支援,這就是阿里選擇 Flink 的背景和初衷。

2015 年阿里巴巴開始使用 Flink 並持續貢獻社群(阿里內部還基於Flink做了一套Blink),2019年1月8日,阿里巴巴以 9000 萬歐元(7億元人民幣)收購了創業公司 Data Artisans。從此Flink開始了新一輪的乘風破浪!

 

 

 

 

2.4  Flink官方介紹

官網地址:

 https://flink.apache.org/

 

 

2.5  Flink元件棧

一個計算框架要有長遠的發展,必須打造一個完整的 Stack。只有上層有了具體的應用,並能很好的發揮計算框架本身的優勢,那麼這個計算框架才能吸引更多的資源,才會更快的進步。所以 Flink 也在努力構建自己的 Stack。

Flink分層的元件棧如下圖所示:每一層所包含的元件都提供了特定的抽象,用來服務於上層元件。

 

 

 

 

 

 

各層詳細介紹:

物理部署層:Flink 支援本地執行、能在獨立叢集或者在被 YARN 管理的叢集上執行, 也能部署在雲上,該層主要涉及Flink的部署模式,目前Flink支援多種部署模式:本地、叢集(Standalone、YARN)、雲(GCE/EC2)、Kubenetes。Flink能夠通過該層能夠支援不同平臺的部署,使用者可以根據需要選擇使用對應的部署模式。

n Runtime核心層:Runtime層提供了支援Flink計算的全部核心實現,為上層API層提供基礎服務,該層主要負責對上層不同介面提供基礎服務,也是Flink分散式計算框架的核心實現層,支援分散式Stream作業的執行、JobGraph到ExecutionGraph的對映轉換、任務排程等。將DataSteam和DataSet轉成統一的可執行的Task Operator,達到在流式引擎下同時處理批量計算和流式計算的目的。

n API&Libraries層:Flink 首先支援了 Scala 和 Java 的 API,Python 也正在測試中。DataStream、DataSet、Table、SQL API,作為分散式資料處理框架,Flink同時提供了支撐計算和批計算的介面,兩者都提供給使用者豐富的資料處理高階API,例如Map、FlatMap操作等,也提供比較低階的Process Function API,使用者可以直接操作狀態和時間等底層資料。

擴充套件庫:Flink 還包括用於複雜事件處理的CEP,機器學習庫FlinkML,圖處理庫Gelly等。Table 是一種介面化的 SQL 支援,也就是 API 支援(DSL),而不是文字化的SQL 解析和執行。

 

2.6  Flink基石

Flink之所以能這麼流行,離不開它最重要的四個基石:Checkpoint、State、Time、Window。

 

 

 

l Checkpoint

這是Flink最重要的一個特性。

Flink基於Chandy-Lamport演算法實現了一個分散式的一致性的快照,從而提供了一致性的語義。

Chandy-Lamport演算法實際上在1985年的時候已經被提出來,但並沒有被很廣泛的應用,而Flink則把這個演算法發揚光大了。

Spark最近在實現Continue streaming,Continue streaming的目的是為了降低處理的延時,其也需要提供這種一致性的語義,最終也採用了Chandy-Lamport這個演算法,說明Chandy-Lamport演算法在業界得到了一定的肯定。

https://zhuanlan.zhihu.com/p/53482103

l State

提供了一致性的語義之後,Flink為了讓使用者在程式設計時能夠更輕鬆、更容易地去管理狀態,還提供了一套非常簡單明瞭的State API,包括裡面的有ValueState、ListState、MapState,近期添加了BroadcastState,使用State API能夠自動享受到這種一致性的語義。

l Time

除此之外,Flink還實現了Watermark的機制,能夠支援基於事件的時間的處理,能夠容忍遲到/亂序的資料。

l Window

另外流計算中一般在對流資料進行操作之前都會先進行開窗,即基於一個什麼樣的視窗上做這個計算。Flink提供了開箱即用的各種視窗,比如滑動視窗、滾動視窗、會話視窗以及非常靈活的自定義的視窗。

 

 

2.7  Flink用武之地

http://www.liaojiayi.com/flink-IoT/

https://flink.apache.org/zh/usecases.html

 

 

 

 

從很多公司的應用案例發現,其實Flink主要用在如下三大場景:

2.7.1  Event-driven Applications【事件驅動】

事件驅動型應用是一類具有狀態的應用,它從一個或多個事件流提取資料,並根據到來的事件觸發計算、狀態更新或其他外部動作。

事件驅動型應用是在計算儲存分離的傳統應用基礎上進化而來。

在傳統架構中,應用需要讀寫遠端事務型資料庫。

相反,事件驅動型應用是基於狀態化流處理來完成。在該設計中,資料和計算不會分離,應用只需訪問本地(記憶體或磁碟)即可獲取資料。

系統容錯性的實現依賴於定期向遠端持久化儲存寫入 checkpoint。下圖描述了傳統應用和事件驅動型應用架構的區別。

 

 

 

從某種程度上來說,所有的實時的資料處理或者是流式資料處理都應該是屬於Data Driven,流計算本質上是Data Driven 計算。應用較多的如風控系統,當風控系統需要處理各種各樣複雜的規則時,Data Driven 就會把處理的規則和邏輯寫入到Datastream API 或者是ProcessFunction API 中,然後將邏輯抽象到整個Flink 引擎,當外面的資料流或者是事件進入就會觸發相應的規則,這就是Data Driven 的原理。在觸發某些規則後,Data Driven 會進行處理或者是進行預警,這些預警會發到下游產生業務通知,這是Data Driven 的應用場景,Data Driven 在應用上更多應用於複雜事件的處理。

 

典型例項

- 欺詐檢測(Fraud detection)

- 異常檢測(Anomaly detection)

- 基於規則的告警(Rule-based alerting)

- 業務流程監控(Business process monitoring)

- Web應用程式(社交網路)

 

 

 

 

2.7.2  Data Analytics Applications【資料分析】

資料分析任務需要從原始資料中提取有價值的資訊和指標。

如下圖所示,Apache Flink 同時支援流式及批量分析應用。

 

 

 

 

 

Data Analytics Applications包含Batch analytics(批處理分析)Streaming analytics(流處理分析)

Batch analytics可以理解為週期性查詢:Batch Analytics 就是傳統意義上使用類似於Map ReduceHiveSpark Batch 等,對作業進行分析、處理、生成離線報表比如Flink應用凌晨從Recorded Events中讀取昨天的資料,然後做週期查詢運算,最後將資料寫入Database或者HDFS,或者直接將資料生成報表供公司上層領導決策使用。

Streaming analytics可以理解為連續性查詢:比如實時展示雙十一天貓銷售GMV(Gross Merchandise Volume成交總額),使用者下單資料需要實時寫入訊息佇列,Flink 應用源源不斷讀取資料做實時計算,然後不斷的將資料更新至Database或者K-VStore,最後做大屏實時展示。

 

典型例項

- 電信網路質量監控

- 移動應用中的產品更新及實驗評估分析

- 消費者技術中的實時資料即席分析

- 大規模圖分析

 

 

2.7.3  Data Pipeline Applications【資料管道】

什麼是資料管道?

提取-轉換-載入(ETL)是一種在儲存系統之間進行資料轉換和遷移的常用方法。

ETL 作業通常會週期性地觸發,將資料從事務型資料庫拷貝到分析型資料庫或資料倉庫。

資料管道和 ETL 作業的用途相似,都可以轉換、豐富資料,並將其從某個儲存系統移動到另一個。

但資料管道是以持續流模式執行,而非週期性觸發。

因此資料管道支援從一個不斷生成資料的源頭讀取記錄,並將它們以低延遲移動到終點。

例如:資料管道可以用來監控檔案系統目錄中的新檔案,並將其資料寫入事件日誌;另一個應用可能會將事件流物化到資料庫或增量構建和優化查詢索引。

和週期性 ETL 作業相比,持續資料管道可以明顯降低將資料移動到目的端的延遲。

此外,由於它能夠持續消費和傳送資料,因此用途更廣,支援用例更多。

下圖描述了週期性ETL作業和持續資料管道的差異。

 

 

 

 

Periodic ETL:比如每天凌晨週期性的啟動一個Flink ETL Job,讀取傳統資料庫中的資料,然後做ETL,最後寫入資料庫和檔案系統。

Data Pipeline:比如啟動一個Flink 實時應用,資料來源(比如資料庫、Kafka)中的資料不斷的通過Flink Data Pipeline流入或者追加到資料倉庫(資料庫或者檔案系統),或者Kafka訊息佇列。

Data Pipeline 的核心場景類似於資料搬運並在搬運的過程中進行部分資料清洗或者處理,而整個業務架構圖的左邊是Periodic ETL,它提供了流式ETL 或者實時ETL,能夠訂閱訊息佇列的訊息並進行處理,清洗完成後實時寫入到下游的DatabaseFile system 中。

典型例項

- 電子商務中的持續 ETL(實時數倉)

當下遊要構建實時數倉時,上游則可能需要實時的Stream ETL。這個過程會進行實時清洗或擴充套件資料,清洗完成後寫入到下游的實時數倉的整個鏈路中,可保證資料查詢的時效性,形成實時資料採集、實時資料處理以及下游的實時Query

- 電子商務中的實時查詢索引構建(搜尋引擎推薦)

搜尋引擎這塊以淘寶為例,當賣家上線新商品時,後臺會實時產生訊息流,該訊息流經過Flink 系統時會進行資料的處理、擴充套件。然後將處理及擴充套件後的資料生成實時索引,寫入到搜尋引擎中。這樣當淘寶賣家上線新商品時,能在秒級或者分鐘級實現搜尋引擎的搜尋。

2.8  擴充套件閱讀:Flink發展現狀

2.8.1  Flink在全球

Flink近年來逐步被人們所熟知,不僅是因為Flink提供同時支援高吞吐/低延遲和Exactly-Once語義的實時計算能力,同時Flink還提供了基於流式計算引擎處理批量資料的計算能力,真正意義上實現批流統一

同時隨著阿里對Blink的開源,極大地增強了Flink對批計算領域的支援.眾多優秀的特性,使得Flink成為開源大資料處理框架中的一顆新星,隨著國內社群的不斷推動,越來越多的公司開始選擇使用Flink作為實時資料處理技術,在不久的將來,Flink也將會成為企業內部主流的資料處理框架,最終成為下一代大資料處理的標準.

 

 

 

2.8.2  Flink在中國

Flink在很多公司的生產環境中得到了使用, 例如: ebay, 騰訊, 阿里, 亞馬遜, 華為等

 

 

 

2.8.3  Flink在阿里

阿里自15年起開始調研開源流計算引擎,最終決定基於Flink打造新一代計算引擎,阿里貢獻了數百個commiter,並對Flink進行高度定製,並取名為Blink,

阿里是Flink SQL的最大貢獻者,一半以上的功能都是阿里的工程師開發的,基於Apache Flink在阿里巴巴搭建的平臺於2016年正式上線,並從阿里巴巴的搜尋和推薦這兩大場景開始實現。

2019年Flink的母公司被阿里7億元全資收購阿里一直致力於Flink在國內的推廣使用目前阿里巴巴所有的業務,包括阿里巴巴所有子公司都採用了基於Flink搭建的實時計算平臺。

同時Flink計算平臺執行在開源的Hadoop叢集之上採用HadoopYARN做為資源管理排程,以 HDFS作為資料儲存。因此,Flink可以和開源大資料軟體Hadoop無縫對接。

目前,這套基於Flink搭建的實時計算平臺不僅服務於阿里巴巴集團內部,而且通過阿里雲的雲產品API向整個開發者生態提供基於Flink的雲產品支援。

主要包含四個模組:實時監控、實時報表、流資料分析和實時倉庫。

實時監控:

- 使用者行為預警、app crash 預警、伺服器攻擊預警

- 對使用者行為或者相關事件進行實時監測和分析,基於風控規則進行預警、複雜事件處理

實時報表:

- 11、雙12等活動直播大屏

- 對外資料產品:生意參謀等

- 資料化運營

流資料分析:

- 實時計算相關指標反饋及時調整決策

- 內容投放、無線智慧推送、實時個性化推薦等

實時倉庫/ETL

- 資料實時清洗、歸併、結構化

- 數倉的補充和優化

 

Flink在阿里巴巴的大規模應用表現如何?

- 規模:一個系統是否成熟,規模是重要指標,Flink最初上線阿里巴巴只有數百臺伺服器,目前規模已達上萬臺,此等規模在全球範圍內也是屈指可數;

- 狀態資料:基於Flink,內部積累起來的狀態資料已經是PB級別規模;

- Events:如今每天在Flink的計算平臺上,處理的資料已經超過十萬億條;

- TPS:在峰值期間可以承擔每秒超過17億次的訪問,最典型的應用場景是阿里巴巴雙11大屏;

 

 

 

 

 

 

 

2.8.4  Flink在騰訊

https://blog.csdn.net/qianshangding0708/article/details/91469978

 

 

 

 

 

 

2.8.5  Flink在美團

http://ju.outofmemory.cn/entry/367345

https://tech.meituan.com/2018/10/18/meishi-data-flink.html

 

 

 

 

 

2.9  擴充套件閱讀:為什麼選擇Flink?

 

 

 

l 主要原因

1. Flink 具備統一的框架處理有界和無界兩種資料流的能力

2. 部署靈活,Flink 底層支援多種資源排程器,包括Yarn、Kubernetes 等。Flink 自身帶的Standalone 的排程器,在部署上也十分靈活。

3. 極高的可伸縮性,可伸縮性對於分散式系統十分重要,阿里巴巴雙11大屏採用Flink 處理海量資料,使用過程中測得Flink 峰值可達17 億條/秒。

4. 極致的流式處理效能Flink 相對於Storm 最大的特點是將狀態語義完全抽象到框架中,支援本地狀態讀取,避免了大量網路IO,可以極大提升狀態存取的效能。

 

其他更多的原因:

1. 同時支援高吞吐、低延遲、高效能

Flink 是目前開源社群中唯一一套集高吞吐、低延遲、高效能三者於一身的分散式流式資料處理框架。

Spark 只能兼顧高吞吐和高效能特性,無法做到低延遲保障,因為Spark是用批處理來做流處理

Storm 只能支援低延時和高效能特性,無法滿足高吞吐的要求

下圖顯示了 Apache Flink 與 Apache Storm 在完成流資料清洗的分散式任務的效能對比。

 

 

 

 

2. 支援事件時間(Event Time)概念

在流式計算領域中,視窗計算的地位舉足輕重,但目前大多數框架視窗計算採用的都是系統時間(Process Time),也就是事件傳輸到計算框架處理時,系統主機的當前時間。

Flink 能夠支援基於事件時間(Event Time)語義進行視窗計算

這種基於事件驅動的機制使得事件即使亂序到達甚至延遲到達,流系統也能夠計算出精確的結果,保持了事件原本產生時的時序性,儘可能避免網路傳輸或硬體系統的影響。

 

 

 

 

3. 支援有狀態計算

Flink1.4開始支援有狀態計算

所謂狀態就是在流式計算過程中將運算元的中間結果儲存在記憶體或者檔案系統中,等下一個事件進入運算元後可以從之前的狀態中獲取中間結果,計算當前的結果,從而無須每次都基於全部的原始資料來統計結果,極大的提升了系統性能,狀態化意味著應用可以維護隨著時間推移已經產生的資料聚合

 

 

 

 

4. 支援高度靈活的視窗(Window)操作

Flink 將視窗劃分為基於 Time 、Count 、Session、以及Data-Driven等型別的視窗操作,視窗可以用靈活的觸發條件定製化來達到對複雜的流傳輸模式的支援,使用者可以定義不同的視窗觸發機制來滿足不同的需求

 

5. 基於輕量級分散式快照(Snapshot/Checkpoints)的容錯機制

Flink 能夠分佈執行在上千個節點上,通過基於分散式快照技術的Checkpoints,將執行過程中的狀態資訊進行持久化儲存,一旦任務出現異常停止,Flink 能夠從 Checkpoints 中進行任務的自動恢復,以確保資料處理過程中的一致性

Flink 的容錯能力是輕量級的,允許系統保持高併發,同時在相同時間內提供強一致性保證。

 

 

 

6. 基於 JVM 實現的獨立的記憶體管理

Flink 實現了自身管理記憶體的機制,通過使用雜湊,索引,快取和排序有效地進行記憶體管理,通過序列化/反序列化機制將所有的資料物件轉換成二進位制在記憶體中儲存,降低資料儲存大小的同時,更加有效的利用空間。使其獨立於 Java 的預設垃圾收集器,儘可能減少 JVM GC 對系統的影響。

 

 

 

 

7. SavePoints 儲存點

對於 7 * 24 小時執行的流式應用,資料來源源不斷的流入,在一段時間內應用的終止有可能導致資料的丟失或者計算結果的不準確。

比如叢集版本的升級,停機運維操作等。

值得一提的是,Flink 通過SavePoints 技術將任務執行的快照儲存在儲存介質上,當任務重啟的時候,可以從事先儲存的 SavePoints 恢復原有的計算狀態,使得任務繼續按照停機之前的狀態執行。

Flink 儲存點提供了一個狀態化的版本機制,使得能以無丟失狀態和最短停機時間的方式更新應用或者回退歷史資料。

 

 

8. 靈活的部署方式,支援大規模叢集

Flink 被設計成能用上千個點在大規模叢集上執行

除了支援獨立叢集部署外,Flink 還支援 YARN 和Mesos 方式部署。

 

9. Flink 的程式內在是並行和分散式的

資料流可以被分割槽成 stream partitions,

operators 被劃分為operator subtasks;

這些 subtasks 在不同的機器或容器中分不同的執行緒獨立執行;

operator subtasks 的數量就是operator的平行計算數,不同的 operator 階段可能有不同的並行數;

如下圖所示,source operator 的並行數為 2,但最後的 sink operator 為1;

 

 

 

 

10. 豐富的庫

Flink 擁有豐富的庫來進行機器學習,圖形處理,關係資料處理等。

2.10  擴充套件閱讀:大資料框架發展史

 

 

 

這幾年大資料的飛速發展,出現了很多熱門的開源社群,其中著名的有 Hadoop、Storm,以及後來的 Spark,他們都有著各自專注的應用場景。Spark 掀開了記憶體計算的先河,也以記憶體為賭注,贏得了記憶體計算的飛速發展。Spark 的火熱或多或少的掩蓋了其他分散式計算的系統身影。就像 Flink,也就在這個時候默默的發展著。

在國外一些社群,有很多人將大資料的計算引擎分成了 4 代,當然,也有很多人不會認同。我們先姑且這麼認為和討論。

1代——Hadoop MapReduce

首先第一代的計算引擎,無疑就是 Hadoop 承載的 MapReduce。它將計算分為兩個階段,分別為 Map 和 Reduce。對於上層應用來說,就不得不想方設法去拆分演算法,甚至於不得不在上層應用實現多個 Job 的串聯,以完成一個完整的演算法,例如迭代計算。

n 批處理

n Mapper、Reducer

2代——DAG框架(Tez) + MapReduce

由於這樣的弊端,催生了支援 DAG 框架的產生。因此,支援 DAG 的框架被劃分為第二代計算引擎。如 Tez 以及更上層的 Oozie。這裡我們不去細究各種 DAG 實現之間的區別,不過對於當時的 Tez 和 Oozie 來說,大多還是批處理的任務。

n 批處理

n 1個Tez = MR(1) + MR(2) + ... + MR(n)

相比MR效率有所提升

 

 

 

3代——Spark

接下來就是以 Spark 為代表的第三代的計算引擎。第三代計算引擎的特點主要是 Job 內部的 DAG 支援(不跨越 Job),以及強調的實時計算。在這裡,很多人也會認為第三代計算引擎也能夠很好的執行批處理的 Job。

批處理、流處理、SQL高層API支援

自帶DAG

n 記憶體迭代計算、效能較之前大幅提升

4代——Flink

隨著第三代計算引擎的出現,促進了上層應用快速發展,例如各種迭代計算的效能以及對流計算和 SQL 等的支援。Flink 的誕生就被歸在了第四代。這應該主要表現在 Flink 對流計算的支援,以及更一步的實時性上面。當然 Flink 也可以支援 Batch 的任務,以及 DAG 的運算。

批處理、流處理、SQL高層API支援

自帶DAG

n 流式計算效能更高、可靠性更高

 

2.11  擴充套件閱讀:流處理 VS 批處理

l 資料的時效性

日常工作中,我們一般會先把資料儲存在,然後對錶的資料進行加工、分析既然先儲存在表中,那就會涉及到時效性概念。

如果我們處理以年,月為單位的級別的資料處理,進行統計分析,個性化推薦,那麼資料的的最新日期離當前有幾個甚至上月都沒有問題。但是如果我們處理的是以天為級別,或者小時甚至更小粒度的資料處理,那麼就要求資料的時效性更高了。比如:

對網站的實時監控

對異常日誌的監控

這些場景需要工作人員立即響應,這樣的場景下,傳統的統一收集資料,再存到資料庫中,再取出來進行分析就無法滿足高時效性的需求了。

 

 

 

l 流式計算和批量計算

 

Batch Analytics,右邊是 Streaming Analytics。批量計算: 統一收集資料->儲存到DB->對資料進行批量處理,就是傳統意義上使用類似於 Map Reduce、Hive、Spark Batch 等,對作業進行分析、處理、生成離線報表

Streaming Analytics 流式計算,顧名思義,就是對資料流進行處理使用流式分析引擎如 Storm,Flink 實時處理分析資料,應用較多的場景如實時大屏、實時報表。

 

它們的主要區別是:

與批量計算那樣慢慢積累資料不同,流式計算立刻計算,資料持續流動,計算完之後就丟棄。

批量計算是維護一張表,對錶進行實施各種計算邏輯。流式計算相反,是必須先定義好計算邏輯,提交到流式計算系統,這個計算作業邏輯在整個執行期間是不可更改的。

計算結果上,批量計算對全部資料進行計算後傳輸結果,流式計算是每次小批量計算後,結果可以立刻實時化展現。

 

2.12  擴充套件閱讀:流批統一

在大資料處理領域,批處理任務與流處理任務一般被認為是兩種不同的任務,一個大資料框架一般會被設計為只能處理其中一種任務:

MapReduce只支援批處理任務;

Storm只支援流處理任務;

Spark Streaming採用micro-batch架構,本質上還是基於Spark批處理對流式資料進行處理

Flink通過靈活的執行引擎,能夠同時支援批處理任務與流處理任務

 

 

 

在執行引擎這一層,流處理系統與批處理系統最大不同在於節點間的資料傳輸方式:

1.對於一個流處理系統,其節點間資料傳輸的標準模型是:當一條資料被處理完成後,序列化到快取中,然後立刻通過網路傳輸到下一個節點,由下一個節點繼續處理

2.對於一個批處理系統,其節點間資料傳輸的標準模型是:當一條資料被處理完成後,序列化到快取中,並不會立刻通過網路傳輸到下一個節點,當快取寫滿,就持久化到本地硬碟上,當所有資料都被處理完成後,才開始將處理後的資料通過網路傳輸到下一個節點

這兩種資料傳輸模式是兩個極端,對應的是流處理系統對低延遲的要求和批處理系統對高吞吐量的要求

 

Flink的執行引擎採用了一種十分靈活的方式,同時支援了這兩種資料傳輸模型:

Flink以固定的快取塊為單位進行網路資料傳輸,使用者可以通過設定快取塊超時值指定快取塊的傳輸時機。

如果快取塊的超時值為0,則Flink的資料傳輸方式類似上文所提到流處理系統的標準模型,此時系統可以獲得最低的處理延遲

如果快取塊的超時值為無限大/-1,則Flink的資料傳輸方式類似上文所提到批處理系統的標準模型,此時系統可以獲得最高的吞吐量

同時快取塊的超時值也可以設定為0到無限大之間的任意值。快取塊的超時閾值越小,則Flink流處理執行引擎的資料處理延遲越低,但吞吐量也會降低,反之亦然。通過調整快取塊的超時閾值,使用者可根據需求靈活地權衡系統延遲和吞吐量

預設情況下,流中的元素並不會一個一個的在網路中傳輸,而是快取起來伺機一起傳送(預設為32KB,通過taskmanager.memory.segment-size設定),這樣可以避免導致頻繁的網路傳輸,提高吞吐量,但如果資料來源輸入不夠快的話會導致後續的資料處理延遲,所以可以使用env.setBufferTimeout(預設100ms),來為快取填入設定一個最大等待時間。等待時間到了之後,即使快取還未填滿,快取中的資料也會自動傳送。

ltimeoutMillis > 0 表示最長等待 timeoutMillis 時間,就會flush

ltimeoutMillis = 0 表示每條資料都會觸發 flush,直接將資料傳送到下游,相當於沒有Buffer了(避免設定為0,可能導致效能下降)

ltimeoutMillis = -1 表示只有等到 buffer滿了或 CheckPoint的時候,才會flush。相當於取消了 timeout 策略

總結:

Flink以快取塊為單位進行網路資料傳輸,使用者可以設定快取塊超時時間和快取塊大小來控制緩衝塊傳輸時機,從而控制Flink的延遲性和吞吐量

 

3.  Flink安裝部署

Flink支援多種安裝模式

- Local—本地單機模式,學習測試時使用

- Standalone—獨立叢集模式,Flink自帶叢集,開發測試環境使用

- StandaloneHA—獨立叢集高可用模式,Flink自帶叢集,開發測試環境使用

- On Yarn—計算資源統一由Hadoop YARN管理,生產環境使用

 

3.1  Local本地模式

3.1.1  原理

 

 

 

1. Flink程式由JobClient進行提交

2. JobClient將作業提交給JobManager

3. JobManager負責協調資源分配和作業執行。資源分配完成後,任務將提交給相應的TaskManager

4. TaskManager啟動一個執行緒以開始執行。TaskManager會向JobManager報告狀態更改,如開始執行,正在進行或已完成。

5. 作業執行完成後,結果將傳送回客戶端(JobClient)

 

3.1.2  操作

1.下載安裝包

https://archive.apache.org/dist/flink/

 

2.上傳flink-1.12.0-bin-scala_2.12.tgznode1的指定目錄

 

3.解壓

tar -zxvf flink-1.12.0-bin-scala_2.12.tgz 

 

4.如果出現許可權問題,需要修改許可權

chown -R root:root /export/server/flink-1.12.0

5.改名或建立軟連結

mv flink-1.12.0 flink

ln -s /export/server/flink-1.12.0 /export/server/flink

 

3.1.3  測試

1.準備檔案/root/words.txt

vim /root/words.txt

hello me you her

hello me you

hello me

hello

 

 

2.啟動Flink本地“叢集”

 /export/server/flink/bin/start-cluster.sh

 

3.使用jps可以檢視到下面兩個程序

 - TaskManagerRunner

 - StandaloneSessionClusterEntrypoint

 

4.訪問Flink的Web UI

 http://node1:8081/#/overview

 

 

 

 slot在Flink裡面可以認為是資源組,Flink是通過將任務分成子任務並且將這些子任務分配到slot來並行執行程式。

 

5.執行官方示例

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar --input /root/words.txt --output /root/out

 

6.停止Flink

/export/server/flink/bin/stop-cluster.sh

 

 

啟動shell互動式視窗(目前所有Scala 2.12版本的安裝包暫時都不支援 Scala Shell)

/export/server/flink/bin/start-scala-shell.sh local

 

執行如下命令

benv.readTextFile("/root/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()

 

退出shell

:quit

 

3.2  Standalone獨立叢集模式

3.2.1  原理

 

 

 

1. client客戶端提交任務給JobManager

2. JobManager負責申請任務執行所需要的資源並管理任務和資源,

3. JobManager分發任務給TaskManager執行

4. TaskManager定期向JobManager彙報狀態

 

3.2.2  操作

1.叢集規劃:

- 伺服器: node1(Master + Slave): JobManager + TaskManager

- 伺服器: node2(Slave): TaskManager

- 伺服器: node3(Slave): TaskManager

 

2.修改flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

jobmanager.rpc.address: node1

taskmanager.numberOfTaskSlots: 2

web.submit.enable: true

 

#歷史伺服器

jobmanager.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/

historyserver.web.address: node1

historyserver.web.port: 8082

historyserver.archive.fs.dir: hdfs://node1:8020/flink/completed-jobs/

 

 

 

 

2.修改masters

vim /export/server/flink/conf/masters

node1:8081

 

3.修改slaves

vim /export/server/flink/conf/workers

node1

node2

node3

 

4.新增HADOOP_CONF_DIR環境變數

vim /etc/profile

export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop

 

 

5.分發

 scp -r /export/server/flink node2:/export/server/flink

 scp -r /export/server/flink node3:/export/server/flink

 scp  /etc/profile node2:/etc/profile

 scp  /etc/profile node3:/etc/profile

 或

 for i in {2..3}; do scp -r flink node$i:$PWD; done

 

6.source

source /etc/profile

 

3.2.3  測試

1.啟動叢集node1上執行如下命令

 /export/server/flink/bin/start-cluster.sh

 或者單獨啟動

/export/server/flink/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all

/export/server/flink/bin/taskmanager.sh start|start-foreground|stop|stop-all

 

2.啟動歷史伺服器

/export/server/flink/bin/historyserver.sh start

 

3.訪問Flink UI介面或使用jps檢視

http://node1:8081/#/overview

http://node1:8082/#/overview

TaskManager介面:可以檢視到當前Flink叢集中有多少個TaskManager,每個TaskManager的slots、記憶體、CPU Core是多少

 

 

 

 

4.執行官方測試案例

/export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar --input hdfs://node1:8020/wordcount/input/words.txt --output hdfs://node1:8020/wordcount/output/result.txt  --parallelism 2

 

 

 

 

5.檢視歷史日誌

http://node1:50070/explorer.html#/flink/completed-jobs

http://node1:8082/#/overview

 

6.停止Flink叢集

/export/server/flink/bin/stop-cluster.sh

 

 

3.3  Standalone-HA高可用叢集模式

3.3.1  原理

 

 

 

從之前的架構中我們可以很明顯的發現 JobManager 有明顯的單點問題(SPOF,single point of failure)JobManager 肩負著任務排程以及資源分配,一旦 JobManager 出現意外,其後果可想而知。

Zookeeper 的幫助下,一個 Standalone的Flink叢集會同時有多個活著的 JobManager,其中只有一個處於工作狀態,其他處於 Standby 狀態。當工作中的 JobManager 失去連線後(如宕機或 Crash)Zookeeper 會從 Standby 中選一個新的 JobManager 來接管 Flink 叢集。

 

 

3.3.2  操作

1.叢集規劃

- 伺服器: node1(Master + Slave): JobManager + TaskManager

- 伺服器: node2(Master + Slave): JobManager + TaskManager

- 伺服器: node3(Slave): TaskManager

 

2.啟動ZooKeeper

zkServer.sh status

zkServer.sh stop

zkServer.sh start

 

3.啟動HDFS

/export/serves/hadoop/sbin/start-dfs.sh

 

4.停止Flink叢集

/export/server/flink/bin/stop-cluster.sh

 

5.修改flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

增加如下內容G

state.backend: filesystem

state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints

high-availability: zookeeper

high-availability.storageDir: hdfs://node1:8020/flink/ha/

high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

 

配置解釋

#開啟HA,使用檔案系統作為快照儲存

state.backend: filesystem

 

#啟用檢查點,可以將快照儲存到HDFS

state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints

 

#使用zookeeper搭建高可用

high-availability: zookeeper

 

# 儲存JobManager的元資料到HDFS

high-availability.storageDir: hdfs://node1:8020/flink/ha/

 

# 配置ZK叢集地址

high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

 

 

6.修改masters

vim /export/server/flink/conf/masters

node1:8081

node2:8081

 

7.同步

scp -r /export/server/flink/conf/flink-conf.yaml node2:/export/server/flink/conf/

scp -r /export/server/flink/conf/flink-conf.yaml node3:/export/server/flink/conf/

scp -r /export/server/flink/conf/masters node2:/export/server/flink/conf/

scp -r /export/server/flink/conf/masters node3:/export/server/flink/conf/

 

8.修改node2上的flink-conf.yaml

vim /export/server/flink/conf/flink-conf.yaml

jobmanager.rpc.address: node2

 

9.重新啟動Flink叢集,node1上執行

/export/server/flink/bin/stop-cluster.sh

/export/server/flink/bin/start-cluster.sh

 

 

 

 

10.使用jps命令檢視

發現沒有Flink相關程序被啟動

 

11.檢視日誌

cat /export/server/flink/log/flink-root-standalonesession-0-node1.log

發現如下錯誤

 

 

 

因為在Flink1.8版本後,Flink官方提供的安裝包裡沒有整合HDFS的jar

 

12.下載jar包並在Flink的lib目錄下放入該jar包並分發使Flink能夠支援對Hadoop的操作

下載地址

https://flink.apache.org/downloads.html

 

 

 

放入lib目錄

cd /export/server/flink/lib

 

 

 

 

分發

for i in {2..3}; do scp -r flink-shaded-hadoop-2-uber-2.7.5-10.0.jar node$i:$PWD; done

 

13.重新啟動Flink叢集,node1上執行

/export/server/flink/bin/start-cluster.sh

 

14.使用jps命令檢視,發現三臺機器已經ok

 

 

 

3.3.3  測試

1.訪問WebUI

http://node1:8081/#/job-manager/config

http://node2:8081/#/job-manager/config

 

2.執行wc

/export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

 

3.kill掉其中一個master

 

4.重新執行wc,還是可以正常執行

/export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

 

3.停止叢集

/export/server/flink/bin/stop-cluster.sh

 

 

3.4  Flink On Yarn模式

3.4.1  原理

3.4.1.1  為什麼使用Flink On Yarn?

在實際開發中,使用Flink時,更多的使用方式是Flink On Yarn模式,原因如下:

-1.Yarn資源可以按需使用,提高叢集的資源利用率

-2.Yarn任務有優先順序,根據優先順序執行作業

-3.基於Yarn排程系統,能夠自動化地處理各個角色的 Failover(容錯)

○ JobManager 程序和 TaskManager 程序都由 Yarn NodeManager 監控

如果 JobManager 程序異常退出,則 Yarn ResourceManager 會重新排程 JobManager 到其他機器

如果 TaskManager 程序異常退出,JobManager 會收到訊息並重新向 Yarn ResourceManager 申請資源,重新啟動 TaskManager

 

 

 

3.4.1.2  Flink如何和Yarn進行互動?

 

 

 

 

 

1.Client上傳jar包和配置檔案到HDFS叢集上

2.Client向Yarn ResourceManager提交任務並申請資源

3.ResourceManager分配Container資源並啟動ApplicationMaster,然後AppMaster載入Flink的Jar包和配置構建環境,啟動JobManager

JobManager和ApplicationMaster執行在同一個container上。

一旦他們被成功啟動,AppMaster就知道JobManager的地址(AM它自己所在的機器)。

它就會為TaskManager生成一個新的Flink配置檔案(他們就可以連線到JobManager)。

這個配置檔案也被上傳到HDFS上。

此外,AppMaster容器也提供了Flink的web服務介面。

YARN所分配的所有埠都是臨時埠,這允許使用者並行執行多個Flink

4.ApplicationMaster向ResourceManager申請工作資源,NodeManager載入Flink的Jar包和配置構建環境並啟動TaskManager

5.TaskManager啟動後向JobManager傳送心跳包,並等待JobManager向其分配任務

 

3.4.1.3  兩種方式
3.4.1.3.1  Session模式

 

 

 

 

 

特點:需要事先申請資源,啟動JobManager和TaskManger

優點:不需要每次遞交作業申請資源,而是使用已經申請好的資源,從而提高執行效率

缺點:作業執行完成以後,資源不會被釋放,因此一直會佔用系統資源

應用場景:適合作業遞交比較頻繁的場景,小作業比較多的場景

 

 

3.4.1.3.2  Per-Job模式

 

 

 

特點:每次遞交作業都需要申請一次資源

優點:作業執行完成,資源會立刻被釋放,不會一直佔用系統資源

缺點:每次遞交作業都需要申請資源,會影響執行效率,因為申請資源需要消耗時間

應用場景:適合作業比較少的場景、大作業的場景

 

 

3.4.2  操作

1.關閉yarn的記憶體檢查

vim /export/server/hadoop/etc/hadoop/yarn-site.xml

新增:

<!-- 關閉yarn記憶體檢查 -->

<property>

<name>yarn.nodemanager.pmem-check-enabled</name>

    <value>false</value>

</property>

<property>

     <name>yarn.nodemanager.vmem-check-enabled</name>

     <value>false</value>

</property>

說明:

是否啟動一個執行緒檢查每個任務正使用的虛擬記憶體量,如果任務超出分配值,則直接將其殺掉,預設是true。

在這裡面我們需要關閉,因為對於flink使用yarn模式下,很容易記憶體超標,這個時候yarn會自動殺掉job

 

2.同步

scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node2:/export/server/hadoop/etc/hadoop/yarn-site.xml

scp -r /export/server/hadoop/etc/hadoop/yarn-site.xml node3:/export/server/hadoop/etc/hadoop/yarn-site.xml

 

3.重啟yarn

/export/server/hadoop/sbin/stop-yarn.sh

/export/server/hadoop/sbin/start-yarn.sh

 

3.4.3  測試

3.4.3.1  Session模式

yarn-session.sh(開闢資源) + flink run(提交任務)

 

1.yarn上啟動一個Flink會話,node1上執行以下命令

/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

 

說明:

申請2個CPU、1600M記憶體

# -n 表示申請2個容器,這裡指的就是多少個taskmanager

# -tm 表示每個TaskManager的記憶體大小

# -s 表示每個TaskManager的slots數量

# -d 表示以後臺程式方式執行

 

注意:

該警告不用管

WARN  org.apache.hadoop.hdfs.DFSClient  - Caught exception

java.lang.InterruptedException

 

2.檢視UI介面

http://node1:8088/cluster

 

 

 

 

 

 

3.使用flink run提交任務:

 /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

 執行完之後可以繼續執行其他的小任務

 /export/server/flink/bin/flink run  /export/server/flink/examples/batch/WordCount.jar

 

 

4.通過上方的ApplicationMaster可以進入Flink的管理介面

 

 

 

 

 

5.關閉yarn-session:

yarn application -kill application_1599402747874_0001

 

 

 

rm -rf /tmp/.yarn-properties-root

 

3.4.3.2  Per-Job分離模式

1.直接提交job

/export/server/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/server/flink/examples/batch/WordCount.jar

# -m  jobmanager的地址

# -yjm 1024 指定jobmanager的記憶體資訊

# -ytm 1024 指定taskmanager的記憶體資訊

 

 

2.檢視UI介面

http://node1:8088/cluster

 

 

 

 

 

 

 

3.注意:

在之前版本中如果使用的是flink on yarn方式,想切換回standalone模式的話,如果報錯需要刪除:【/tmp/.yarn-properties-root】

rm -rf /tmp/.yarn-properties-root

因為預設查詢當前yarn叢集中已有的yarn-session資訊中的jobmanager

 

3.4.4  引數總結

[root@node1 bin]# /export/server/flink/bin/flink --help

./flink <ACTION> [OPTIONS] [ARGUMENTS]

 

The following actions are available:

 

Action "run" compiles and runs a program.

 

  Syntax: run [OPTIONS] <jar-file> <arguments>

  "run" action options:

     -c,--class <classname>               Class with the program entry point

                                          ("main()" method). Only needed if the

                                          JAR file does not specify the class in

                                          its manifest.

     -C,--classpath <url>                 Adds a URL to each user code

                                          classloader  on all nodes in the

                                          cluster. The paths must specify a

                                          protocol (e.g. file://) and be

                                          accessible on all nodes (e.g. by means

                                          of a NFS share). You can use this

                                          option multiple times for specifying

                                          more than one URL. The protocol must

                                          be supported by the {@link

                                          java.net.URLClassLoader}.

     -d,--detached                        If present, runs the job in detached

                                          mode

     -n,--allowNonRestoredState           Allow to skip savepoint state that

                                          cannot be restored. You need to allow

                                          this if you removed an operator from

                                          your program that was part of the

                                          program when the savepoint was

                                          triggered.

     -p,--parallelism <parallelism>       The parallelism with which to run the

                                          program. Optional flag to override the

                                          default value specified in the

                                          configuration.

     -py,--python <pythonFile>            Python script with the program entry

                                          point. The dependent resources can be

                                          configured with the `--pyFiles`

                                          option.

     -pyarch,--pyArchives <arg>           Add python archive files for job. The

                                          archive files will be extracted to the

                                          working directory of python UDF

                                          worker. Currently only zip-format is

                                          supported. For each archive file, a

                                          target directory be specified. If the

                                          target directory name is specified,

                                          the archive file will be extracted to

                                          a name can directory with the

                                          specified name. Otherwise, the archive

                                          file will be extracted to a directory

                                          with the same name of the archive

                                          file. The files uploaded via this

                                          option are accessible via relative

                                          path. '#' could be used as the

                                          separator of the archive file path and

                                          the target directory name. Comma (',')

                                          could be used as the separator to

                                          specify multiple archive files. This

                                          option can be used to upload the

                                          virtual environment, the data files

                                          used in Python UDF (e.g.: --pyArchives

                                          file:///tmp/py37.zip,file:///tmp/data.

                                          zip#data --pyExecutable

                                          py37.zip/py37/bin/python). The data

                                          files could be accessed in Python UDF,

                                          e.g.: f = open('data/data.txt', 'r').

     -pyexec,--pyExecutable <arg>         Specify the path of the python

                                          interpreter used to execute the python

                                          UDF worker (e.g.: --pyExecutable

                                          /usr/local/bin/python3). The python

                                          UDF worker depends on Python 3.5+,

                                          Apache Beam (version == 2.23.0), Pip

                                          (version >= 7.1.0) and SetupTools

                                          (version >= 37.0.0). Please ensure

                                          that the specified environment meets

                                          the above requirements.

     -pyfs,--pyFiles <pythonFiles>        Attach custom python files for job.

                                          These files will be added to the

                                          PYTHONPATH of both the local client

                                          and the remote python UDF worker. The

                                          standard python resource file suffixes

                                          such as .py/.egg/.zip or directory are

                                          all supported. Comma (',') could be

                                          used as the separator to specify

                                          multiple files (e.g.: --pyFiles

                                          file:///tmp/myresource.zip,hdfs:///$na

                                          menode_address/myresource2.zip).

     -pym,--pyModule <pythonModule>       Python module with the program entry

                                          point. This option must be used in

                                          conjunction with `--pyFiles`.

     -pyreq,--pyRequirements <arg>        Specify a requirements.txt file which

                                          defines the third-party dependencies.

                                          These dependencies will be installed

                                          and added to the PYTHONPATH of the

                                          python UDF worker. A directory which

                                          contains the installation packages of

                                          these dependencies could be specified

                                          optionally. Use '#' as the separator

                                          if the optional parameter exists

                                          (e.g.: --pyRequirements

                                          file:///tmp/requirements.txt#file:///t

                                          mp/cached_dir).

     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job

                                          from (for example

                                          hdfs:///flink/savepoint-1537).

     -sae,--shutdownOnAttachedExit        If the job is submitted in attached

                                          mode, perform a best-effort cluster

                                          shutdown when the CLI is terminated

                                          abruptly, e.g., in response to a user

                                          interrupt, such as typing Ctrl + C.

  Options for Generic CLI mode:

     -D <property=value>   Allows specifying multiple generic configuration

                           options. The available options can be found at

                           https://ci.apache.org/projects/flink/flink-docs-stabl

                           e/ops/config.html

     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is

                           also available with the "Application Mode".

                           The name of the executor to be used for executing the

                           given job, which is equivalent to the

                           "execution.target" config option. The currently

                           available executors are: "remote", "local",

                           "kubernetes-session", "yarn-per-job", "yarn-session".

     -t,--target <arg>     The deployment target for the given application,

                           which is equivalent to the "execution.target" config

                           option. For the "run" action the currently available

                           targets are: "remote", "local", "kubernetes-session",

                           "yarn-per-job", "yarn-session". For the

                           "run-application" action the currently available

                           targets are: "kubernetes-application",

                           "yarn-application".

 

  Options for yarn-cluster mode:

     -d,--detached                        If present, runs the job in detached

                                          mode

     -m,--jobmanager <arg>                Set to yarn-cluster to use YARN

                                          execution mode.

     -yat,--yarnapplicationType <arg>     Set a custom application type for the

                                          application on YARN

     -yD <property=value>                 use value for given property

     -yd,--yarndetached                   If present, runs the job in detached

                                          mode (deprecated; use non-YARN

                                          specific option instead)

     -yh,--yarnhelp                       Help for the Yarn session CLI.

     -yid,--yarnapplicationId <arg>       Attach to running YARN session

     -yj,--yarnjar <arg>                  Path to Flink jar file

     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with

                                          optional unit (default: MB)

     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN

                                          application

     -ynm,--yarnname <arg>                Set a custom name for the application

                                          on YARN

     -yq,--yarnquery                      Display available YARN resources

                                          (memory, cores)

     -yqu,--yarnqueue <arg>               Specify YARN queue.

     -ys,--yarnslots <arg>                Number of slots per TaskManager

     -yt,--yarnship <arg>                 Ship files in the specified directory

                                          (t for transfer)

     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with

                                          optional unit (default: MB)

     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper

                                          sub-paths for high availability mode

     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper

                                          sub-paths for high availability mode

 

  Options for default mode:

     -D <property=value>             Allows specifying multiple generic

                                     configuration options. The available

                                     options can be found at

                                     https://ci.apache.org/projects/flink/flink-

                                     docs-stable/ops/config.html

     -m,--jobmanager <arg>           Address of the JobManager to which to

                                     connect. Use this flag to connect to a

                                     different JobManager than the one specified

                                     in the configuration. Attention: This

                                     option is respected only if the

                                     high-availability configuration is NONE.

     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths

                                     for high availability mode

 

 

 

Action "run-application" runs an application in Application Mode.

 

  Syntax: run-application [OPTIONS] <jar-file> <arguments>

  Options for Generic CLI mode:

     -D <property=value>   Allows specifying multiple generic configuration

                           options. The available options can be found at

                           https://ci.apache.org/projects/flink/flink-docs-stabl

                           e/ops/config.html

     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is

                           also available with the "Application Mode".

                           The name of the executor to be used for executing the

                           given job, which is equivalent to the

                           "execution.target" config option. The currently

                           available executors are: "remote", "local",

                           "kubernetes-session", "yarn-per-job", "yarn-session".

     -t,--target <arg>     The deployment target for the given application,

                           which is equivalent to the "execution.target" config

                           option. For the "run" action the currently available

                           targets are: "remote", "local", "kubernetes-session",

                           "yarn-per-job", "yarn-session". For the

                           "run-application" action the currently available

                           targets are: "kubernetes-application",

                           "yarn-application".

 

 

 

Action "info" shows the optimized execution plan of the program (JSON).

 

  Syntax: info [OPTIONS] <jar-file> <arguments>

  "info" action options:

     -c,--class <classname>           Class with the program entry point

                                      ("main()" method). Only needed if the JAR

                                      file does not specify the class in its

                                      manifest.

     -p,--parallelism <parallelism>   The parallelism with which to run the

                                      program. Optional flag to override the

                                      default value specified in the

                                      configuration.

 

 

Action "list" lists running and scheduled programs.

 

  Syntax: list [OPTIONS]

  "list" action options:

     -a,--all         Show all programs and their JobIDs

     -r,--running     Show only running programs and their JobIDs

     -s,--scheduled   Show only scheduled programs and their JobIDs

  Options for Generic CLI mode:

     -D <property=value>   Allows specifying multiple generic configuration

                           options. The available options can be found at

                           https://ci.apache.org/projects/flink/flink-docs-stabl

                           e/ops/config.html

     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is

                           also available with the "Application Mode".

                           The name of the executor to be used for executing the

                           given job, which is equivalent to the

                           "execution.target" config option. The currently

                           available executors are: "remote", "local",

                           "kubernetes-session", "yarn-per-job", "yarn-session".

     -t,--target <arg>     The deployment target for the given application,

                           which is equivalent to the "execution.target" config

                           option. For the "run" action the currently available

                           targets are: "remote", "local", "kubernetes-session",

                           "yarn-per-job", "yarn-session". For the

                           "run-application" action the currently available

                           targets are: "kubernetes-application",

                           "yarn-application".

 

  Options for yarn-cluster mode:

     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution

                                      mode.

     -yid,--yarnapplicationId <arg>   Attach to running YARN session

     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper

                                      sub-paths for high availability mode

 

  Options for default mode:

     -D <property=value>             Allows specifying multiple generic

                                     configuration options. The available

                                     options can be found at

                                     https://ci.apache.org/projects/flink/flink-

                                     docs-stable/ops/config.html

     -m,--jobmanager <arg>           Address of the JobManager to which to

                                     connect. Use this flag to connect to a

                                     different JobManager than the one specified

                                     in the configuration. Attention: This

                                     option is respected only if the

                                     high-availability configuration is NONE.

     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths

                                     for high availability mode

 

 

 

Action "stop" stops a running program with a savepoint (streaming jobs only).

 

  Syntax: stop [OPTIONS] <Job ID>

  "stop" action options:

     -d,--drain                           Send MAX_WATERMARK before taking the

                                          savepoint and stopping the pipelne.

     -p,--savepointPath <savepointPath>   Path to the savepoint (for example

                                          hdfs:///flink/savepoint-1537). If no

                                          directory is specified, the configured

                                          default will be used

                                          ("state.savepoints.dir").

  Options for Generic CLI mode:

     -D <property=value>   Allows specifying multiple generic configuration

                           options. The available options can be found at

                           https://ci.apache.org/projects/flink/flink-docs-stabl

                           e/ops/config.html

     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is

                           also available with the "Application Mode".

                           The name of the executor to be used for executing the

                           given job, which is equivalent to the

                           "execution.target" config option. The currently

                           available executors are: "remote", "local",

                           "kubernetes-session", "yarn-per-job", "yarn-session".

     -t,--target <arg>     The deployment target for the given application,

                           which is equivalent to the "execution.target" config

                           option. For the "run" action the currently available

                           targets are: "remote", "local", "kubernetes-session",

                           "yarn-per-job", "yarn-session". For the

                           "run-application" action the currently available

                           targets are: "kubernetes-application",

                           "yarn-application".

 

  Options for yarn-cluster mode:

     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution

                                      mode.

     -yid,--yarnapplicationId <arg>   Attach to running YARN session

     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper

                                      sub-paths for high availability mode

 

  Options for default mode:

     -D <property=value>             Allows specifying multiple generic

                                     configuration options. The available

                                     options can be found at

                                     https://ci.apache.org/projects/flink/flink-

                                     docs-stable/ops/config.html

     -m,--jobmanager <arg>           Address of the JobManager to which to

                                     connect. Use this flag to connect to a

                                     different JobManager than the one specified

                                     in the configuration. Attention: This

                                     option is respected only if the

                                     high-availability configuration is NONE.

     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths

                                     for high availability mode

 

 

 

Action "cancel" cancels a running program.

 

  Syntax: cancel [OPTIONS] <Job ID>

  "cancel" action options:

     -s,--withSavepoint <targetDirectory>   **DEPRECATION WARNING**: Cancelling

                                            a job with savepoint is deprecated.

                                            Use "stop" instead.

                                            Trigger savepoint and cancel job.

                                            The target directory is optional. If

                                            no directory is specified, the

                                            configured default directory

                                            (state.savepoints.dir) is used.

  Options for Generic CLI mode:

     -D <property=value>   Allows specifying multiple generic configuration

                           options. The available options can be found at

                           https://ci.apache.org/projects/flink/flink-docs-stabl

                           e/ops/config.html

     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is

                           also available with the "Application Mode".

                           The name of the executor to be used for executing the

                           given job, which is equivalent to the

                           "execution.target" config option. The currently

                           available executors are: "remote", "local",

                           "kubernetes-session", "yarn-per-job", "yarn-session".

     -t,--target <arg>     The deployment target for the given application,

                           which is equivalent to the "execution.target" config

                           option. For the "run" action the currently available

                           targets are: "remote", "local", "kubernetes-session",

                           "yarn-per-job", "yarn-session". For the

                           "run-application" action the currently available

                           targets are: "kubernetes-application",

                           "yarn-application".

 

  Options for yarn-cluster mode:

     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution

                                      mode.

     -yid,--yarnapplicationId <arg>   Attach to running YARN session

     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper

                                      sub-paths for high availability mode

 

  Options for default mode:

     -D <property=value>             Allows specifying multiple generic

                                     configuration options. The available

                                     options can be found at

                                     https://ci.apache.org/projects/flink/flink-

                                     docs-stable/ops/config.html

     -m,--jobmanager <arg>           Address of the JobManager to which to

                                     connect. Use this flag to connect to a

                                     different JobManager than the one specified

                                     in the configuration. Attention: This

                                     option is respected only if the

                                     high-availability configuration is NONE.

     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths

                                     for high availability mode

 

 

 

Action "savepoint" triggers savepoints for a running job or disposes existing ones.

 

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]

  "savepoint" action options:

     -d,--dispose <arg>       Path of savepoint to dispose.

     -j,--jarfile <jarfile>   Flink program JAR file.

  Options for Generic CLI mode:

     -D <property=value>   Allows specifying multiple generic configuration

                           options. The available options can be found at

                           https://ci.apache.org/projects/flink/flink-docs-stabl

                           e/ops/config.html

     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is

                           also available with the "Application Mode".

                           The name of the executor to be used for executing the

                           given job, which is equivalent to the

                           "execution.target" config option. The currently

                           available executors are: "remote", "local",

                           "kubernetes-session", "yarn-per-job", "yarn-session".

     -t,--target <arg>     The deployment target for the given application,

                           which is equivalent to the "execution.target" config

                           option. For the "run" action the currently available

                           targets are: "remote", "local", "kubernetes-session",

                           "yarn-per-job", "yarn-session". For the

                           "run-application" action the currently available

                           targets are: "kubernetes-application",

                           "yarn-application".

 

  Options for yarn-cluster mode:

     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution

                                      mode.

     -yid,--yarnapplicationId <arg>   Attach to running YARN session

     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper

                                      sub-paths for high availability mode

 

  Options for default mode:

     -D <property=value>             Allows specifying multiple generic

                                     configuration options. The available

                                     options can be found at

                                     https://ci.apache.org/projects/flink/flink-

                                     docs-stable/ops/config.html

     -m,--jobmanager <arg>           Address of the JobManager to which to

                                     connect. Use this flag to connect to a

                                     different JobManager than the one specified

                                     in the configuration. Attention: This

                                     option is respected only if the

                                     high-availability configuration is NONE.

     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths

                                     for high availability mode

 

 

4.  Flink入門案例

4.1  前置說明

4.1.1  API

l API

Flink提供了多個層次的API供開發者使用,越往上抽象程度越高,使用起來越方便;越往下越底層,使用起來難度越大

 

 

 

 

 

 

 

注意:在Flink1.12時支援流批一體,DataSetAPI已經不推薦使用了,所以課程中除了個別案例使用DataSet外,後續其他案例都會優先使用DataStream流式API,既支援無界資料處理/流處理,也支援有界資料處理/批處理!當然Table&SQL-API會單獨學習

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/

https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC

 

 

 

 

 

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

 

 

 

 

4.1.2  程式設計模型

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

 

l 程式設計模型

Flink 應用程式結構主要包含三部分,Source/Transformation/Sink,如下圖所示:

 

 

 

 

 

 

4.2  準備工程

4.2.1  pom檔案

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast</groupId>
    <artifactId>flink_study_42</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- 指定倉庫位置,依次為aliyunapachecloudera倉庫 -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.12.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink執行計劃,這是1.9版本之前的-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- blink執行計劃,1.11+預設的-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>-->

        <!-- flink聯結器-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-filesystem_2.12</artifactId>
           <version>${flink.version}</version>
       </dependency>-->
        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>-->
        <!--<dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-parquet_2.12</artifactId>
              <version>${flink.version}</version>
         </dependency>-->
        <!--<dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>-->


        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-runtime_2.11</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-core</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>flink-java</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
            <!--<version>8.0.20</version>-->
        </dependency>

        <!-- 高效能非同步元件:Vertx-->
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-jdbc-client</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-redis-client</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- 日誌 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>

        <!-- 參考:https://blog.csdn.net/f641385712/article/details/84109098-->
        <!--<dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.4</version>
        </dependency>-->
        <!--<dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.3</version>
            <type>pom</type>
            <scope>provided</scope>
         </dependency>-->
        <!--<dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
           <version>28.2-jre</version>
       </dependency>-->

    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <!-- 編譯外掛 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- 打包外掛(會包含所有依賴) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- 設定jar包的入口類(可選) -->
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

4.2.2  log4j.properties

log4j.properties

log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

 

4.3  Flink初體驗

4.3.1  需求

使用Flink實現WordCount

 

4.3.2  編碼步驟

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

 

1.準備環境-env

2.準備資料-source

3.處理資料-transformation

4.輸出結果-sink

5.觸發執行-execute

 

其中建立環境可以使用如下3種方式:

getExecutionEnvironment() //推薦使用

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

 

 

4.3.3  程式碼實現

 

 

 

 

4.3.3.1  基於DataSet

package cn.itcast.hello;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc
 * 需求:使用Flink完成WordCount-DataSet
 * 編碼步驟
 * 1.準備環境-env
 * 2.準備資料-source
 * 3.處理資料-transformation
 * 4.輸出結果-sink
 * 5.觸發執行-execute//如果有print,DataSet不需要呼叫execute,DataStream需要呼叫execute
 */
public class WordCount1 {
    public static void main(String[] args) throws Exception {
        //老版本的批處理API如下,但已經不推薦使用了
        //1.準備環境-env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.準備資料-source
        DataSet<String> lineDS = env.fromElements("itcast hadoop spark","itcast hadoop spark","itcast hadoop","itcast");
        //3.處理資料-transformation
        //3.1每一行資料按照空格切分成一個個的單片語成一個集合
        /*
        public interface FlatMapFunction<T, O> extends Function, Serializable {
            void flatMap(T value, Collector<O> out) throws Exception;
        }
         */
        DataSet<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value就是一行行的資料
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);//將切割處理的一個個的單詞收集起來並返回
                }
            }
        });
        //3.2對集合中的每個單詞記為1
        /*
        public interface MapFunction<T, O> extends Function, Serializable {
            O map(T value) throws Exception;
        }
         */
        DataSet<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value就是進來一個個的單詞
                return Tuple2.of(value, 1);
            }
        });

        //3.3對資料按照單詞(key)進行分組
        //0表示按照tuple中的索引為0的欄位,也就是key(單詞)進行分組
        UnsortedGrouping<Tuple2<String, Integer>> groupedDS = wordAndOnesDS.groupBy(0);

        //3.4對各個組內的資料按照數量(value)進行聚合就是求sum
        //1表示按照tuple中的索引為1的欄位也就是按照數量進行聚合累加!
        DataSet<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);

        //3.5排序
        DataSet<Tuple2<String, Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);

        //4.輸出結果-sink
        result.print();

        //5.觸發執行-execute//如果有print,DataSet不需要呼叫execute,DataStream需要呼叫execute
        //env.execute();//'execute()', 'count()', 'collect()', or 'print()'.
    }
}

 

4.3.3.2  基於DataStream

package cn.itcast.hello;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc
 * 需求:使用Flink完成WordCount-DataStream
 * 編碼步驟
 * 1.準備環境-env
 * 2.準備資料-source
 * 3.處理資料-transformation
 * 4.輸出結果-sink
 * 5.觸發執行-execute
 */
public class WordCount2 {
    public static void main(String[] args) throws Exception {
        //新版本的流批統一API,既支援流處理也支援批處理
        //1.準備環境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        //2.準備資料-source
        DataStream<String> linesDS = env.fromElements("itcast hadoop spark","itcast hadoop spark","itcast hadoop","itcast");

        //3.處理資料-transformation
        //3.1每一行資料按照空格切分成一個個的單片語成一個集合
        /*
        public interface FlatMapFunction<T, O> extends Function, Serializable {
            void flatMap(T value, Collector<O> out) throws Exception;
        }
         */
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value就是一行行的資料
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);//將切割處理的一個個的單詞收集起來並返回
                }
            }
        });
        //3.2對集合中的每個單詞記為1
        /*
        public interface MapFunction<T, O> extends Function, Serializable {
            O map(T value) throws Exception;
        }
         */
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value就是進來一個個的單詞
                return Tuple2.of(value, 1);
            }
        });

        //3.3對資料按照單詞(key)進行分組
        //0表示按照tuple中的索引為0的欄位,也就是key(單詞)進行分組
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);



        //3.4對各個組內的資料按照數量(value)進行聚合就是求sum
        //1表示按照tuple中的索引為1的欄位也就是按照數量進行聚合累加!
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.輸出結果-sink
        result.print();

        //5.觸發執行-execute
        env.execute();//DataStream需要呼叫execute
    }
}

 

4.3.3.3  Lambda版

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/java_lambdas.html#java-lambda-expressions

package cn.itcast.hello;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Author itcast
 * Desc
 * 需求:使用Flink完成WordCount-DataStream--使用lambda表示式
 * 編碼步驟
 * 1.準備環境-env
 * 2.準備資料-source
 * 3.處理資料-transformation
 * 4.輸出結果-sink
 * 5.觸發執行-execute
 */
public class WordCount3_Lambda {
    public static void main(String[] args) throws Exception {
        //1.準備環境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        //2.準備資料-source
        DataStream<String> linesDS = env.fromElements("itcast hadoop spark", "itcast hadoop spark", "itcast hadoop", "itcast");
        //3.處理資料-transformation
        //3.1每一行資料按照空格切分成一個個的單片語成一個集合
        /*
        public interface FlatMapFunction<T, O> extends Function, Serializable {
            void flatMap(T value, Collector<O> out) throws Exception;
        }
         */
        //lambda表示式的語法:
        // (引數)->{方法體/函式體}
        //lambda表示式就是一個函式,函式的本質就是物件
        DataStream<String> wordsDS = linesDS.flatMap(
                (String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
        ).returns(Types.STRING);

        //3.2對集合中的每個單詞記為1
        /*
        public interface MapFunction<T, O> extends Function, Serializable {
            O map(T value) throws Exception;
        }
         */
        /*DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(
                (String value) -> Tuple2.of(value, 1)
        ).returns(Types.TUPLE(Types.STRING, Types.INT));*/
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(
                (String value) -> Tuple2.of(value, 1)
                , TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
        );

        //3.3對資料按照單詞(key)進行分組
        //0表示按照tuple中的索引為0的欄位,也就是key(單詞)進行分組
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        //KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy((KeySelector<Tuple2<String, Integer>, String>) t -> t.f0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

        //3.4對各個組內的資料按照數量(value)進行聚合就是求sum
        //1表示按照tuple中的索引為1的欄位也就是按照數量進行聚合累加!
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.輸出結果-sink
        result.print();

        //5.觸發執行-execute
        env.execute();
    }
}

 

4.3.3.4  Yarn上執行

注意

寫入HDFS如果存在許可權問題:

進行如下設定:

hadoop fs -chmod -R 777  /

並在程式碼中新增:

System.setProperty("HADOOP_USER_NAME", "root")

 

1. 修改程式碼

package cn.itcast.hello;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Author itcast
 * Desc
 * 需求:使用Flink完成WordCount-DataStream--使用lambda表示式--修改程式碼使適合在Yarn上執行
 * 編碼步驟
 * 1.準備環境-env
 * 2.準備資料-source
 * 3.處理資料-transformation
 * 4.輸出結果-sink
 * 5.觸發執行-execute//批處理不需要呼叫!流處理需要
 */
public class WordCount4_Yarn {
    public static void main(String[] args) throws Exception {
        //獲取引數
        ParameterTool params = ParameterTool.fromArgs(args);
        String output = null;
        if (params.has("output")) {
            output = params.get("output");
        } else {
            output = "hdfs://node1:8020/wordcount/output_" + System.currentTimeMillis();
        }

        //1.準備環境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        //2.準備資料-source
        DataStream<String> linesDS = env.fromElements("itcast hadoop spark", "itcast hadoop spark", "itcast hadoop", "itcast");
        //3.處理資料-transformation
        DataStream<Tuple2<String, Integer>> result = linesDS
                .flatMap(
                        (String value, Collector<String> out) -> Arrays.stream(value.split(" ")).forEach(out::collect)
                ).returns(Types.STRING)
                .map(
                        (String value) -> Tuple2.of(value, 1)
                ).returns(Types.TUPLE(Types.STRING, Types.INT))
                //.keyBy(0);
                .keyBy((KeySelector<Tuple2<String, Integer>, String>) t -> t.f0)
                .sum(1);

        //4.輸出結果-sink
        result.print();

        //如果執行報hdfs許可權相關錯誤,可以執行 hadoop fs -chmod -R 777  /
        System.setProperty("HADOOP_USER_NAME", "root");//設定使用者名稱
        //result.writeAsText("hdfs://node1:8020/wordcount/output_"+System.currentTimeMillis()).setParallelism(1);
        result.writeAsText(output).setParallelism(1);

        //5.觸發執行-execute
        env.execute();
    }
}

 

2. 打包

 

3.改名

 

3. 上傳

 

 

 

5.提交執行

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html

 

 

 

 

/export/server/flink/bin/flink run -Dexecution.runtime-mode=BATCH -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.itcast.hello.WordCount4_Yarn /root/wc.jar --output hdfs://node1:8020/wordcount/output_xx

 

 

5.在Web頁面可以觀察到提交的程式:

http://node1:8088/cluster

http://node1:50070/explorer.html#/

或者在Standalone模式下使用web介面提交

 

 

 

 

5.  Flink原理初探

5.1  Flink角色分工

在實際生產中,Flink 都是以叢集在執行,在執行的過程中包含了兩類程序。

l JobManager:

它扮演的是叢集管理者的角色,負責排程任務、協調 checkpoints、協調故障恢復、收集 Job 的狀態資訊,並管理 Flink 叢集中的從節點 TaskManager。

l TaskManager:

實際負責執行計算的 Worker,在其上執行 Flink Job 的一組 Task;TaskManager 還是所在節點的管理員,它負責把該節點上的伺服器資訊比如記憶體、磁碟、任務執行情況等向 JobManager 彙報。

l Client:

使用者在提交編寫好的 Flink 工程時,會先建立一個客戶端再進行提交,這個客戶端就是 Client

 

 

 

 

 

 

 

5.2  Flink執行流程

https://blog.csdn.net/sxiaobei/article/details/80861070

https://blog.csdn.net/super_wj0820/article/details/90726768

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html

 

5.2.1  Standalone版

 

 

 

5.2.2  On Yarn版

 

 

 

 

1. Client向HDFS上傳Flink的Jar包和配置

2. Client向Yarn ResourceManager提交任務並申請資源

3. ResourceManager分配Container資源並啟動ApplicationMaster,然後AppMaster載入Flink的Jar包和配置構建環境,啟動JobManager

4. ApplicationMaster向ResourceManager申請工作資源,NodeManager載入Flink的Jar包和配置構建環境並啟動TaskManager

5. TaskManager啟動後向JobManager傳送心跳包,並等待JobManager向其分配任務

 

 

 

5.3  Flink Streaming Dataflow

官網關於Flink的詞彙表

https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/glossary.html#glossary

5.3.1  Dataflow、OperatorPartitionSubTask、Parallelism

1.Dataflow:Flink程式在執行的時候會被對映成一個數據流模型

2.Operator:資料流模型中的每一個操作被稱作Operator,Operator分為:Source/Transform/Sink

3.Partition:資料流模型是分散式的和並行的,執行中會形成1~n個分割槽

4.Subtask:多個分割槽任務可以並行,每一個都是獨立執行在一個執行緒中的,也就是一個Subtask子任務

5.Parallelism:並行度,就是可以同時真正執行的子任務數/分割槽數

 

 

 

 

5.3.2  Operator傳遞模式

資料在兩個operator(運算元)之間傳遞的時候有兩種模式:

1.One to One模式:

兩個operator用此模式傳遞的時候,會保持資料的分割槽數和資料的排序;如上圖中的Source1到Map1,它就保留的Source的分割槽特性,以及分割槽元素處理的有序性。--類似於Spark中的窄依賴

2.Redistributing 模式:

這種模式會改變資料的分割槽數;每個一個operator subtask會根據選擇transformation把資料傳送到不同的目標subtasks,比如keyBy()會通過hashcode重新分割槽,broadcast()和rebalance()方法會隨機重新分割槽。--類似於Spark中的寬依賴

 

5.3.3  Operator Chain

 

 

 

客戶端在提交任務的時候會對Operator進行優化操作,能進行合併的Operator會被合併為一個Operator,

合併後的Operator稱為Operator chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的執行緒中執行--就是SubTask。

 

 

5.3.4  TaskSlot And Slot Sharing

l 任務槽(TaskSlot)

 

 

 

每個TaskManager是一個JVM的程序, 為了控制一個TaskManager(worker)能接收多少個task,Flink通過Task Slot來進行控制。TaskSlot數量是用來限制一個TaskManager工作程序中可以同時執行多少個工作執行緒,TaskSlot 是一個 TaskManager 中的最小資源分配單位,一個 TaskManager 中有多少個 TaskSlot 就意味著能支援多少併發的Task處理。

 

Flink將程序的記憶體進行了劃分到多個slot中,記憶體被劃分到不同的slot之後可以獲得如下好處:

- TaskManager最多能同時併發執行的子任務數是可以通過TaskSolt數量來控制的

- TaskSolt有獨佔的記憶體空間,這樣在一個TaskManager中可以執行多個不同的作業,作業之間不受影響。

 

l 槽共享(Slot Sharing)

 

 

 

Flink允許子任務共享插槽,即使它們是不同任務(階段)的子任務(subTask),只要它們來自同一個作業。

比如圖左下角中的map和keyBy和sink 在一個 TaskSlot 裡執行以達到資源共享的目的。

 

允許插槽共享有兩個主要好處:

- 資源分配更加公平,如果有比較空閒的slot可以將更多的任務分配給它。

- 有了任務槽共享,可以提高資源的利用率。

 

注意:

slot是靜態的概念,是指taskmanager具有的併發執行能力

parallelism是動態的概念,是指程式執行時實際使用的併發能力

 

 

 

5.4  Flink執行時元件

 

 

 

Flink執行時架構主要包括四個不同的元件,它們會在執行流處理應用程式時協同工作:

ü 作業管理器(JobManager)分配任務、排程checkpoint做快照

ü 工作管理員(TaskManager):主要幹活的

ü 資源管理器(ResourceManager):管理分配資源

ü 分發器(Dispatcher)方便遞交任務的介面,WebUI

因為Flink是用Java和Scala實現的,所以所有元件都會執行在Java虛擬機器上。每個元件的職責如下:

1. 作業管理器(JobManager)

控制一個應用程式執行的主程序,也就是說,每個應用程式都會被一個不同的JobManager 所控制執行。

n JobManager 會先接收到要執行的應用程式,這個應用程式會包括:作業圖(JobGraph)、邏輯資料流圖(logical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。

n JobManager 會把JobGraph轉換成一個物理層面的資料流圖,這個圖被叫做“執行圖”(ExecutionGraph),包含了所有可以併發執行的任務。

n JobManager 會向資源管理器(ResourceManager)請求執行任務必要的資源,也就是工作管理員(TaskManager)上的插槽(slot)。一旦它獲取到了足夠的資源,就會將執行圖分發到真正執行它們的TaskManager上。而在執行過程中,JobManager會負責所有需要中央協調的操作,比如說檢查點(checkpoints)的協調。

2. 工作管理員(TaskManager)

n Flink中的工作程序。通常在Flink中會有多個TaskManager執行,每一個TaskManager都包含了一定數量的插槽(slots)。插槽的數量限制了TaskManager能夠執行的任務數量。

啟動之後,TaskManager會向資源管理器註冊它的插槽;收到資源管理器的指令後,TaskManager就會將一個或者多個插槽提供給JobManager呼叫。JobManager就可以向插槽分配任務(tasks)來執行了。

在執行過程中,一個TaskManager可以跟其它運行同一應用程式的TaskManager交換資料。

3. 資源管理器(ResourceManager)

主要負責管理工作管理員(TaskManager)的插槽(slot),TaskManger 插槽是Flink中定義的處理資源單元。

n Flink為不同的環境和資源管理工具提供了不同資源管理器,比如YARN、Mesos、K8s,以及standalone部署。

JobManager申請插槽資源時,ResourceManager會將有空閒插槽的TaskManager分配給JobManager。如果ResourceManager沒有足夠的插槽來滿足JobManager的請求,它還可以向資源提供平臺發起會話,以提供啟動TaskManager程序的容器。

4. 分發器(Dispatcher)

可以跨作業執行,它為應用提交提供了REST介面。

當一個應用被提交執行時,分發器就會啟動並將應用移交給一個JobManager。

n Dispatcher也會啟動一個Web UI,用來方便地展示和監控作業執行的資訊。

n Dispatcher在架構中可能並不是必需的,這取決於應用提交執行的方式。

 

5.5  Flink執行圖(ExecutionGraph)

Flink程式直接對映成的資料流圖是StreamGraph,也被稱為邏輯流圖,因為它們表示的是計算邏輯的高階檢視。為了執行一個流處理程式,Flink需要將邏輯流圖轉換為物理資料流圖(也叫執行圖),詳細說明程式的執行方式。

Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖

 

 

 

l 原理介紹

n Flink執行executor會自動根據程式程式碼生成DAG資料流圖

n Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。

StreamGraph:是根據使用者通過 Stream API 編寫的程式碼生成的最初的圖。表示程式的拓撲結構。

JobGraphStreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的資料結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗。

ExecutionGraphJobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是排程層最核心的資料結構。

物理執行圖JobManager 根據 ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的資料結構。

 

l 簡單理解:

StreamGraph:最初的程式執行邏輯流程,也就是運算元之間的前後順序--在Client上生成

JobGraph:將OneToOne的Operator合併為OperatorChain--在Client上生成

ExecutionGraph:將JobGraph根據程式碼中設定的並行度和請求的資源進行並行化規劃!--在JobManager上生成

物理執行圖:將ExecutionGraph的並行計劃,落實到具體的TaskManager上,將具體的SubTask落實到具體的TaskSlot內進行執行。

 

 

 

 

Flink-一體API

學習目標

瞭解流處理的相關概念

掌握FlinkDataStream-SourceOperator

掌握FlinkDataStream-TransformationOperator

掌握FlinkDataStream-SinkOperator

瞭解Flink的累加器

掌握Flink的廣播變數

掌握Flink的分散式快取

1.  流處理相關概念

1.1  資料的時效性

日常工作中,我們一般會先把資料儲存在表,然後對錶的資料進行加工、分析。既然先儲存在表中,那就會涉及到時效性概念。

如果我們處理以年,月為單位的級別的資料處理,進行統計分析,個性化推薦,那麼資料的的最新日期離當前有幾個甚至上月都沒有問題。但是如果我們處理的是以天為級別,或者小時甚至更小粒度的資料處理,那麼就要求資料的時效性更高了。比如:對網站的實時監控、對異常日誌的監控,這些場景需要工作人員立即響應,這樣的場景下,傳統的統一收集資料,再存到資料庫中,再取出來進行分析就無法滿足高時效性的需求了。

 

1.2  流處理和批處理

https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/

 

 

 

 

- Batch Analytics,右邊是 Streaming Analytics。批量計算: 統一收集資料->儲存到DB->對資料進行批量處理,就是傳統意義上使用類似於 Map Reduce、Hive、Spark Batch 等,對作業進行分析、處理、生成離線報表

- Streaming Analytics 流式計算,顧名思義,就是對資料流進行處理使用流式分析引擎如 Storm,Flink 實時處理分析資料,應用較多的場景如實時大屏、實時報表。

 

 

 

1.3  流批一體API

DataStream API 支援批執行模式

Flink 的核心 API 最初是針對特定的場景設計的,儘管 Table API / SQL 針對流處理和批處理已經實現了統一的 API,但當用戶使用較底層的 API 時,仍然需要在批處理(DataSet API)和流處理(DataStream API)這兩種不同的 API 之間進行選擇。鑑於批處理是流處理的一種特例,將這兩種 API 合併成統一的 API,有一些非常明顯的好處,比如:

 

ü 可複用性:作業可以在流和批這兩種執行模式之間自由地切換,而無需重寫任何程式碼。因此,使用者可以複用同一個作業,來處理實時資料和歷史資料。

ü 維護簡單:統一的 API 意味著流和批可以共用同一組 connector,維護同一套程式碼,並能夠輕鬆地實現流批混合執行,例如 backfilling 之類的場景。

 

考慮到這些優點,社群已朝著流批統一的 DataStream API 邁出了第一步:支援高效的批處理(FLIP-134)。從長遠來看,這意味著 DataSet API 將被棄用(FLIP-131),其功能將被包含在 DataStream API 和 Table API / SQL 中。

 

l API

Flink提供了多個層次的API供開發者使用,越往上抽象程度越高,使用起來越方便;越往下越底層,使用起來難度越大

 

 

 

 

 

注意:在Flink1.12時支援流批一體,DataSetAPI已經不推薦使用了,所以課程中除了個別案例使用DataSet外,後續其他案例都會優先使用DataStream流式API,既支援無界資料處理/流處理,也支援有界資料處理/批處理!當然Table&SQL-API會單獨學習

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/

https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC

 

 

 

 

 

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

 

 

l 程式設計模型

Flink 應用程式結構主要包含三部分,Source/Transformation/Sink,如下圖所示:

 

 

 

 

2.  Source

 

 

 

2.1  預定義Source

2.1.1  基於集合的Source

l API

一般用於學習測試時編造資料時使用

1.env.fromElements(可變引數);

2.env.fromColletion(各種集合);

3.env.generateSequence(開始,結束);

4.env.fromSequence(開始,結束);

 

程式碼演示:

package cn.itcast.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Author itcast
 * Desc
 * 把本地的普通的Java集合/Scala集合變為分散式的FlinkDataStream集合!
 * 一般用於學習測試時編造資料時使用
 * 1.env.fromElements(可變引數);
 * 2.env.fromColletion(各種集合);
 * 3.env.generateSequence(開始,結束);
 * 4.env.fromSequence(開始,結束);
 */
public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        // * 1.env.fromElements(可變引數);
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        // * 2.env.fromColletion(各種集合);
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
        // * 3.env.generateSequence(開始,結束);
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        //* 4.env.fromSequence(開始,結束);
        DataStream<Long> ds4 = env.fromSequence(1, 10);
        //3.Transformation
        //4.sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        //5.execute
        env.execute();
    }
}

 

2.1.2  基於檔案的Source

l API

一般用於學習測試

env.readTextFile(本地/HDFS檔案/資料夾);//壓縮檔案也可以

 

程式碼演示:

package cn.itcast.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Desc
 * 1.env.readTextFile(本地/HDFS檔案/資料夾);//壓縮檔案也可以
 */
public class SourceDemo02 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        // * 1.env.readTextFile(本地檔案/HDFS檔案);//壓縮檔案也可以
        DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
        DataStream<String> ds2 = env.readTextFile("data/input/dir");
        DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020//wordcount/input/words.txt");
        DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");
        //3.Transformation
        //4.sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        //5.execute
        env.execute();
    }
}

 

 

2.1.3  基於Socket的Source

一般用於學習測試

需求:

1.在node1上使用nc -lk 9999 向指定埠傳送資料

nc是netcat的簡稱,原本是用來設定路由器,我們可以利用它向某個埠傳送資料

如果沒有該命令可以下安裝

yum install -y nc

 

2.使用Flink編寫流處理應用程式實時統計單詞數量

 

程式碼實現:

package cn.itcast.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc
 * SocketSource
 */
public class SourceDemo03 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.處理資料-transformation
        //3.1每一行資料按照空格切分成一個個的單片語成一個集合
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value就是一行行的資料
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);//將切割處理的一個個的單詞收集起來並返回
                }
            }
        });
        //3.2對集合中的每個單詞記為1
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value就是進來一個個的單詞
                return Tuple2.of(value, 1);
            }
        });

        //3.3對資料按照單詞(key)進行分組
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        //3.4對各個組內的資料按照數量(value)進行聚合就是求sum
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.輸出結果-sink
        result.print();

        //5.觸發執行-execute
        env.execute();
    }
}

 

2.2  自定義Source

2.2.1  隨機生成資料

l API

一般用於學習測試,模擬生成一些資料

Flink還提供了資料來源介面,我們實現該介面就可以實現自定義資料來源,不同的介面有不同的功能,分類如下:

SourceFunction:非並行資料來源(並行度只能=1)

RichSourceFunction:多功能非並行資料來源(並行度只能=1)

ParallelSourceFunction:並行資料來源(並行度能夠>=1)

RichParallelSourceFunction:多功能並行資料來源(並行度能夠>=1)--後續學習的Kafka資料來源使用的就是該介面

l 需求

每隔1秒隨機生成一條訂單資訊(訂單ID、使用者ID、訂單金額、時間戳)

要求:

- 隨機生成訂單ID(UUID)

- 隨機生成使用者ID(0-2)

- 隨機生成訂單金額(0-100)

- 時間戳為當前系統時間

 

l 程式碼實現

package cn.itcast.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Desc
 *需求
 * 每隔1秒隨機生成一條訂單資訊(訂單ID、使用者ID、訂單金額、時間戳)
 * 要求:
 * - 隨機生成訂單ID(UUID)
 * - 隨機生成使用者ID(0-2)
 * - 隨機生成訂單金額(0-100)
 * - 時間戳為當前系統時間
 *
 * API
 * 一般用於學習測試,模擬生成一些資料
 * Flink還提供了資料來源介面,我們實現該介面就可以實現自定義資料來源,不同的介面有不同的功能,分類如下:
 * SourceFunction:非並行資料來源(並行度只能=1)
 * RichSourceFunction:多功能非並行資料來源(並行度只能=1)
 * ParallelSourceFunction:並行資料來源(並行度能夠>=1)
 * RichParallelSourceFunction:多功能並行資料來源(並行度能夠>=1)--後續學習的Kafka資料來源使用的就是該介面
 */
public class SourceDemo04_Customer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.Source
        DataStream<Order> orderDS = env
                .addSource(new MyOrderSource())
                .setParallelism(2);

        //3.Transformation

        //4.Sink
        orderDS.print();
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        private Boolean flag = true;
        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (flag){
                Thread.sleep(1000);
                String id = UUID.randomUUID().toString();
                int userId = random.nextInt(3);
                int money = random.nextInt(101);
                long createTime = System.currentTimeMillis();
                ctx.collect(new Order(id,userId,money,createTime));
            }
        }
        //取消任務/執行cancle命令的時候執行
        @Override
        public void cancel() {
            flag = false;
        }
    }
}

 

 

2.2.2  MySQL

需求:

實際開發中,經常會實時接收一些資料,要和MySQL中儲存的一些規則進行匹配,那麼這時候就可以使用Flink自定義資料來源從MySQL中讀取資料

那麼現在先完成一個簡單的需求:

MySQL中實時載入資料

要求MySQL中的資料有變化,也能被實時加載出來

 

l 準備資料

CREATE TABLE `t_student` (

    `id` int(11) NOT NULL AUTO_INCREMENT,

    `name` varchar(255) DEFAULT NULL,

    `age` int(11) DEFAULT NULL,

    PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

 

INSERT INTO `t_student` VALUES ('1', 'jack', '18');

INSERT INTO `t_student` VALUES ('2', 'tom', '19');

INSERT INTO `t_student` VALUES ('3', 'rose', '20');

INSERT INTO `t_student` VALUES ('4', 'tom', '19');

INSERT INTO `t_student` VALUES ('5', 'jack', '18');

INSERT INTO `t_student` VALUES ('6', 'rose', '20');

 

 

程式碼實現:

package cn.itcast.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc
 * 需求:
 * 實際開發中,經常會實時接收一些資料,要和MySQL中儲存的一些規則進行匹配,那麼這時候就可以使用Flink自定義資料來源從MySQL中讀取資料
 * 那麼現在先完成一個簡單的需求:
 * MySQL中實時載入資料
 * 要求MySQL中的資料有變化,也能被實時加載出來
 */
public class SourceDemo05_Customer_MySQL {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);

        //3.Transformation
        //4.Sink
        studentDS.print();

        //5.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSource extends RichParallelSourceFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            //載入驅動,開啟連線
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select id,name,age from t_student";
            ps = conn.prepareStatement(sql);
        }

        private boolean flag = true;

        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while (flag) {
                ResultSet rs = ps.executeQuery();
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    ctx.collect(new Student(id, name, age));
                }
                TimeUnit.SECONDS.sleep(5);
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
        @Override
        public void close() throws Exception {
            if (conn != null) conn.close();
            if (ps != null) ps.close();
        }
    }
}

 

 

 

 

 

3.  Transformation

3.1  官網API列表

 

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/

 

 

 

 

整體來說,流式資料上的操作可以分為四類。

l第一類是對於單條記錄的操作,比如篩除掉不符合要求的記錄(Filter 操作),或者將每條記錄都做一個轉換(Map 操作)

l第二類是對多條記錄的操作。比如說統計一個小時內的訂單總成交量,就需要將一個小時內的所有訂單記錄的成交量加到一起。為了支援這種型別的操作,就得通過 Window 將需要的記錄關聯到一起進行處理

l第三類是對多個流進行操作並轉換為單個流。例如,多個流可以通過 Union、Join 或 Connect 等操作合到一起。這些操作合併的邏輯不同,但是它們最終都會產生了一個新的統一的流,從而可以進行一些跨流的操作。

l最後, DataStream 還支援與合併對稱的拆分操作,即把一個流按一定規則拆分為多個流(Split 操作),每個流是之前流的一個子集,這樣我們就可以對不同的流作不同的處理。

3.2  基本操作-略

3.2.1  map

l API

map:將函式作用在集合中的每一個元素上,並返回作用後的結果

 

 

 

 

 

3.2.2  flatMap

l API

flatMap:將集合中的每個元素變成一個或多個元素,並返回扁平化之後的結果

 

 

 

 

 

 

 

3.2.3  keyBy

按照指定的key來對流中的資料進行分組,前面入門案例中已經演示過

注意:

流處理中沒有groupBy,而是keyBy

 

 

 

 

 

3.2.4  filter

l API

filter:按照指定的條件對集合中的元素進行過濾,過濾出返回true/符合條件的元素

 

 

 

 

 

3.2.5  sum

l API

sum:按照指定的欄位對集合中的元素進行求和

 

 

 

3.2.6  reduce

l API

reduce:對集合中的元素進行聚合

 

 

 

 

3.2.7  程式碼演示

 

需求:

對流資料中的單詞進行統計,排除敏感詞heihei

 

l 程式碼演示

package cn.itcast.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc
 */
public class TransformationDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.處理資料-transformation
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value就是一行行的資料
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);//將切割處理的一個個的單詞收集起來並返回
                }
            }
        });
        DataStream<String> filtedDS = wordsDS.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.equals("heihei");
            }
        });
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = filtedDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value就是進來一個個的單詞
                return Tuple2.of(value, 1);
            }
        });
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

        DataStream<Tuple2<String, Integer>> result1 = groupedDS.sum(1);
        DataStream<Tuple2<String, Integer>> result2 = groupedDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value1.f1);
            }
        });

        //4.輸出結果-sink
        result1.print("result1");
        result2.print("result2");

        //5.觸發執行-execute
        env.execute();
    }
}

 

 

3.3  合併-拆分

 

3.3.1  union和connect

l API

union:

union運算元可以合併多個同類型的資料流,並生成同類型的資料流,即可以將多個DataStream[T]合併為一個新的DataStream[T]。資料將按照先進先出(First In First Out)的模式合併,且不去重。

 

 

 

 

connect:

connect提供了和union類似的功能,用來連線兩個資料流,它與union的區別在於:

connect只能連線兩個資料流,union可以連線多個數據流。

connect所連線的兩個資料流的資料型別可以不一致,union所連線的兩個資料流的資料型別必須一致。

兩個DataStream經過connect之後被轉化為ConnectedStreams,ConnectedStreams會對兩個流的資料應用不同的處理方法,且雙流之間可以共享狀態。

 

 

 

l 需求

將兩個String型別的流進行union

將一個String型別和一個Long型別的流進行connect

 

l 程式碼實現

package cn.itcast.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * Author itcast
 * Desc
 */
public class TransformationDemo02 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.Source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);

        //3.Transformation
        DataStream<String> result1 = ds1.union(ds2);//合併但不去重 https://blog.csdn.net/valada/article/details/104367378
        ConnectedStreams<String, Long> tempResult = ds1.connect(ds3);
        //interface CoMapFunction<IN1, IN2, OUT>
        DataStream<String> result2 = tempResult.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String value) throws Exception {
                return "String->String:" + value;
            }

            @Override
            public String map2(Long value) throws Exception {
                return "Long->String:" + value.toString();
            }
        });

        //4.Sink
        result1.print();
        result2.print();

        //5.execute
        env.execute();
    }
}

 

3.3.2  split、select和Side Outputs

l API

Split就是將一個流分成多個流

Select就是獲取分流後對應的資料

注意:split函式已過期並移除

 

Side Outputs:可以使用process方法對流中資料進行處理,並針對不同的處理結果將資料收集到不同的OutputTag中

 

需求:

對流中的資料按照奇數和偶數進行分流,並獲取分流後的資料

 

程式碼實現:

package cn.itcast.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Author itcast
 * Desc
 */
public class TransformationDemo03 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.Source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        //3.Transformation
        /*SplitStream<Integer> splitResult = ds.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                //value是進來的數字
                if (value % 2 == 0) {
                    //偶數
                    ArrayList<String> list = new ArrayList<>();
                    list.add("偶數");
                    return list;
                } else {
                    //奇數
                    ArrayList<String> list = new ArrayList<>();
                    list.add("奇數");
                    return list;
                }
            }
        });
        DataStream<Integer> evenResult = splitResult.select("偶數");
        DataStream<Integer> oddResult = splitResult.select("奇數");*/

        //定義兩個輸出標籤
        OutputTag<Integer> tag_even = new OutputTag<Integer>("偶數", TypeInformation.of(Integer.class));
        OutputTag<Integer> tag_odd = new OutputTag<Integer>("奇數"){};
        //ds中的資料進行處理
        SingleOutputStreamOperator<Integer> tagResult = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {
                    //偶數
                    ctx.output(tag_even, value);
                } else {
                    //奇數
                    ctx.output(tag_odd, value);
                }
            }
        });

        //取出標記好的資料
        DataStream<Integer> evenResult = tagResult.getSideOutput(tag_even);
        DataStream<Integer> oddResult = tagResult.getSideOutput(tag_odd);

        //4.Sink
        evenResult.print("偶數");
        oddResult.print("奇數");

        //5.execute
        env.execute();
    }
}

 

 

 

3.4  分割槽

3.4.1  rebalance重平衡分割槽

l API

類似於Spark中的repartition,但是功能更強大,可以直接解決資料傾斜

Flink也有資料傾斜的時候,比如當前有資料量大概10億條資料需要處理,在處理過程中可能會發生如圖所示的狀況,出現了資料傾斜,其他3臺機器執行完畢也要等待機器1執行完畢後才算整體將任務完成;

 

 

 

 

所以在實際的工作中,出現這種情況比較好的解決方案就是rebalance(內部使用round robin方法將資料均勻打散)

 

 

 

 

程式碼演示:

package cn.itcast.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Desc
 */
public class TransformationDemo04 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC).setParallelism(3);

        //2.source
        DataStream<Long> longDS = env.fromSequence(0, 100);

        //3.Transformation
        //下面的操作相當於將資料隨機分配一下,有可能出現數據傾斜
        DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long num) throws Exception {
                return num > 10;
            }
        });

        //接下來使用map操作,將資料轉為(分割槽編號/子任務編號, 資料)
        //Rich表示多功能的,MapFunction要多一些API可以供我們使用
        DataStream<Tuple2<Integer, Integer>> result1 = filterDS
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        //獲取分割槽編號/子任務編號
                        int id = getRuntimeContext().getIndexOfThisSubtask();
                        return Tuple2.of(id, 1);
                    }
                }).keyBy(t -> t.f0).sum(1);

        DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        //獲取分割槽編號/子任務編號
                        int id = getRuntimeContext().getIndexOfThisSubtask();
                        return Tuple2.of(id, 1);
                    }
                }).keyBy(t -> t.f0).sum(1);

        //4.sink
        //result1.print();//有可能出現數據傾斜
        result2.print();//在輸出前進行了rebalance重分割槽平衡,解決了資料傾斜

        //5.execute
        env.execute();
    }
}

 

3.4.2  其他分割槽

l API

 

 

 

說明:

recale分割槽。基於上下游Operator的並行度,將記錄以迴圈的方式輸出到下游Operator的每個例項。

舉例:

上游並行度是2,下游是4,則上游一個並行度以迴圈的方式將記錄輸出到下游的兩個並行度上;上游另一個並行度以迴圈的方式將記錄輸出到下游另兩個並行度上。若上游並行度是4,下游並行度是2,則上游兩個並行度將記錄輸出到下游一個並行度上;上游另兩個並行度將記錄輸出到下游另一個並行度上。

 

需求:

對流中的元素使用各種分割槽,並輸出

 

l 程式碼實現

package cn.itcast.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc
 */
public class TransformationDemo05 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2.Source
        DataStream<String> linesDS = env.readTextFile("data/input/words.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        //3.Transformation
        DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();
        DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();
        DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();
        DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();
        DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();
        DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();
        DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                return key.equals("hello") ? 0 : 1;
            }
        }, t -> t.f0);

        //4.sink
        //result1.print();
        //result2.print();
        //result3.print();
        //result4.print();
        //result5.print();
        //result6.print();
        result7.print();

        //5.execute
        env.execute();
    }
}

 

 

4.  Sink

 

4.1  預定義Sink

4.1.1  基於控制檯和檔案的Sink

l API

1.ds.print 直接輸出到控制檯

2.ds.printToErr() 直接輸出到控制檯,用紅色

3.ds.writeAsText("本地/HDFS的path",WriteMode.OVERWRITE).setParallelism(1)

 

注意:

在輸出到path的時候,可以在前面設定並行度,如果

並行度>1,則path為目錄

並行度=1,則path為檔名

 

程式碼演示:

package cn.itcast.sink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Desc
 * 1.ds.print 直接輸出到控制檯
 * 2.ds.printToErr() 直接輸出到控制檯,用紅色
 * 3.ds.collect 將分散式資料收集為本地集合
 * 4.ds.setParallelism(1).writeAsText("本地/HDFSpath",WriteMode.OVERWRITE)
 */
public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");

        //3.transformation
        //4.sink
        ds.print();
        ds.printToErr();
        ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
        //注意:
        //Parallelism=1為檔案
        //Parallelism>1為資料夾

        //5.execute
        env.execute();
    }
}

 

 

4.2  自定義Sink

4.2.1  MySQL

需求:

Flink集合中的資料通過自定義Sink儲存到MySQL

 

程式碼實現:

package cn.itcast.sink;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Author itcast
 * Desc
 * 使用自定義sink將資料儲存到MySQL
 */
public class SinkDemo02_MySQL {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
        //3.Transformation
        //4.Sink
        studentDS.addSink(new MySQLSink());

        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSink extends RichSinkFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            //載入驅動,開啟連線
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void invoke(Student value, Context context) throws Exception {
            //ps中的?設定具體值
            ps.setString(1,value.getName());
            ps.setInt(2,value.getAge());
            //執行sql
            ps.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (conn != null) conn.close();
            if (ps != null) ps.close();
        }
    }
}

 

 

5.  Connectors

5.1  JDBC

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html

 

package cn.itcast.connectors;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Desc
 */
public class ConnectorsDemo_JDBC {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        env.fromElements(new Student(null, "tonyma", 18))
                //3.Transformation
                //4.Sink
                .addSink(JdbcSink.sink(
                        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                        (ps, s) -> {
                            ps.setString(1, s.getName());
                            ps.setInt(2, s.getAge());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/bigdata")
                                .withUsername("root")
                                .withPassword("root")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .build()));
        //5.execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

 

5.2  Kafka

5.2.1  pom依賴

Flink 裡已經提供了一些繫結的 Connector,例如 kafka source 和 sink,Es sink 等。讀寫 kafka、es、rabbitMQ 時可以直接使用相應 connector 的 api 即可,雖然該部分是 Flink 專案原始碼裡的一部分,但是真正意義上不算作 Flink 引擎相關邏輯,並且該部分沒有打包在二進位制的釋出包裡面。所以在提交 Job 時候需要注意, job 程式碼 jar 包中一定要將相應的 connetor 相關類打包進去,否則在提交作業時就會失敗,提示找不到相應的類,或初始化某些類異常。

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

 

5.2.2  引數設定

 

以下引數都必須/建議設定上

1.訂閱的主題

2.反序列化規則

3.消費者屬性-叢集地址

4.消費者屬性-消費者組id(如果不設定,會有預設的,但是預設的不方便管理)

5.消費者屬性-offset重置規則,如earliest/latest...

6.動態分割槽檢測(當kafka的分割槽數變化/增加時,Flink能夠檢測到!)

7.如果沒有設定Checkpoint,那麼可以設定自動提交offset,後續學習了Checkpoint會把offset隨著做Checkpoint的時候提交到Checkpoint和預設主題中

 

 

5.2.3  引數說明

 

 

 

 

 

 

 

 

 

實際的生產環境中可能有這樣一些需求,比如:

l場景一:有一個 Flink 作業需要將五份資料聚合到一起,五份資料對應五個 kafka topic,隨著業務增長,新增一類資料,同時新增了一個 kafka topic,如何在不重啟作業的情況下作業自動感知新的 topic。

l場景二:作業從一個固定的 kafka topic 讀資料,開始該 topic 有 10 個 partition,但隨著業務的增長資料量變大,需要對 kafka partition 個數進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業情況下動態感知新擴容的 partition?

針對上面的兩種場景,首先需要在構建 FlinkKafkaConsumer 時的 properties 中設定 flink.partition-discovery.interval-millis 引數為非負值,表示開啟動態發現的開關,以及設定的時間間隔。此時 FlinkKafkaConsumer 內部會啟動一個單獨的執行緒定期去 kafka 獲取最新的 meta 資訊。

l針對場景一,還需在構建 FlinkKafkaConsumer 時,topic 的描述可以傳一個正則表示式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。

l針對場景二,設定前面的動態發現引數,在定期獲取 kafka 最新 meta 資訊時會匹配新的 partition。為了保證資料的正確性,新發現的 partition 從最早的位置開始讀取。

 

 

 

注意:

開啟 checkpoint 時 offset 是 Flink 通過狀態 state 管理和恢復的,並不是從 kafka 的 offset 位置恢復。在 checkpoint 機制下,作業從最近一次checkpoint 恢復,本身是會回放部分歷史資料,導致部分資料重複消費,Flink 引擎僅保證計算狀態的精準一次,要想做到端到端精準一次需要依賴一些冪等的儲存系統或者事務操作。

 

5.2.4  Kafka命令

  ● 檢視當前伺服器中的所有topic

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper  node1:2181

  ● 建立topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka

  ● 檢視某個Topic的詳情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181

  ● 刪除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka

  ● 通過shell命令傳送訊息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka

  ● 通過shell消費訊息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning

  ● 修改分割槽

 /export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181

 

5.2.5  程式碼實現-Kafka Consumer

package cn.itcast.connectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Author itcast
 * Desc
 * 需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消費Kafka中的資料做WordCount
 * 需要設定如下引數:
 * 1.訂閱的主題
 * 2.反序列化規則
 * 3.消費者屬性-叢集地址
 * 4.消費者屬性-消費者組id(如果不設定,會有預設的,但是預設的不方便管理)
 * 5.消費者屬性-offset重置規則,earliest/latest...
 * 6.動態分割槽檢測(kafka的分割槽數變化/增加時,Flink能夠檢測到!)
 * 7.如果沒有設定Checkpoint,那麼可以設定自動提交offset,後續學習了Checkpoint會把offset隨著做Checkpoint的時候提交到Checkpoint和預設主題中
 */
public class ConnectorsDemo_KafkaConsumer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        Properties props  = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset","latest");
        props.setProperty("flink.partition-discovery.interval-millis","5000");//會開啟一個後臺執行緒每隔5s檢測一下Kafka的分割槽情況
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //kafkaSource就是KafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();//設定從記錄的offset開始消費,如果沒有記錄從auto.offset.reset配置開始消費
        //kafkaSource.setStartFromEarliest();//設定直接從Earliest消費,auto.offset.reset配置無關
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //3.Transformation
        //3.1切割並記為1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分組
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.Sink
        result.print();

        //5.execute
        env.execute();
    }
}

 

 

5.2.6  程式碼實現-Kafka Producer

需求:

Flink集合中的資料通過自定義Sink儲存到Kafka

 

l 程式碼實現

package cn.itcast.connectors;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Author itcast
 * Desc
 * 使用自定義sink-官方提供的flink-connector-kafka_2.12-將資料儲存到Kafka
 */
public class ConnectorsDemo_KafkaProducer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
        //3.Transformation
        //注意:目前來說我們使用Kafka使用的序列化和反序列化都是直接使用最簡單的字串,所以先將Student轉為字串
        //可以直接呼叫StudenttoString,也可以轉為JSON
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //String str = value.toString();
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });

        //4.Sink
        jsonDS.print();
        //根據引數建立KafkaProducer/KafkaSink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka",  new SimpleStringSchema(),  props);
        jsonDS.addSink(kafkaSink);

        //5.execute
        env.execute();

        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

 

5.3  Redis

l API

通過flink 操作redis 其實我們可以通過傳統的redis 連線池Jpoools 進行redis 的相關操作,但是flink 提供了專門操作redis 的RedisSink,使用起來更方便,而且不用我們考慮效能的問題,接下來將主要介紹RedisSink 如何使用。

https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

 

RedisSink 核心類是RedisMapper 是一個介面,使用時我們要編寫自己的redis 操作類實現這個介面中的三個方法,如下所示

1.getCommandDescription() :

設定使用的redis 資料結構型別,和key 的名稱,通過RedisCommand 設定資料結構型別

2.String getKeyFromData(T data):

設定value 中的鍵值對key的值

3.String getValueFromData(T data);

設定value 中的鍵值對value的值

 

使用RedisCommand設定資料結構型別時和redis結構對應關係

Data Type

Redis Command [Sink]

HASH

HSET

LIST

RPUSH, LPUSH

SET

SADD

PUBSUB

PUBLISH

STRING

SET

HYPER_LOG_LOG

PFADD

SORTED_SET

ZADD

SORTED_SET

ZREM

 

l 需求

Flink集合中的資料通過自定義Sink儲存到Redis

 

l 程式碼實現

package cn.itcast.connectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc
 * 需求:
 * 接收訊息並做WordCount,
 * 最後將結果儲存到Redis
 * 注意:儲存到Redis的資料結構:使用hash也就是map
 * key            value
 * WordCount    (單詞,數量)
 */
public class ConnectorsDemo_Redis {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.Transformation
        //3.1切割並記為1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分組
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.Sink
        result.print();
        // * 最後將結果儲存到Redis
        // * 注意:儲存到Redis的資料結構:使用hash也就是map
        // * key            value
        // * WordCount      (單詞,數量)

        //-1.建立RedisSink之前需要建立RedisConfig
        //連線單機版Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        //連線叢集版Redis
        //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
        //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
        //連線哨兵版Redis
        //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
        //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();

        //-3.建立並使用RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));

        //5.execute
        env.execute();
    }

    /**
     * -2.定義一個Mapper用來指定儲存到Redis中的資料結構
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}

 

 

6.  擴充套件閱讀:其他批處理API

6.1  累加器

l API

Flink累加器:

Flink中的累加器,與Mapreduce counter的應用場景類似可以很好地觀察task在執行期間的資料變化,如Flink job任務中的運算元函式中操作累加器,在任務執行結束之後才能獲得累加器的最終結果。

Flink有以下內建累加器每個累加器都實現了Accumulator介面。

IntCounter

LongCounter

DoubleCounter

 

編碼步驟:

1.建立累加器

private IntCounter numLines = new IntCounter();

2.註冊累加器

getRuntimeContext().addAccumulator("num-lines", this.numLines);

3.使用累加器

this.numLines.add(1);

4.獲取累加器的結果

myJobExecutionResult.getAccumulatorResult("num-lines")

 

程式碼實現:

package cn.itcast.batch;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

/**
 * Author itcast
 * Desc 演示Flink累加器,統計處理的資料條數
 */
public class OtherAPI_Accumulator {
    public static void main(String[] args) throws Exception {
        //1.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2.Source
        DataSource<String> dataDS = env.fromElements("aaa", "bbb", "ccc", "ddd");

        //3.Transformation
        MapOperator<String, String> result = dataDS.map(new RichMapFunction<String, String>() {
            //-1.建立累加器
            private IntCounter elementCounter = new IntCounter();
            Integer count = 0;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //-2註冊累加器
                getRuntimeContext().addAccumulator("elementCounter", elementCounter);
            }

            @Override
            public String map(String value) throws Exception {
                //-3.使用累加器
                this.elementCounter.add(1);
                count+=1;
                System.out.println("不使用累加器統計的結果:"+count);
                return value;
            }
        }).setParallelism(2);

        //4.Sink
        result.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE);

        //5.execute
        //-4.獲取加強結果
        JobExecutionResult jobResult = env.execute();
        int nums = jobResult.getAccumulatorResult("elementCounter");
        System.out.println("使用累加器統計的結果:"+nums);
    }
}

 

 

 

6.2  廣播變數

l API

Flink支援廣播。可以將資料廣播到TaskManager上就可以供TaskManager中的SubTask/task去使用,資料儲存到記憶體中。這樣可以減少大量的shuffle操作,而不需要多次傳遞給叢集節點;

比如在資料join階段,不可避免的就是大量的shuffle操作,我們可以把其中一個dataSet廣播出去,一直載入到taskManager的記憶體中,可以直接在記憶體中拿資料,避免了大量的shuffle,導致叢集效能下降;

 

l 圖解:

- 可以理解廣播就是一個公共的共享變數

- 將一個數據集廣播後,不同的Task都可以在節點上獲取到

- 每個節點只存一份

- 如果不使用廣播,每一個Task都會拷貝一份資料集,造成記憶體資源浪費

 

 

 

 

l 注意:

廣播變數是要把dataset廣播到記憶體中,所以廣播的資料量不能太大,否則會出現OOM

廣播變數的值不可修改,這樣才能確保每個節點獲取到的值都是一致的

 

編碼步驟:

1:廣播資料

.withBroadcastSet(DataSet, "name");

2:獲取廣播的資料

Collection<> broadcastSet = getRuntimeContext().getBroadcastVariable("name");

3:使用廣播資料

 

需求:

studentDS(學號,姓名)集合廣播出去(廣播到各個TaskManager記憶體)

然後使用scoreDS(學號,學科,成績)和廣播資料(學號,姓名)進行關聯,得到這樣格式的資料:(姓名,學科,成績)

 

程式碼實現:

package cn.itcast.batch;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 演示Flink廣播變數
 * 程式設計步驟
 * 1:廣播資料
 * .withBroadcastSet(DataSet, "name");
 * 2:獲取廣播的資料
 * Collection<> broadcastSet =     getRuntimeContext().getBroadcastVariable("name");
 * 3:使用廣播資料
 * <p>
 * 需求:
 * studentDS(學號,姓名)集合廣播出去(廣播到各個TaskManager記憶體中)
 * 然後使用scoreDS(學號,學科,成績)和廣播資料(學號,姓名)進行關聯,得到這樣格式的資料:(姓名,學科,成績)
 */
public class OtherAPI_Broadcast {
    public static void main(String[] args) throws Exception {
        //1.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2.Source
        //學生資料集(學號,姓名)
        DataSource<Tuple2<Integer, String>> studentDS = env.fromCollection(
                Arrays.asList(Tuple2.of(1, "張三"), Tuple2.of(2, "李四"), Tuple2.of(3, "王五"))
        );

        //成績資料集(學號,學科,成績)
        DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(
                Arrays.asList(Tuple3.of(1, "語文", 50), Tuple3.of(2, "數學", 70), Tuple3.of(3, "英文", 86))
        );

        //3.Transformation
        //studentDS(學號,姓名)集合廣播出去(廣播到各個TaskManager記憶體中)
        //然後使用scoreDS(學號,學科,成績)和廣播資料(學號,姓名)進行關聯,得到這樣格式的資料:(姓名,學科,成績)
        MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreDS.map(
                new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
                    //定義一集合用來儲存(學號,姓名)
                    Map<Integer, String> studentMap = new HashMap<>();

                    //open方法一般用來初始化資源,每個subtask任務只被呼叫一次
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //-2.獲取廣播資料
                        List<Tuple2<Integer, String>> studentList = getRuntimeContext().getBroadcastVariable("studentInfo");
                        for (Tuple2<Integer, String> tuple : studentList) {
                            studentMap.put(tuple.f0, tuple.f1);
                        }
                        //studentMap = studentList.stream().collect(Collectors.toMap(t -> t.f0, t -> t.f1));
                    }

                    @Override
                    public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
                        //-3.使用廣播資料
                        Integer stuID = value.f0;
                        String stuName = studentMap.getOrDefault(stuID, "");
                        //返回(姓名,學科,成績)
                        return Tuple3.of(stuName, value.f1, value.f2);
                    }
                    //-1.廣播資料到各個TaskManager
                }).withBroadcastSet(studentDS, "studentInfo");

        //4.Sink
        result.print();
    }
}

 

 

6.3  分散式快取

l API解釋

Flink提供了一個類似於Hadoop的分散式快取,讓並行執行例項的函式可以在本地訪問。

這個功能可以被使用來分享外部靜態的資料,例如:機器學習的邏輯迴歸模型等

 

l 注意

廣播變數是將變數分發到各個TaskManager節點的記憶體上,分散式快取是將檔案快取到各個TaskManager節點上;

 

編碼步驟:

    1:註冊一個分散式快取檔案

     env.registerCachedFile("hdfs:///path/file", "cachefilename")  

    2:訪問分散式快取檔案中的資料

     File myFile = getRuntimeContext().getDistributedCache().getFile("cachefilename");

    3:使用

 

l 需求

scoreDS(學號, 學科, 成績)中的資料和分散式快取中的資料(學號,姓名)關聯,得到這樣格式的資料: (學生姓名,學科,成績)

 

程式碼實現:

package cn.itcast.batch;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 演示Flink分散式快取
 * 編碼步驟:
 * 1:註冊一個分散式快取檔案
 *  env.registerCachedFile("hdfs:///path/file", "cachefilename")
 * 2:訪問分散式快取檔案中的資料
 *  File myFile = getRuntimeContext().getDistributedCache().getFile("cachefilename");
 * 3:使用
 *
 * 需求:
 * scoreDS(學號, 學科, 成績)中的資料和分散式快取中的資料(學號,姓名)關聯,得到這樣格式的資料: (學生姓名,學科,成績)
 */
public class OtherAPI_DistributedCache {
    public static void main(String[] args) throws Exception {
        //1.env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2.Source
        //注意:先將本地資料中的distribute_cache_student檔案上傳到HDFS
        //-1.註冊分散式快取檔案
        //env.registerCachedFile("hdfs://node01:8020/distribute_cache_student", "studentFile");
        env.registerCachedFile("data/input/distribute_cache_student", "studentFile");

        //成績資料集(學號,學科,成績)
        DataSource<Tuple3<Integer, String, Integer>> scoreDS = env.fromCollection(
                Arrays.asList(Tuple3.of(1, "語文", 50), Tuple3.of(2, "數學", 70), Tuple3.of(3, "英文", 86))
        );

        //3.Transformation
        //scoreDS(學號, 學科, 成績)中的資料和分散式快取中的資料(學號,姓名)關聯,得到這樣格式的資料: (學生姓名,學科,成績)
        MapOperator<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>> result = scoreDS.map(
                new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
                    //定義一集合用來儲存(學號,姓名)
                    Map<Integer, String> studentMap = new HashMap<>();

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //-2.載入分散式快取檔案
                        File file = getRuntimeContext().getDistributedCache().getFile("studentFile");
                        List<String> studentList = FileUtils.readLines(file);
                        for (String str : studentList) {
                            String[] arr = str.split(",");
                            studentMap.put(Integer.parseInt(arr[0]), arr[1]);
                        }
                    }

                    @Override
                    public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
                        //-3.使用分散式快取檔案中的資料
                        Integer stuID = value.f0;
                        String stuName = studentMap.getOrDefault(stuID, "");
                        //返回(姓名,學科,成績)
                        return Tuple3.of(stuName, value.f1, value.f2);
                    }
                });

        //4.Sink
        result.print();
    }
}

 

 

 

Flink-高階API

課程目標

掌握Window操作

掌握EventTime和Watermaker的使用

掌握State管理

掌握Checkpoint容錯機制

 

 

1.  Flink四大基石

Flink之所以能這麼流行,離不開它最重要的四個基石:Checkpoint、State、Time、Window。

 

n Checkpoint

這是Flink最重要的一個特性。

Flink基於Chandy-Lamport演算法實現了一個分散式的一致性的快照,從而提供了一致性的語義。

Chandy-Lamport演算法實際上在1985年的時候已經被提出來,但並沒有被很廣泛的應用,而Flink則把這個演算法發揚光大了。

Spark最近在實現Continue streaming,Continue streaming的目的是為了降低處理的延時,其也需要提供這種一致性的語義,最終也採用了Chandy-Lamport這個演算法,說明Chandy-Lamport演算法在業界得到了一定的肯定。

https://zhuanlan.zhihu.com/p/53482103

n State

提供了一致性的語義之後,Flink為了讓使用者在程式設計時能夠更輕鬆、更容易地去管理狀態,還提供了一套非常簡單明瞭的State API,包括ValueState、ListState、MapState,BroadcastState。

n Time

除此之外,Flink還實現了Watermark的機制,能夠支援基於事件的時間的處理,能夠容忍遲到/亂序的資料。

n Window

另外流計算中一般在對流資料進行操作之前都會先進行開窗,即基於一個什麼樣的視窗上做這個計算。Flink提供了開箱即用的各種視窗,比如滑動視窗、滾動視窗、會話視窗以及非常靈活的自定義的視窗。

 

2.  Flink-Window操作

2.1  為什麼需要Window

在流處理應用中,資料是連續不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少使用者點選了我們的網頁。

在這種情況下,我們必須定義一個視窗(window),用來收集最近1分鐘內的資料,並對這個視窗內的資料進行計算。

 

2.2  Window的分類

2.2.1  按照time和count分類

time-window:時間視窗:根據時間劃分視窗,如:每xx分鐘統計最近xx分鐘的資料

count-window:數量視窗:根據數量劃分視窗,如:每xx個數據統計最近xx個數據

 

 

2.2.2  按照slide和size分類

視窗有兩個重要的屬性: 視窗大小size和滑動間隔slide,根據它們的大小關係可分為:

tumbling-window:滾動視窗:size=slide,如:每隔10s統計最近10s的資料

 

sliding-window:滑動視窗:size>slide,如:每隔5s統計最近10s的資料

 

注意:當size<slide的時候,如每隔15s統計最近10s的資料,那麼中間5s的資料會丟失,所有開發中不用

 

2.2.3  總結

按照上面視窗的分類方式進行組合,可以得出如下的視窗:

1.基於時間的滾動視窗tumbling-time-window--用的較多

2.基於時間的滑動視窗sliding-time-window--用的較多

3.基於數量的滾動視窗tumbling-count-window--用的較少

4.基於數量的滑動視窗sliding-count-window--用的較少

注意:Flink還支援一個特殊的視窗:Session會話視窗,需要設定一個會話超時時間,如30s,則表示30s內沒有資料到來,則觸發上個視窗的計算

 

2.3  Window的API

2.3.1  window和windowAll

 

n使用keyby的流,應該使用window方法

n未使用keyby的流,應該呼叫windowAll方法

2.3.2  WindowAssigner

window/windowAll 方法接收的輸入是一個 WindowAssigner, WindowAssigner 負責將每條輸入的資料分發到正確的 window 中,

Flink提供了很多各種場景用的WindowAssigner:

 

如果需要自己定製資料分發策略,則可以實現一個 class,繼承自 WindowAssigner。

 

2.3.3  evictor--瞭解

evictor 主要用於做一些資料的自定義操作,可以在執行使用者程式碼之前,也可以在執行

使用者程式碼之後,更詳細的描述可以參考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個方法。

Flink 提供瞭如下三種通用的 evictor:

* CountEvictor 保留指定數量的元素

* TimeEvictor 設定一個閾值 interval,刪除所有不再 max_ts - interval 範圍內的元

素,其中 max_ts 是視窗內時間戳的最大值。

* DeltaEvictor 通過執行使用者給定的 DeltaFunction 以及預設的 theshold,判斷是否刪

除一個元素。

 

2.3.4  trigger--瞭解

trigger 用來判斷一個視窗是否需要被觸發,每個 WindowAssigner 都自帶一個預設的trigger,

如果預設的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自Trigger 即可,我們詳細描述下 Trigger 的介面以及含義:

* onElement() 每次往 window 增加一個元素的時候都會觸發

* onEventTime() 當 event-time timer 被觸發的時候會呼叫

* onProcessingTime() 當 processing-time timer 被觸發的時候會呼叫

* onMerge() 對兩個 `rigger 的 state 進行 merge 操作

* clear() window 銷燬的時候被呼叫

上面的介面中前三個會返回一個 TriggerResult, TriggerResult 有如下幾種可能的選

擇:

* CONTINUE 不做任何事情

* FIRE 觸發 window

* PURGE 清空整個 window 的元素並銷燬視窗

* FIRE_AND_PURGE 觸發視窗,然後銷燬視窗

 

2.3.5  API呼叫示例

 

source.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

source.keyBy(0)..timeWindow(Time.seconds(5))

 

 

2.4  案例演示-基於時間的滾動和滑動視窗

2.4.1  需求

nc -lk 9999

有如下資料表示:

訊號燈編號和通過該訊號燈的車的數量

9,3

9,2

9,7

4,9

2,6

1,5

2,3

5,7

5,4

需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口通過紅綠燈汽車的數量--基於時間的滾動視窗

需求2:每5秒鐘統計一次,最近10秒鐘內,各個路口通過紅綠燈汽車的數量--基於時間的滑動視窗

 

2.4.2  程式碼實現

package cn.itcast.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Desc
 * nc -lk 9999
 * 有如下資料表示:
 * 訊號燈編號和通過該訊號燈的車的數量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
 * 需求1:5秒鐘統計一次,最近5秒鐘內,各個路口通過紅綠燈汽車的數量--基於時間的滾動視窗
 * 需求2:5秒鐘統計一次,最近10秒鐘內,各個路口通過紅綠燈汽車的數量--基於時間的滑動視窗
 */
public class WindowDemo01_TimeWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);

        //3.Transformation
        //9,3轉為CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        //分組
        //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");

        // * 需求1:5秒鐘統計一次,最近5秒鐘內,各個路口/訊號燈通過紅綠燈汽車的數量--基於時間的滾動視窗
        //timeWindow(Time size視窗大小, Time slide滑動間隔)
        SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.timeWindow(Time.seconds(5))//size==slide,可以只寫一個
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");

        // * 需求2:5秒鐘統計一次,最近10秒鐘內,各個路口/訊號燈通過紅綠燈汽車的數量--基於時間的滑動視窗
        SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.timeWindow(Time.seconds(10), Time.seconds(5))
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum("count");

        //4.Sink
/*
1,5
2,5
3,5
4,5
*/
        //result1.print();
        result2.print();

        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//訊號燈id
        private Integer count;//通過該訊號燈的車的數量
    }
}

 

 

2.5  案例演示-基於數量的滾動和滑動視窗

2.5.1  需求

需求1:統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現5次進行統計--基於數量的滾動視窗

需求2:統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現3次進行統計--基於數量的滑動視窗

 

2.5.2  程式碼實現

package cn.itcast.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Desc
 * nc -lk 9999
 * 有如下資料表示:
 * 訊號燈編號和通過該訊號燈的車的數量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
 * 需求1:統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現5次進行統計--基於數量的滾動視窗
 * 需求2:統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現3次進行統計--基於數量的滑動視窗
 */
public class WindowDemo02_CountWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);

        //3.Transformation
        //9,3轉為CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        //分組
        //KeyedStream<CartInfo, Tuple> keyedDS = cartInfoDS.keyBy("sensorId");

        // * 需求1:統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現5次進行統計--基於數量的滾動視窗
        //countWindow(long size, long slide)
        SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                //.countWindow(5L, 5L)
                .countWindow( 5L)
                .sum("count");

        // * 需求2:統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現3次進行統計--基於數量的滑動視窗
        //countWindow(long size, long slide)
        SingleOutputStreamOperator<CartInfo> result2 = cartInfoDS
                .keyBy(CartInfo::getSensorId)
                .countWindow(5L, 3L)
                .sum("count");


        //4.Sink
        //result1.print();
        /*
1,1
1,1
1,1
1,1
2,1
1,1
         */
        result2.print();
        /*
1,1
1,1
2,1
1,1
2,1
3,1
4,1
         */

        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//訊號燈id
        private Integer count;//通過該訊號燈的車的數量
    }
}

 

2.6  案例演示-會話視窗

2.6.1  需求

設定會話超時時間為10s,10s內沒有資料到來,則觸發上個視窗的計算

 

2.6.2  程式碼實現

package cn.itcast.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Desc
 * nc -lk 9999
 * 有如下資料表示:
 * 訊號燈編號和通過該訊號燈的車的數量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
 * 需求:設定會話超時時間為10s,10s內沒有資料到來,則觸發上個視窗的計算(前提是上一個視窗得有資料!)
 */
public class WindowDemo03_SessionWindow {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<String> socketDS = env.socketTextStream("node1", 9999);

        //3.Transformation
        //9,3轉為CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        //需求:設定會話超時時間為10s,10s內沒有資料到來,則觸發上個視窗的計算(前提是上一個視窗得有資料!)
        SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");

        //4.Sink
        result.print();

        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//訊號燈id
        private Integer count;//通過該訊號燈的車的數量
    }
}

 

3.  Flink-Time與Watermaker

3.1  Time分類

Flink的流式處理中,會涉及到時間的不同概念,如下圖所示:

 

 

事件時間EventTime: 事件真真正正發生產生的時間

攝入時間IngestionTime: 事件到達Flink的時間

處理時間ProcessingTime: 事件真正被處理/計算的時間

 

問題: 上面的三個時間,我們更關注哪一個?

答案: 更關注事件時間 !

因為: 事件時間更能反映事件的本質! 只要事件時間一產生就不會變化

3.2  EventTime的重要性

3.2.1  示例1

假設,你正在去往地下停車場的路上,並且打算用手機點一份外賣。選好了外賣後,你就用線上支付功能付款了,這個時候是11點59分。恰好這時,你走進了地下停車庫,而這裡並沒有手機訊號。因此外賣的線上支付並沒有立刻成功,而支付系統一直在Retry重試“支付”這個操作。

當你找到自己的車並且開出地下停車場的時候,已經是12點01分了。這個時候手機重新有了訊號,手機上的支付資料成功發到了外賣線上支付系統,支付完成。

 

在上面這個場景中你可以看到,

支付資料的事件時間是11點59分,而支付資料的處理時間是12點01分

 

問題:

如果要統計12之前的訂單金額,那麼這筆交易是否應被統計?

答案:

應該被統計,因為該資料的真真正正的產生時間為11點59,即該資料的事件時間為11點59,

事件時間能夠真正反映/代表事件的本質! 所以一般在實際開發中會以事件時間作為計算標準

 

 

3.2.2  示例2

一條錯誤日誌的內容為:

2020-11:11 22:59:00 error NullPointExcep --事件時間

進入Flink的時間為2020-11:11 23:00:00    --攝入時間

到達Window的時間為2020-11:11 23:00:10 --處理時間

問題:

對於業務來說,要統計1h內的故障日誌個數,哪個時間是最有意義的?

答案:

EventTime事件時間,因為bug真真正正產生的時間就是事件時間,只有事件時間才能真正反映/代表事件的本質!

 

3.2.3  示例3

App 會記錄使用者的所有點選行為,並回傳日誌(在網路不好的情況下,先儲存在本地,延後回傳)。

A使用者在 11:01:00  App 進行操作,B使用者在 11:02:00 操作了 App,

但是A使用者的網路不太穩定,回傳日誌延遲了,導致我們在服務端先接受到B使用者的訊息,然後再接受到A使用者的訊息,訊息亂序了。

問題:

如果這個是一個根據使用者操作先後順序,進行搶購的業務,那麼是A使用者成功還是B使用者成功?

答案:

應該算A成功,因為A確實比B操作的早,但是實際中考慮到實現難度,可能直接按B成功算

也就是說,實際開發中希望基於事件時間來處理資料,但因為資料可能因為網路延遲等原因,出現了亂序,按照事件時間處理起來有難度!

 

3.2.4  示例4

在實際環境中,經常會出現,因為網路原因,資料有可能會延遲一會才到達Flink實時處理系統。我們先來設想一下下面這個場景:

原本應該被該視窗計算的資料因為網路延遲等原因晚到了,就有可能丟失了

 

 

3.2.5  總結

實際開發中我們希望基於事件時間來處理資料,但因為資料可能因為網路延遲等原因,出現了亂序或延遲到達,那麼可能處理的結果不是我們想要的甚至出現數據丟失的情況,所以需要一種機制來解決一定程度上的資料亂序或延遲到底的問題!也就是我們接下來要學習的Watermaker水印機制/水位線機制

 

 

3.3  Watermaker水印機制/水位線機制

3.3.1  什麼是Watermaker?

Watermaker就是給資料再額外的加的一個時間列

也就是Watermaker是個時間戳!

 

3.3.2  如何計算Watermaker?

Watermaker = 資料的事件時間  -  最大允許的延遲時間或亂序時間

注意:後面通過原始碼會發現,準確來說:

Watermaker = 當前視窗的最大的事件時間  -  最大允許的延遲時間或亂序時間

這樣可以保證Watermaker水位線會一直上升(變大),不會下降

 

3.3.3  Watermaker有什麼用?

之前的視窗都是按照系統時間來觸發計算的,如: [10:00:00 ~ 10:00:10) 的視窗,

一但系統時間到了10:00:10就會觸發計算,那麼可能會導致延遲到達的資料丟失!

那麼現在有了Watermaker,視窗就可以按照Watermaker來觸發計算!

也就是說Watermaker是用來觸發視窗計算的!

 

 

3.3.4  Watermaker如何觸發視窗計算的?

視窗計算的觸發條件為:

1. 視窗中有資料

2. Watermaker >= 視窗的結束時間

 

因為前面說到

Watermaker = 當前視窗的最大的事件時間  -  最大允許的延遲時間或亂序時間

也就是說只要不斷有資料來,就可以保證Watermaker水位線是會一直上升/變大的,不會下降/減小的

所以最終一定是會觸發視窗計算的

 

注意:

上面的觸發公式進行如下變形:

Watermaker >= 視窗的結束時間

Watermaker = 當前視窗的最大的事件時間  -  最大允許的延遲時間或亂序時間

當前視窗的最大的事件時間  -  最大允許的延遲時間或亂序時間  >= 視窗的結束時間

當前視窗的最大的事件時間  >= 視窗的結束時間 +  最大允許的延遲時間或亂序時間

 

3.3.5  圖解Watermaker

 

 

3.4  Watermaker案例演示

3.4.1  需求

有訂單資料,格式為: (訂單ID,使用者ID,時間戳/事件時間,訂單金額)

要求每隔5s,計算5秒內,每個使用者的訂單總金額

並新增Watermaker來解決一定程度上的資料延遲和資料亂序問題。

 

3.4.2  API

 

注意:一般我們都是直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor

 

 

3.4.3  程式碼實現-1-開發版-掌握

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html

 

package cn.itcast.watermaker;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc
 * 模擬實時訂單資料,格式為: (訂單ID,使用者ID,訂單金額,時間戳/事件時間)
 * 要求每隔5s,計算5秒內(基於時間的滾動視窗),每個使用者的訂單總金額
 * 並新增Watermaker來解決一定程度上的資料延遲和資料亂序問題。
 */
public class WatermakerDemo01_Develop {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //模擬實時訂單資料(資料有延遲和亂序)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模擬資料延遲和亂序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));

                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        //3.Transformation
        //-告訴Flink要基於事件時間來計算!
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本預設就是EventTime
        //-告訴Flnk資料中的哪一列是事件時間,因為Watermaker = 當前最大的事件時間 - 最大允許的延遲時間或亂序時間
        /*DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) {//最大允許的延遲時間或亂序時間
                    @Override
                    public long extractTimestamp(Order element) {
                        return element.eventTime;
                        //指定事件時間是哪一列,Flink底層會自動計算:
                        //Watermaker = 當前最大的事件時間 - 最大允許的延遲時間或亂序時間
                    }
        });*/
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );

        //程式碼走到這裡,就已經被新增上Watermaker!接下來就可以進行視窗計算了
        //要求每隔5s,計算5秒內(基於時間的滾動視窗),每個使用者的訂單總金額
        DataStream<Order> result = watermakerDS
                .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");


        //4.Sink
        result.print();

        //5.execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

 

3.4.4  程式碼實現-2-驗證版-瞭解

package cn.itcast.watermaker;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc
 * 模擬實時訂單資料,格式為: (訂單ID,使用者ID,訂單金額,時間戳/事件時間)
 * 要求每隔5s,計算5秒內(基於時間的滾動視窗),每個使用者的訂單總金額
 * 並新增Watermaker來解決一定程度上的資料延遲和資料亂序問題。
 */
public class WatermakerDemo02_Check {
    public static void main(String[] args) throws Exception {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //模擬實時訂單資料(資料有延遲和亂序)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模擬資料延遲和亂序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    System.out.println("傳送的資料為: "+userId + " : " + df.format(eventTime));
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        //3.Transformation
        /*DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );*/

        //開發中直接使用上面的即可
        //學習測試時可以自己實現
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        new WatermarkStrategy<Order>() {
                            @Override
                            public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator<Order>() {
                                    private int userId = 0;
                                    private long eventTime = 0L;
                                    private final long outOfOrdernessMillis = 3000;
                                    private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

                                    @Override
                                    public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                        userId = event.userId;
                                        eventTime = event.eventTime;
                                        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                                    }

                                    @Override
                                    public void onPeriodicEmit(WatermarkOutput output) {
                                        //Watermaker = 當前最大事件時間 - 最大允許的延遲時間或亂序時間
                                        Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                        System.out.println("key:" + userId + ",系統時間:" + df.format(System.currentTimeMillis()) + ",事件時間:" + df.format(eventTime) + ",水印時間:" + df.format(watermark.getTimestamp()));
                                        output.emitWatermark(watermark);
                                    }
                                };
                            }
                        }.withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );


        //程式碼走到這裡,就已經被新增上Watermaker!接下來就可以進行視窗計算了
        //要求每隔5s,計算5秒內(基於時間的滾動視窗),每個使用者的訂單總金額
       /* DataStream<Order> result = watermakerDS
                 .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");*/

        //開發中使用上面的程式碼進行業務計算即可
        //學習測試時可以使用下面的程式碼對資料進行更詳細的輸出,如輸出視窗觸發時各個視窗中的資料的事件時間,Watermaker時間
        DataStream<String> result = watermakerDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //apply中的函式應用在視窗中的資料上
                //WindowFunction<IN, OUT, KEY, W extends Window>
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                        //準備一個集合用來存放屬於該視窗的資料的事件時間
                        List<String> eventTimeList = new ArrayList<>();
                        for (Order order : input) {
                            Long eventTime = order.eventTime;
                            eventTimeList.add(df.format(eventTime));
                        }
                        String outStr = String.format("key:%s,視窗開始結束:[%s~%s),屬於該視窗的事件時間:%s",
                                key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
                        out.collect(outStr);
                    }
                });
        //4.Sink
        result.print();

        //5.execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

 

3.5  Allowed Lateness案例演示

3.5.1  需求

有訂單資料,格式為: (訂單ID,使用者ID,時間戳/事件時間,訂單金額)

要求每隔5s,計算5秒內,每個使用者的訂單總金額

並新增Watermaker來解決一定程度上的資料延遲和資料亂序問題。

並使用OutputTag+allowedLateness解決資料丟失問題

3.5.2  API

 

 

package cn.itcast.watermaker;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Desc
 * 模擬實時訂單資料,格式為: (訂單ID,使用者ID,訂單金額,時間戳/事件時間)
 * 要求每隔5s,計算5秒內(基於時間的滾動視窗),每個使用者的訂單總金額
 * 並新增Watermaker來解決一定程度上的資料延遲和資料亂序問題。
 */
public class WatermakerDemo03_AllowedLateness {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        //模擬實時訂單資料(資料有延遲和亂序)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private boolean flag = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    //模擬資料延遲和亂序!
                    long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));

                    //TimeUnit.SECONDS.sleep(1);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        });


        //3.Transformation
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((event, timestamp) -> event.getEventTime())
                );

        //程式碼走到這裡,就已經被新增上Watermaker!接下來就可以進行視窗計算了
        //要求每隔5s,計算5秒內(基於時間的滾動視窗),每個使用者的訂單總金額
        OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));

        SingleOutputStreamOperator<Order> result = watermakerDS
                .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(5))
                .sideOutputLateData(outputTag)
                .sum("money");

        DataStream<Order> result2 = result.getSideOutput(outputTag);

        //4.Sink
        result.print("正常的資料和遲到不嚴重的資料");
        result2.print("遲到嚴重的資料");

        //5.execute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}

 

 

 

4.  Flink-狀態管理

4.1  Flink中的有狀態計算

注意:

Flink中已經對需要進行有狀態計算的API,做了封裝,底層已經維護好了狀態!

例如,之前下面程式碼,直接使用即可,不需要像SparkStreaming那樣還得自己寫updateStateByKey

也就是說我們今天學習的State只需要掌握原理,實際開發中一般都是使用Flink底層維護好的狀態或第三方維護好的狀態(如Flink整合Kafka的offset維護底層就是使用的State,但是人家已經寫好了的)

package cn.itcast.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc
 * SocketSource
 */
public class SourceDemo03 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //2.source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.處理資料-transformation
        //3.1每一行資料按照空格切分成一個個的單片語成一個集合
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //value就是一行行的資料
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);//將切割處理的一個個的單詞收集起來並返回
                }
            }
        });
        //3.2對集合中的每個單詞記為1
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                //value就是進來一個個的單詞
                return Tuple2.of(value, 1);
            }
        });

        //3.3對資料按照單詞(key)進行分組
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        //3.4對各個組內的資料按照數量(value)進行聚合就是求sum
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.輸出結果-sink
        result.print();

        //5.觸發執行-execute
        env.execute();
    }
}

執行 netcat,然後在終端輸入 hello world,執行程式會輸出什麼?

答案很明顯,(hello, 1)和 (word,1)

那麼問題來了,如果再次在終端輸入 hello world,程式會輸入什麼?

答案其實也很明顯,(hello, 2)和(world, 2)。

為什麼 Flink 知道之前已經處理過一次 hello world,這就是 state 發揮作用了,這裡是被稱為 keyed state 儲存了之前需要統計的資料,所以 Flink 知道 hello 和 world 分別出現過一次。

 

4.2  無狀態計算和有狀態計算

4.2.1  無狀態計算

 

不需要考慮歷史資料

相同的輸入得到相同的輸出就是無狀態計算, 如map/flatMap/filter....

 

 

首先舉一個無狀態計算的例子:消費延遲計算。

假設現在有一個訊息佇列,訊息佇列中有一個生產者持續往消費佇列寫入訊息,多個消費者分別從訊息佇列中讀取訊息。

從圖上可以看出,生產者已經寫入 16 條訊息,Offset 停留在 15 ;有 3 個消費者,有的消費快,而有的消費慢。消費快的已經消費了 13 條資料,消費者慢的才消費了 7、8 條資料。

如何實時統計每個消費者落後多少條資料,如圖給出了輸入輸出的示例。可以瞭解到輸入的時間點有一個時間戳,生產者將訊息寫到了某個時間點的位置,每個消費者同一時間點分別讀到了什麼位置。剛才也提到了生產者寫入了 15 條,消費者分別讀取了 10、7、12 條。那麼問題來了,怎麼將生產者、消費者的進度轉換為右側示意圖資訊呢?

consumer 0 落後了 5 條,consumer 1 落後了 8 條,consumer 2 落後了 3 條,根據 Flink 的原理,此處需進行 Map 操作。Map 首先把訊息讀取進來,然後分別相減,即可知道每個 consumer 分別落後了幾條。Map 一直往下發,則會得出最終結果。

大家會發現,在這種模式的計算中,無論這條輸入進來多少次,輸出的結果都是一樣的,因為單條輸入中已經包含了所需的所有資訊。消費落後等於生產者減去消費者。生產者的消費在單條資料中可以得到,消費者的資料也可以在單條資料中得到,所以相同輸入可以得到相同輸出,這就是一個無狀態的計算。

 

4.2.2  有狀態計算

 

需要考慮歷史資料

相同的輸入得到不同的輸出/不一定得到相同的輸出,就是有狀態計算,如:sum/reduce

 

 

以訪問日誌統計量的例子進行說明,比如當前拿到一個 Nginx 訪問日誌,一條日誌表示一個請求,記錄該請求從哪裡來,訪問的哪個地址,需要實時統計每個地址總共被訪問了多少次,也即每個 API 被呼叫了多少次。可以看到下面簡化的輸入和輸出,輸入第一條是在某個時間點請求 GET 了 /api/a;第二條日誌記錄了某個時間點 Post /api/b ;第三條是在某個時間點 GET了一個 /api/a,總共有 3 個 Nginx 日誌。

從這 3 條 Nginx 日誌可以看出,第一條進來輸出 /api/a 被訪問了一次,第二條進來輸出 /api/b 被訪問了一次,緊接著又進來一條訪問 api/a,所以 api/a 被訪問了 2 次。不同的是,兩條 /api/a 的 Nginx 日誌進來的資料是一樣的,但輸出的時候結果可能不同,第一次輸出 count=1 ,第二次輸出 count=2,說明相同輸入可能得到不同輸出。輸出的結果取決於當前請求的 API 地址之前累計被訪問過多少次。第一條過來累計是 0 次,count = 1,第二條過來 API 的訪問已經有一次了,所以 /api/a 訪問累計次數 count=2。單條資料其實僅包含當前這次訪問的資訊,而不包含所有的資訊。要得到這個結果,還需要依賴 API 累計訪問的量,即狀態。

這個計算模式是將資料輸入運算元中,用來進行各種複雜的計算並輸出資料。這個過程中運算元會去訪問之前儲存在裡面的狀態。另外一方面,它還會把現在的資料對狀態的影響實時更新,如果輸入 200 條資料,最後輸出就是 200 條結果。

 

4.3  有狀態計算的場景

 

 

什麼場景會用到狀態呢?下面列舉了常見的 4 種:

1.去重:比如上游的系統資料可能會有重複,落到下游系統時希望把重複的資料都去掉。去重需要先了解哪些資料來過,哪些資料還沒有來,也就是把所有的主鍵都記錄下來,當一條資料到來後,能夠看到在主鍵當中是否存在。

2.視窗計算:比如統計每分鐘 Nginx 日誌 API 被訪問了多少次。視窗是一分鐘計算一次,在視窗觸發前,如 08:00 ~ 08:01 這個視窗,前59秒的資料來了需要先放入記憶體,即需要把這個視窗之內的資料先保留下來,等到 8:01 時一分鐘後,再將整個視窗內觸發的資料輸出。未觸發的視窗資料也是一種狀態。

3.機器學習/深度學習:如訓練的模型以及當前模型的引數也是一種狀態,機器學習可能每次都用有一個數據集,需要在資料集上進行學習,對模型進行一個反饋。

4.訪問歷史資料:比如與昨天的資料進行對比,需要訪問一些歷史資料。如果每次從外部去讀,對資源的消耗可能比較大,所以也希望把這些歷史資料也放入狀態中做對比。

 

 

4.4  狀態的分類

4.4.1  Managed State & Raw State

 

Flink是否接管角度:可以分為

ManagedState(託管狀態)

RawState(原始狀態)

兩者的區別如下:

1. 從狀態管理方式的方式來說,Managed State 由 Flink Runtime 管理,自動儲存,自動恢復,在記憶體管理上有優化;而 Raw State 需要使用者自己管理,需要自己序列化,Flink 不知道 State 中存入的資料是什麼結構,只有使用者自己知道,需要最終序列化為可儲存的資料結構。

2. 從狀態資料結構來說,Managed State 支援已知的資料結構,如 Value、List、Map 等。而 Raw State只支援位元組陣列 ,所有狀態都要轉換為二進位制位元組陣列才可以。

3. 從推薦使用場景來說,Managed State 大多數情況下均可使用,而 Raw State 是當 Managed State 不夠用時,比如需要自定義 Operator 時,才會使用 Raw State。

在實際生產中,都只推薦使用ManagedState後續將圍繞該話題進行討論。

 

4.4.2  Keyed State & Operator State

 

Managed State 分為兩種,Keyed State 和 Operator State

(Raw State都是Operator State)

l Keyed State

 

Flink Stream模型中,Datastream 經過 keyBy 的操作可以變為 KeyedStream。

Keyed State是基於KeyedStream上的狀態。這個狀態是跟特定的key繫結的,對KeyedStream流上的每一個key,都對應一個state,如stream.keyBy(…)

KeyBy之後的State,可以理解為分割槽過的State,每個並行keyed Operator的每個例項的每個key都有一個Keyed State,即<parallel-operator-instance,key>就是一個唯一的狀態,由於每個key屬於一個keyed Operator的並行例項,因此我們將其簡單的理解為<operator,key>

 

 

l Operator State

 

這裡的fromElements會呼叫FromElementsFunction的類,其中就使用了型別為 list state 的 operator state

Operator State又稱為 non-keyed state,Key無關的State,每一個 operator state 都僅與一個 operator 的例項繫結。

Operator State 可以用於所有運算元,但一般常用於 Source

 

 

4.5  儲存State的資料結構/API介紹

前面說過有狀態計算其實就是需要考慮歷史資料

而歷史資料需要搞個地方儲存起來

Flink為了方便不同分類的State的儲存和管理,提供瞭如下的API/資料結構來儲存State!

 

 

 

Keyed State 通過 RuntimeContext 訪問,這需要 Operator 是一個RichFunction。

儲存Keyed state的資料結構:

ValueState<T>:即型別為T的單值狀態。這個狀態與對應的key繫結,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值,如求按使用者id統計使用者交易總額

ListState<T>:即key上的狀態值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable<T>來遍歷狀態值,如統計按使用者id統計使用者經常登入的Ip

ReducingState<T>:這種狀態通過使用者傳入的reduceFunction,每次呼叫add方法新增值的時候,會呼叫reduceFunction,最後合併到一個單一的狀態值

MapState<UK, UV>:即狀態值為一個map。使用者通過put或putAll方法新增元素

需要注意的是,以上所述的State物件,僅僅用於與狀態進行互動(更新、刪除、清空等),而真正的狀態值,有可能是存在記憶體、磁碟、或者其他分散式儲存系統中。相當於我們只是持有了這個狀態的控制代碼

Operator State 需要自己實現 CheckpointedFunction 或 ListCheckpointed 介面。

儲存Operator state的資料結構:

ListState<T>

BroadcastState<K,V>

舉例來說,Flink中的FlinkKafkaConsumer,就使用了operator state。它會在每個connector例項中,儲存該例項中消費topic的所有(partition, offset)對映

 

 

4.6  State程式碼示例

4.6.1  Keyed State

下圖就 word count 的 sum 所使用的StreamGroupedReduce類為例講解了如何在程式碼中使用 keyed state:

 

 

l 官網程式碼示例

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state

 

需求:

使用KeyState中的ValueState獲取資料中的最大值(實際中直接使用maxBy即可)

 

l 編碼步驟

//-1.定義一個狀態用來存放最大值

private transient ValueState<Long> maxValueState;

//-2.建立一個狀態描述符物件

ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);

//-3.根據狀態描述符獲取State

maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);

 //-4.使用State

Long historyValue = maxValueState.value();

//判斷當前值和歷史值誰大

if (historyValue == null || currentValue > historyValue)

//-5.更新狀態

maxValueState.update(currentValue);     

 

   

l 程式碼示例

package cn.itcast.state;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Desc
 * 使用KeyState中的ValueState獲取流資料中的最大值(實際中直接使用maxBy即可)
 */
public class StateDemo01_KeyedState {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);//方便觀察

        //2.Source
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("北京", 1L),
                Tuple2.of("上海", 2L),
                Tuple2.of("北京", 6L),
                Tuple2.of("上海", 8L),
                Tuple2.of("北京", 3L),
                Tuple2.of("上海", 4L)
        );

        //3.Transformation
        //使用KeyState中的ValueState獲取流資料中的最大值(實際中直接使用maxBy即可)
        //實現方式1:直接使用maxBy--開發中使用該方式即可
        //min只會求出最小的那個欄位,其他的欄位不管
        //minBy會求出最小的那個欄位和對應的其他的欄位
        //max只會求出最大的那個欄位,其他的欄位不管
        //maxBy會求出最大的那個欄位和對應的其他的欄位
        SingleOutputStreamOperator<Tuple2<String, Long>> result = tupleDS.keyBy(t -> t.f0)
                .maxBy(1);

        //實現方式2:使用KeyState中的ValueState---學習測試時使用,或者後續專案中/實際開發中遇到複雜的Flink沒有實現的邏輯,才用該方式!
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    //-1.定義狀態用來儲存最大值
                    private ValueState<Long> maxValueState = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //-2.定義狀態描述符:描述狀態的名稱和裡面的資料型別
                        ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
                        //-3.根據狀態描述符初始化狀態
                        maxValueState = getRuntimeContext().getState(descriptor);
                    }

                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        //-4.使用State,取出State中的最大值/歷史最大值
                        Long historyMaxValue = maxValueState.value();
                        Long currentValue = value.f1;
                        if (historyMaxValue == null || currentValue > historyMaxValue) {
                            //5-更新狀態,把當前的作為新的最大值存到狀態中
                            maxValueState.update(currentValue);
                            return Tuple3.of(value.f0, currentValue, currentValue);
                        } else {
                            return Tuple3.of(value.f0, currentValue, historyMaxValue);
                        }
                    }
                });


        //4.Sink
        //result.print();
        result2.print();

        //5.execute
        env.execute();
    }
}

 

4.6.2  Operator State

下圖對 word count 示例中的FromElementsFunction類進行詳解並分享如何在程式碼中使用 operator state:

 

 

l 官網程式碼示例

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state

需求:

使用ListState儲存offset模擬Kafkaoffset維護

編碼步驟:

//-1.宣告一個OperatorState來記錄offset

private ListState<Long> offsetState = null;

private Long offset = 0L;

//-2.建立狀態描述器

ListStateDescriptor<Long> descriptor = new ListStateDescriptor<Long>("offsetState", Long.class);

//-3.根據狀態描述器獲取State

offsetState = context.getOperatorStateStore().getListState(descriptor);

 

//-4.獲取State中的值

Iterator<Long> iterator = offsetState.get().iterator();

if (iterator.hasNext()) {//迭代器中有值

    offset = iterator.next();//取出的值就是offset

}

offset += 1L;

ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",當前的offset為:" + offset);

if (offset % 5 == 0) {//每隔5條訊息,模擬一個異常

//-5.儲存State到Checkpoint中

offsetState.clear();//清理記憶體中儲存的offset到Checkpoint中

//-6.將offset存入State中

offsetState.add(offset);

 

l 程式碼示例

package cn.itcast.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc
 * 需求:
 * 使用OperatorState支援的資料結構ListState儲存offset資訊, 模擬Kafkaoffset維護,
 * 其實就是FlinkKafkaConsumer底層對應offset的維護!
 */
public class StateDemo02_OperatorState {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //先直接使用下面的程式碼設定Checkpoint時間間隔和磁碟路徑以及程式碼遇到異常後的重啟策略,下午會學
        env.enableCheckpointing(1000);//每隔1s執行一次Checkpoint
        env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //固定延遲重啟策略: 程式出現異常的時候,重啟2次,每次延遲3秒鐘重啟,超過2次,程式退出
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

        //2.Source
        DataStreamSource<String> sourceData = env.addSource(new MyKafkaSource());

        //3.Transformation
        //4.Sink
        sourceData.print();

        //5.execute
        env.execute();
    }

    /**
     * MyKafkaSource就是模擬的FlinkKafkaConsumer並維護offset
     */
    public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        //-1.宣告一個OperatorState來記錄offset
        private ListState<Long> offsetState = null;
        private Long offset = 0L;
        private boolean flag = true;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            //-2.建立狀態描述器
            ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);
            //-3.根據狀態描述器初始化狀態
            offsetState = context.getOperatorStateStore().getListState(descriptor);
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            //-4.獲取並使用State中的值
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()){
                offset = iterator.next();
            }
            while (flag){
                offset += 1;
                int id = getRuntimeContext().getIndexOfThisSubtask();
                ctx.collect("分割槽:"+id+"消費到的offset位置為:" + offset);//1 2 3 4 5 6
                //Thread.sleep(1000);
                TimeUnit.SECONDS.sleep(2);
                if(offset % 5 == 0){
                    System.out.println("程式遇到異常了.....");
                    throw new Exception("程式遇到異常了.....");
                }
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        /**
         * 下面的snapshotState方法會按照固定的時間間隔將State資訊儲存到Checkpoint/磁碟中,也就是在磁碟做快照!
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            //-5.儲存StateCheckpoint
            offsetState.clear();//清理記憶體中儲存的offsetCheckpoint
            //-6.offset存入State
            offsetState.add(offset);
        }
    }
}

 

5.  Flink-容錯機制

5.1  Checkpoint

5.1.1  State Vs Checkpoint

l State:

維護/儲存的是某一個Operator的執行的狀態/歷史值,是維護在記憶體中!

一般指一個具體的Operator的狀態(operator的狀態表示一些運算元在執行的過程中會產生的一些歷史結果,如前面的maxBy底層會維護當前的最大值,也就是會維護一個keyedOperator,這個State裡面存放就是maxBy這個Operator中的最大值)

State資料預設儲存在Java的堆記憶體中/TaskManage節點的記憶體中

State可以被記錄,在失敗的情況下資料還可以恢復

l Checkpoint:

某一時刻,Flink中所有的Operator的當前State的全域性快照,一般存在磁碟上

表示了一個Flink Job在一個特定時刻的一份全域性狀態快照,即包含了所有Operator的狀態

可以理解為Checkpoint是把State資料定時持久化儲存了

比如KafkaConsumer運算元中維護的Offset狀態,當任務重新恢復的時候可以從Checkpoint中獲取

 

 

注意:

Flink中的Checkpoint底層使用了Chandy-Lamport algorithm分散式快照演算法可以保證資料的在分散式環境下的一致性!

https://zhuanlan.zhihu.com/p/53482103

Chandy-Lamport algorithm演算法的作者也是ZK中Paxos 一致性演算法的作者

https://www.cnblogs.com/shenguanpu/p/4048660.html

Flink中使用Chandy-Lamport algorithm分散式快照演算法取得了成功,後續Spark的StructuredStreaming也借鑑了該演算法

 

5.1.2  Checkpoint執行流程

5.1.2.1  簡單流程

 

0.Flink的JobManager建立CheckpointCoordinator

1.Coordinator向所有的SourceOperator傳送Barrier柵欄(理解為執行Checkpoint的訊號)

2.SourceOperator接收到Barrier之後,暫停當前的操作(暫停的時間很短,因為後續的寫快照是非同步的),並製作State快照, 然後將自己的快照儲存到指定的介質中(如HDFS), 一切 ok之後向Coordinator彙報並將Barrier傳送給下游的其他Operator

3.其他的如TransformationOperator接收到Barrier,重複第2步,最後將Barrier傳送給Sink

4.Sink接收到Barrier之後重複第2步

5.Coordinator接收到所有的Operator的執行ok的彙報結果,認為本次快照執行成功

 

注意:

1.在往介質(如HDFS)中寫入快照資料的時候是非同步的(為了提高效率)

2.分散式快照執行時的資料一致性由Chandy-Lamport algorithm分散式快照演算法保證!

 

5.1.2.2  複雜流程--課後自行閱讀

下圖左側是 Checkpoint Coordinator,是整個 Checkpoint 的發起者,中間是由兩個 source,一個 sink 組成的 Flink 作業,最右側的是持久化儲存,在大部分使用者場景中對應 HDFS。

1.Checkpoint Coordinator 向所有 source 節點 trigger Checkpoint。

 

 

 

2.source 節點向下遊廣播 barrier,這個 barrier 就是實現 Chandy-Lamport 分散式快照演算法的核心,下游的 task 只有收到所有 input 的 barrier 才會執行相應的 Checkpoint。

 

 

 

3. task 完成 state 備份後,會將備份資料的地址(state handle)通知給 Checkpoint coordinator。

 

 

4.下游的 sink 節點收集齊上游兩個 input 的 barrier 之後,會執行本地快照,(柵欄對齊)

這裡還展示了 RocksDB incremental Checkpoint (增量Checkpoint)的流程,首先 RocksDB 會全量刷資料到磁碟上(紅色大三角表示),然後 Flink 框架會從中選擇沒有上傳的檔案進行持久化備份(紫色小三角)。

 

 

 

5.同樣的,sink 節點在完成自己的 Checkpoint 之後,會將 state handle 返回通知 Coordinator。

 

 

 

6.最後,當 Checkpoint coordinator 收集齊所有 task 的 state handle,就認為這一次的 Checkpoint 全域性完成了,向持久化儲存中再備份一個 Checkpoint meta 檔案。

 

 

5.1.3  State狀態後端/State儲存介質

注意:

前面學習了Checkpoint其實就是Flink中某一時刻,所有的Operator的全域性快照,

那麼快照應該要有一個地方進行儲存,而這個儲存的地方叫做狀態後端

Flink中的State狀態後端有很多種:

5.1.3.1  MemStateBackend[瞭解]

 

第一種是記憶體儲存,即 MemoryStateBackend,構造方法是設定最大的StateSize,選擇是否做非同步快照,

對於State狀態儲存在 TaskManager 節點也就是執行節點記憶體中的,因為記憶體有容量限制,所以單個 State maxStateSize 預設 5 M,且需要注意 maxStateSize <= akka.framesize 預設 10 M。

對於Checkpoint 儲存在 JobManager 記憶體中,因此總大小不超過 JobManager 的記憶體。

推薦使用的場景為:本地測試、幾乎無狀態的作業,比如 ETL、JobManager 不容易掛,或掛掉影響不大的情況。

不推薦在生產場景使用。

 

5.1.3.2  FsStateBackend

 

另一種就是在檔案系統上的 FsStateBackend 構建方法是需要傳一個檔案路徑和是否非同步快照。

State 依然在 TaskManager 記憶體中,但不會像 MemoryStateBackend 是 5 M 的設定上限

Checkpoint 儲存在外部檔案系統(本地或 HDFS),打破了總大小 Jobmanager 記憶體的限制。

推薦使用的場景為:常規使用狀態的作業、例如分鐘級視窗聚合或 join、需要開啟HA的作業。

 

如果使用HDFS,則初始化FsStateBackend時,需要傳入以 “hdfs://”開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")),

如果使用本地檔案,則需要傳入以“file://”開頭的路徑(即:new FsStateBackend("file:///Data"))。

在分散式情況下,不推薦使用本地檔案。因為如果某個運算元在節點A上失敗,在節點B上恢復,使用本地檔案時,在B上無法讀取節點 A上的資料,導致狀態恢復失敗。

 

5.1.3.3  RocksDBStateBackend

 

還有一種儲存為 RocksDBStateBackend ,

RocksDB 是一個 key/value 的記憶體儲存系統,和其他的 key/value 一樣,先將狀態放到記憶體中,如果記憶體快滿時,則寫入到磁碟中,

但需要注意 RocksDB 不支援同步的 Checkpoint,構造方法中沒有同步快照這個選項。

不過 RocksDB 支援增量的 Checkpoint,意味著並不需要把所有 sst 檔案上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 檔案即可。它的 Checkpoint 儲存在外部檔案系統(本地或HDFS),

其容量限制只要單個 TaskManager 上 State 總量不超過它的記憶體+磁碟,單 Key最大 2G,總大小不超過配置的檔案系統容量即可。

推薦使用的場景為:超大狀態的作業,例如天級視窗聚合、需要開啟 HA 的作業、最好是對狀態讀寫效能要求不高的作業。

 

5.1.4  Checkpoint配置方式

5.1.4.1  全域性配置

修改flink-conf.yaml

#這裡可以配置

#jobmanager(即MemoryStateBackend),

#filesystem(即FsStateBackend),

#rocksdb(即RocksDBStateBackend)

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints

 

5.1.4.2  在程式碼中配置

//1.MemoryStateBackend--開發中不用

    env.setStateBackend(new MemoryStateBackend)

//2.FsStateBackend--開發中可以使用--適合一般狀態--秒級/分鐘級視窗...

    env.setStateBackend(new FsStateBackend("hdfs路徑或測試時的本地路徑"))

//3.RocksDBStateBackend--開發中可以使用--適合超大狀態--天級視窗...

env.setStateBackend(new RocksDBStateBackend(filebackend, true))

 

注意:RocksDBStateBackend還需要引入依賴

    <dependency>

       <groupId>org.apache.flink</groupId>

       <artifactId>flink-statebackend-rocksdb_2.11</artifactId>

       <version>1.7.2</version>

    </dependency>

 

5.1.5  程式碼演示

package cn.itcast.checkpoint;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Author itcast
 * Desc 演示Checkpoint引數設定(也就是Checkpoint執行流程中的步驟0相關的引數設定)
 */
public class CheckpointDemo01 {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //===========Checkpoint引數設定====
        //===========型別1:必須引數=============
        //設定Checkpoint的時間間隔為1000ms做一次Checkpoint/其實就是每隔1000ms發一次Barrier!
        env.enableCheckpointing(1000);
        //設定State狀態儲存介質
        /*if(args.length > 0){
            env.setStateBackend(new FsStateBackend(args[0]));
        }else {
            env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));
        }*/
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:\\data\\ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
        }
        //===========型別2:建議引數===========
        //設定兩個Checkpoint 之間最少等待時間,如設定Checkpoint之間最少是要等 500ms(為了避免每隔1000ms做一次Checkpoint的時候,前一次太慢和後一次重疊到一起去了)
        //:高速公路上,每隔1s關口放行一輛車,但是規定了兩車之前的最小車距為500m
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//預設是0
        //設定如果在做Checkpoint過程中出現錯誤,是否讓整體任務失敗:true  false不是
        //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//預設是true
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//預設值為0,表示不容忍任何檢查點失敗
        //設定是否清理檢查點,表示 Cancel 時是否需要保留當前的 Checkpoint,預設 Checkpoint會在作業被Cancel時被刪除
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATIONtrue,當作業被取消時,刪除外部的checkpoint(預設值)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATIONfalse,當作業被取消時,保留外部的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //===========型別3:直接使用預設的即可===============
        //設定checkpoint的執行模式為EXACTLY_ONCE(預設)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //設定checkpoint的超時時間,如果 Checkpoint 60s內尚未完成說明該次Checkpoint失敗,則丟棄。
        env.getCheckpointConfig().setCheckpointTimeout(60000);//預設10分鐘
        //設定同一時間有多少個checkpoint可以同時執行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//預設為1

        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.Transformation
        //3.1切割出每個單詞並直接記為1
        DataStream<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //value就是每一行
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分組
        //注意:批處理的分組是groupBy,流處理的分組是keyBy
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
        //3.3聚合
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);

        DataStream<String> result = (SingleOutputStreamOperator<String>) aggResult.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + ":::" + value.f1;
            }
        });

        //4.sink
        result.print();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        result.addSink(kafkaSink);

        //5.execute
        env.execute();

        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
}

 

 

 

5.2  狀態恢復和重啟策略

 

5.2.1  自動重啟策略和恢復

5.2.1.1  重啟策略配置方式

l 配置檔案中

flink-conf.yml中可以進行配置,示例如下:

restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 3

restart-strategy.fixed-delay.delay: 10 s

 

l 程式碼中

還可以在程式碼中針對該任務進行配置,示例如下:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(

3, // 重啟次數

Time.of(10, TimeUnit.SECONDS) // 延遲時間間隔

 ))

5.2.1.2  重啟策略分類
5.2.1.2.1  預設重啟策略

如果配置了Checkpoint,而沒有配置重啟策略,那麼程式碼中出現了非致命錯誤時,程式會無限重啟

5.2.1.2.2  無重啟策略

 Job直接失敗,不會嘗試進行重啟
 設定方式1:
 restart-strategy: none
 ​
 設定方式2:
 無重啟策略也可以在程式中設定
 val env = ExecutionEnvironment.getExecutionEnvironment()
 env.setRestartStrategy(RestartStrategies.noRestart())

5.2.1.2.3  固定延遲重啟策略--開發中使用

 設定方式1:
 重啟策略可以配置flink-conf.yaml的下面配置引數來啟用,作為預設的重啟策略:
 例子:
 restart-strategy: fixed-delay
 restart-strategy.fixed-delay.attempts: 3
 restart-strategy.fixed-delay.delay: 10 s
 ​
 設定方式2:
 也可以在程式中設定:
 val env = ExecutionEnvironment.getExecutionEnvironment()
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
   3, // 最多重啟3次數
   Time.of(10, TimeUnit.SECONDS) // 重啟時間間隔
 ))
 上面的設定表示:如果job失敗,重啟3次, 每次間隔10

5.2.1.2.4  失敗率重啟策略--開發偶爾使用

 設定方式1:
 失敗率重啟策略可以在flink-conf.yaml中設定下面的配置引數來啟用:
 例子:
 restart-strategy:failure-rate
 restart-strategy.failure-rate.max-failures-per-interval: 3
 restart-strategy.failure-rate.failure-rate-interval: 5 min
 restart-strategy.failure-rate.delay: 10 s
 ​
 設定方式2:
 失敗率重啟策略也可以在程式中設定:
 val env = ExecutionEnvironment.getExecutionEnvironment()
 env.setRestartStrategy(RestartStrategies.failureRateRestart(
   3, // 每個測量時間間隔最大失敗次數
   Time.of(5, TimeUnit.MINUTES), //失敗率測量的時間間隔
   Time.of(10, TimeUnit.SECONDS) // 兩次連續重啟的時間間隔
 ))
 上面的設定表示:如果5分鐘內job失敗不超過三次,自動重啟, 每次間隔10s (如果5分鐘內程式失敗超過3次,則程式退出)

 

 

5.2.1.3  程式碼演示

package cn.itcast.checkpoint;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Desc 演示Checkpoint+重啟策略
 */
public class CheckpointDemo02_RestartStrategy {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //===========Checkpoint引數設定====
        //===========型別1:必須引數=============
        //設定Checkpoint的時間間隔為1000ms做一次Checkpoint/其實就是每隔1000ms發一次Barrier!
        env.enableCheckpointing(1000);
        //設定State狀態儲存介質
        /*if(args.length > 0){
            env.setStateBackend(new FsStateBackend(args[0]));
        }else {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        }*/
        if(SystemUtils.IS_OS_WINDOWS){
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        }else{
            env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoint/checkpoint"));
        }
        //===========型別2:建議引數===========
        //設定兩個Checkpoint 之間最少等待時間,如設定Checkpoint之間最少是要等 500ms(為了避免每隔1000ms做一次Checkpoint的時候,前一次太慢和後一次重疊到一起去了)
        //:高速公路上,每隔1s關口放行一輛車,但是規定了兩車之前的最小車距為500m
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//預設是0
        //設定如果在做Checkpoint過程中出現錯誤,是否讓整體任務失敗:true  false不是
        //env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//預設是true
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//預設值為0,表示不容忍任何檢查點失敗
        //設定是否清理檢查點,表示 Cancel 時是否需要保留當前的 Checkpoint,預設 Checkpoint會在作業被Cancel時被刪除
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATIONtrue,當作業被取消時,刪除外部的checkpoint(預設值)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATIONfalse,當作業被取消時,保留外部的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //===========型別3:直接使用預設的即可===============
        //設定checkpoint的執行模式為EXACTLY_ONCE(預設)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //設定checkpoint的超時時間,如果 Checkpoint 60s內尚未完成說明該次Checkpoint失敗,則丟棄。
        env.getCheckpointConfig().setCheckpointTimeout(60000);//預設10分鐘
        //設定同一時間有多少個checkpoint可以同時執行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//預設為1

        //=============重啟策略===========
        //-1.預設策略:配置了Checkpoint而沒有配置重啟策略預設使用無限重啟
        //-2.配置無重啟策略
        //env.setRestartStrategy(RestartStrategies.noRestart());
        //-3.固定延遲重啟策略--開發中使用!
        //重啟3,每次間隔10s
        /*env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, //嘗試重啟3
                Time.of(10, TimeUnit.SECONDS))//每次重啟間隔10s
        );*/
        //-4.失敗率重啟--偶爾使用
        //5分鐘內重啟3(3次不包括,也就是最多重啟2),每次間隔10s
        /*env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, // 每個測量時間間隔最大失敗次數
                Time.of(5, TimeUnit.MINUTES), //失敗率測量的時間間隔
                Time.of(10, TimeUnit.SECONDS) // 每次重啟的時間間隔
        ));*/

        //上面的能看懂就行,開發中使用下面的程式碼即可
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        //3.Transformation
        //3.1切割出每個單詞並直接記為1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //value就是每一行
                String[] words = value.split(" ");
                for (String word : words) {
                    if(word.equals("bug")){
                        System.out.println("手動模擬的bug...");
                        throw new RuntimeException("手動模擬的bug...");
                    }
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分組
        //注意:批處理的分組是groupBy,流處理的分組是keyBy
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);

        //4.sink
        result.print();

        //5.execute
        env.execute();
    }
}

 

5.2.2  手動重啟並恢復-瞭解

1.把程式打包

 

 

 

2.啟動Flink叢集(本地單機版,叢集版都可以)

/export/server/flink/bin/start-cluster.sh

 

3.訪問webUI

http://node1:8081/#/overview

http://node2:8081/#/overview

 

 

 

4.使用FlinkWebUI提交

cn.itcast.checkpoint.CheckpointDemo01

 

 

 

 

 

5.取消任務

 

 

 

 

 

6.重新啟動任務並指定從哪恢復

cn.itcast.checkpoint.CheckpointDemo01

hdfs://node1:8020/flink-checkpoint/checkpoint/9e8ce00dcd557dc03a678732f1552c3a/chk-34

 

 

 

 

 

 

7.關閉/取消任務

 

 

8.關閉叢集

/export/server/flink/bin/stop-cluster.sh

 

5.3  Savepoint

5.3.1  Savepoint介紹

Savepoint:儲存點,類似於以前玩遊戲的時候,遇到難關了/遇到boss了,趕緊手動存個檔,然後接著玩,如果失敗了,趕緊從上次的存檔中恢復,然後接著玩

 

在實際開發中,可能會遇到這樣的情況:如要對叢集進行停機維護/擴容...

那麼這時候需要執行一次Savepoint也就是執行一次手動的Checkpoint/也就是手動的發一個barrier柵欄,那麼這樣的話,程式的所有狀態都會被執行快照並儲存,

當維護/擴容完畢之後,可以從上一次Savepoint的目錄中進行恢復!

 

 

5.3.2  Savepoint VS Checkpoint

 

 

5.3.3  Savepoint演示

# 啟動yarn session

/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

 

# 執行job-會自動執行Checkpoint

/export/server/flink/bin/flink run --class cn.itcast.checkpoint.CheckpointDemo01 /root/ckp.jar

 

# 手動建立savepoint--相當於手動做了一次Checkpoint

/export/server/flink/bin/flink savepoint 702b872ef80f08854c946a544f2ee1a5 hdfs://node1:8020/flink-checkpoint/savepoint/

 

# 停止job

/export/server/flink/bin/flink cancel 702b872ef80f08854c946a544f2ee1a5

 

# 重新啟動job,手動載入savepoint資料

/export/server/flink/bin/flink run -s hdfs://node1:8020/flink-checkpoint/savepoint/savepoint-702b87-0a11b997fa70 --class cn.itcast.checkpoint.CheckpointDemo01 /root/ckp.jar

 

# 停止yarn session

yarn application -kill application_1607782486484_0014

 

6.  擴充套件:關於並行度

一個Flink程式由多個Operator組成(source、transformation和 sink)。

 一個Operator由多個並行的Task(執行緒)來執行, 一個Operator的並行Task(執行緒)數目就被稱為該Operator(任務)的並行度(Parallel)

 

l 並行度可以有如下幾種指定方式

1.Operator Level(運算元級別)(可以使用)

一個運算元、資料來源和sink的並行度可以通過呼叫 setParallelism()方法來指定

 

 

2.Execution Environment Level(Env級別)(可以使用)

執行環境(任務)的預設並行度可以通過呼叫setParallelism()方法指定。為了以並行度3來執行所有的運算元、資料來源和data sink, 可以通過如下的方式設定執行環境的並行度:

執行環境的並行度可以通過顯式設定運算元的並行度而被重寫

 

 

3.Client Level(客戶端級別,推薦使用)(可以使用)

並行度可以在客戶端將job提交到Flink時設定。

對於CLI客戶端,可以通過-p引數指定並行度

./bin/flink run -p 10 WordCount-java.jar

 

4.System Level(系統預設級別,儘量不使用)

在系統級可以通過設定flink-conf.yaml檔案中的parallelism.default屬性來指定所有執行環境的預設並行度

 

l 示例

 

 

 

Example1

fink-conf.yaml中 taskmanager.numberOfTaskSlots 預設值為1,即每個Task Manager上只有一個Slot ,此處是3

Example1中,WordCount程式設定了並行度為1,意味著程式 Source、Reduce、Sink在一個Slot中,佔用一個Slot

Example2

通過設定並行度為2後,將佔用2個Slot

Example3

通過設定並行度為9,將佔用9個Slot

Example4

通過設定並行度為9,並且設定sink的並行度為1,則Source、Reduce將佔用9個Slot,但是Sink只佔用1個Slot

 

 

l 注意

1.並行度的優先順序:運算元級別 > env級別 > Client級別 > 系統預設級別  (越靠前具體的程式碼並行度的優先順序越高)

2.如果source不可以被並行執行,即使指定了並行度為多個,也不會生效

3.在實際生產中,我們推薦在運算元級別顯示指定各自的並行度,方便進行顯示和精確的資源控制。

4.slot是靜態的概念,是指taskmanager具有的併發執行能力; parallelism是動態的概念,是指程式執行時實際使用的併發能力