1. 程式人生 > >用Python進行實時計算——PyFlink快速入門

用Python進行實時計算——PyFlink快速入門

![](https://user-gold-cdn.xitu.io/2020/6/24/172e3aa15e6c73fe?w=321&h=245&f=png&s=19355) Flink 1.9.0及更高版本支援Python,也就是PyFlink。 在最新版本的Flink 1.10中,PyFlink支援Python使用者定義的函式,使您能夠在Table API和SQL中註冊和使用這些函式。但是,聽完所有這些後,您可能仍然想知道PyFlink的架構到底是什麼?作為PyFlink的快速指南,本文將回答這些問題。 # 為什麼需要PyFlink? ## Python上的Flink和Flink上的Python 那麼,PyFlink到底是什麼?顧名思義,PyFlink就是Apache Flink與Python的組合,或者說是Python上的Flink。但是Flink on Python是什麼意思?首先,兩者的結合意味著您可以在Python中使用Flink的所有功能。而且,更重要的是,PyFlink還允許您在Flink上使用Python廣泛的生態系統的計算功能,從而可以進一步促進其生態系統的開發。換句話說,這對雙方都是雙贏。如果您更深入地研究這個主題,您會發現Flink框架和Python語言的整合絕不是巧合。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3aad180ed654?w=720&h=388&f=png&s=119665) ## Python和大資料生態系統 python語言與大資料緊密相連。為了理解這一點,我們可以看一下人們正在使用Python解決的一些實際問題。一項使用者調查顯示,大多數人都在使用Python進行資料分析和機器學習應用程式。對於此類情況,大資料空間中還解決了一些理想的解決方案。除了擴大大資料產品的受眾範圍之外,Python和大資料的整合還通過將其獨立體系結構擴充套件到分散式體系結構,極大地增強了Python生態系統的功能。這也解釋了在分析大量資料時對Python的強烈需求。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3aaed5ca098e?w=711&h=267&f=png&s=65091) ## 為什麼選擇Flink和Python? Python和大資料的整合與其他最近的趨勢一致。但是,再次說明一下,為什麼Flink現在支援Python,而不是Go或R或另一種語言?而且,為什麼大多數使用者選擇PyFlink而不是PySpark和PyHive? 為了理解原因,讓我們首先考慮使用Flink框架的一些優勢: - **有利的體系結構:** Flink是具有統一流和批處理功能的純流計算引擎。 - **新的活力:**根據ASF的客觀統計,Flink是2019年最活躍的開源專案。 - **高可靠性:**作為一個開源專案,Flink經過長期測試,並廣泛應用於大資料公司的生產環境中。 接下來,讓我們看看為什麼Flink支援Python而不是其他語言。統計資料顯示,Python是繼Java和C之後最受歡迎的語言,並且自2018年以來一直在快速發展。Java和Scala是Flink的預設語言,但是Flink支援Python似乎是合理的。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ab098192049?w=743&h=271&f=png&s=123507) PyFlink是相關技術發展的必然產物。但是,僅僅瞭解PyFlink的重要性是不夠的,因為我們的最終目標是使Flink和Python使用者受益並解決實際問題。因此,我們需要進一步探索如何實現PyFlink。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ab22b93acc0?w=731&h=250&f=png&s=48433) # PyFlink架構 要實現PyFlink,我們需要知道要實現的關鍵目標和要解決的核心問題。PyFlink的主要目標是什麼?簡而言之,PyFlink的主要目標如下: 1. 使所有Flink功能對Python使用者可用。 2. 在Flink上執行Python的分析和計算功能,以提高Python解決大資料問題的能力。 在此基礎上,讓我們分析實現這些目標需要解決的關鍵問題。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ab40fff61c5?w=670&h=333&f=png&s=82135) ## 使Flink功能可供Python使用者使用 要實現PyFlink,是否需要像現有Java引擎一樣在Flink上開發Python引擎?答案是NO。嘗試在Flink 1.8版或更早版本中進行,但效果不佳。基本設計原則是以最小的成本實現給定的目標。最簡單但最好的方法是提供一層Python API,並重用現有的計算引擎。 那麼,我們應該為Flink提供哪些Python API?他們對我們很熟悉:高階表API和SQL,以及有狀態的DataStream API。現在,我們越來越接近Flink的內部邏輯,下一步是提供適用於Python的Table API和DataStream API。但是,剩下要解決的關鍵問題到底是什麼呢? ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ab5c111d161?w=611&h=205&f=png&s=54535) **關鍵問題** 顯然,關鍵問題在於在Python虛擬機器(PyVM)和Java虛擬機器(JVM)之間建立握手,這對於Flink支援多種語言至關重要。要解決此問題,我們必須選擇適當的通訊技術。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ab756ae1657?w=659&h=159&f=png&s=43459) **選擇虛擬機器通訊技術** 當前,有兩種解決方案可用於實現PyVM和JVM之間的通訊,它們是Beam和Py4J。前者是一個著名的專案,具有多語言和多引擎支援,而後者是用於PyVM和JVM之間通訊的專用解決方案。我們可以從幾個不同的角度比較和對比Apache Beam和Py4J,以瞭解它們之間的區別。首先,考慮一個比喻:要越過一堵牆,Py4J會像痣一樣在其中挖一個洞,而Apache Beam會像大熊一樣把整堵牆推倒。從這個角度來看,使用Apache Beam來實現VM通訊有點複雜。簡而言之,這是因為Apache Beam專注於通用性,在極端情況下缺乏靈活性。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ab8e5a924e1?w=687&h=274&f=png&s=90971) 除此之外,Flink還需要互動式程式設計。此外,為了使Flink正常工作,我們還需要確保其API設計中的語義一致性,尤其是在其多語言支援方面。Apache Beam的現有體系結構無法滿足這些要求,因此答案很明顯,Py4J是支援PyVM和JVM之間通訊的最佳選擇。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3aba4458f5c5?w=671&h=269&f=png&s=60974) **技術架構** 在PyVM和JVM之間建立通訊之後,我們已經實現了向Python使用者提供Flink功能的第一個目標。我們已經在Flink 1.9版中實現了這一點。現在,讓我們看一下Flink 1.9版中PyFlink API的體系結構: Flink 1.9版使用Py4J來實現虛擬機器通訊。我們為PyVM啟用了閘道器,為JVM啟用了閘道器伺服器以接收Python請求。此外,我們還提供了Python API中的TableENV和Table之類的物件,這些物件與Java API中提供的物件相同。因此,編寫Python API的本質是關於如何呼叫Java API。Flink 1.9版還解決了作業部署問題。它使您可以通過各種方式提交作業,例如執行Python命令以及使用Python Shell和CLI。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3abb9f0e250a?w=676&h=278&f=png&s=62190) 但是,此體系結構提供了哪些優勢?首先,該體系結構很簡單,並且可以確保Python API和Java API之間的語義一致性。其次,它還提供了與Java作業相當的出色Python作業處理效能。 ## 在Flink上執行Python的分析和計算功能 上一節介紹瞭如何使Flink功能可供Python使用者使用。本節說明如何在Flink上執行Python函式。通常,我們可以通過以下兩種方式之一在Flink上執行Python函式: 1. **選擇一個典型的Python類庫,並將其API新增到PyFlink。**該方法花費很長時間,因為Python包含太多的類庫。在合併任何API之前,我們需要簡化Python執行。 2. **基於現有的Flink Table API和Python類庫的特徵,我們可以將所有現有的Python類庫函式視為使用者定義的函式,並將其整合到Flink中。**Flink 1.10及更高版本中支援此功能。功能整合的關鍵問題是什麼?同樣,它取決於Python使用者定義函式的執行。 接下來,讓我們為這個關鍵問題選擇一種技術。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3abdb320fc80?w=661&h=238&f=png&s=92774) **選擇執行使用者定義功能的技術** 實際上,執行Python使用者定義的函式非常複雜。它不僅涉及虛擬機器之間的通訊,還涉及以下所有方面:管理Python執行環境,解析Java和Python之間交換的業務資料,將Flink中的狀態後端傳遞給Python以及監視執行狀態。鑑於所有這些複雜性,現在是Apache Beam發揮作用的時候了。作為支援多種引擎和多種語言的大熊,Apache Beam可以在解決這種情況方面做很多工作,所以讓我們看看Apache Beam如何處理執行Python使用者定義的函式。 下面顯示了可移植性框架,該框架是Apache Beam的高度抽象的體系結構,旨在支援多種語言和引擎。當前,Apache Beam支援幾種不同的語言,包括Java,Go和Python。 **使用者定義的功能架構** UDF體系結構不僅需要實現PyVM與JVM之間的通訊,還需要在編譯和執行階段滿足不同的要求。在下面的PyLink使用者定義功能架構圖中,JVM中的行為以綠色表示,而PyVM中的行為以藍色表示。讓我們看看編譯期間的區域性設計。本地設計依賴於純API對映呼叫。Py4J用於VM通訊。 現在,讓我們看看Python API和Java API在此架構中的工作方式。在Java方面,JobMaster將作業分配給TaskManager,就像處理普通Java作業一樣,並且TaskManager執行任務,這涉及到操作員在JVM和PyVM中的執行。在Python使用者定義的函式運算子中,我們將設計各種gRPC服務,用於JVM和PyVM之間的通訊。例如,用於業務資料通訊的DataService和用於Python UDF的StateService來呼叫Java State後端。還將提供許多其他服務,例如日誌記錄和指標。 # 我們如何使用PyFlink? 瞭解了PyFlink的體系結構及其背後的思想之後,我們來看一下PyFlink的特定應用場景,以更好地瞭解其背後的方式和原因。 ## PyFlink的應用場景 PyFlink支援哪些業務方案?我們可以從兩個角度分析其應用場景:Python和Java。請記住,PyFlink也適用於Java可以應用的所有情況。 1. **事件驅動的方案,**例如實時資料監控。 2. **資料分析,**例如庫存管理和資料視覺化。 3. **資料管道,**也稱為ETL方案,例如日誌解析。 4. **機器學習,**例如有針對性的建議。 您可以在所有這些情況下使用PyFlink。PyFlink也適用於特定於Python的方案,例如科學計算。在如此眾多的應用場景中,您可能想知道現在可以使用哪些特定的PyFlink API。因此,現在我們也來研究這個問題。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3abff9816886?w=725&h=336&f=png&s=142954) ## PyFlink安裝 在使用任何API之前,您需要安裝PyFlink。當前,要安裝PyFlink,請執行命令:`pip install apache-Flink` ## PyFlink API PyFlink API與Java Table API完全一致,以支援各種關係和視窗操作。某些易於使用的PyFlink API比SQL API更為強大,例如特定於列操作的API。除了API,PyFlink還提供了多種定義Python UDF的方法。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ac174c9ed83?w=668&h=365&f=png&s=163306) ## PyFlink中使用者定義的函式定義 可以擴充套件ScalarFunction(例如,通過新增指標)以提供更多輔助功能。另外,PyFlink使用者功能函式支援Python支援的所有方法定義,例如lambda,命名函式和可呼叫函式。 定義完這些方法後,我們可以使用PyFlink Decorators進行標記,並描述輸入和輸出資料型別。我們還可以基於Python的型別提示功能進一步簡化更高版本,以進行型別派生。以下示例將幫助您更好地瞭解如何定義使用者定義的函式。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ac2e9972fda?w=642&h=262&f=png&s=85762) ## 定義Python使用者定義函式的一種情況 在本例中,我們將兩個數字相加。首先,為此,匯入必要的類,然後定義前面提到的函式。這非常簡單,因此讓我們進行一個實際案例。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ac453b72062?w=1129&h=497&f=png&s=306404) # PyFlink的未來前景如何? 通常,使用PyFlink進行業務開發很簡單。您可以通過SQL或Table API輕鬆描述業務邏輯,而無需瞭解基礎實現。讓我們看一下PyFlink的整體前景。 ## 目標驅動路線圖 PyFlink的開發始終受到目標的推動,這些目標是使Flink功能可供Python使用者使用並將Python函式整合到Flink中。根據下面顯示的PyFlink路線圖,我們首先在PyVM和JVM之間建立了通訊。然後,在Flink 1.9中,我們提供了Python Table API,向Python使用者開放了現有的Flink Table API功能。在Flink 1.10中,我們準備通過以下操作將Python函式整合到Flink:整合Apache Beam,設定Python使用者定義的函式執行環境,管理Python對其他類庫的依賴關係以及為使用者定義使用者定義的函式API,以便支援Python使用者定義函式。 為了擴充套件分散式Python的功能,PyFlink提供了對[Pandas Series](https://pandas.pydata.org/)和[DataFrame](https://pandas.pydata.org/pandas-docs/stable/getting_started/dsintro.html)支援,以便使用者可以在PyFlink中直接使用Pandas使用者定義的函式。此外,將來會在SQL客戶端上啟用Python使用者定義函式,以使PyFlink易於使用。PyFlink還將提供Python ML管道API,以使Python使用者能夠在機器學習中使用PyFlink。監視Python使用者定義的函式執行對實際生產和業務至關重要。因此,PyFlink將進一步為Python使用者定義函式提供度量管理。這些功能將包含在Flink 1.11中。 但是,這些只是PyFlink未來發展計劃的一部分。還有更多工作要做,例如優化PyFlink的效能,提供圖形計算API以及為Flink上的Pandas支援Pandas的本機API。我們將繼續向Python使用者提供Flink的現有功能,並將Python的強大功能整合到Flink中,以實現擴充套件Python生態系統的最初目標。 ![](https://user-gold-cdn.xitu.io/2020/6/24/172e3ac644cbd6c5?w=656&h=377&f=png&s=50523) PyFlink的前景如何?您可能知道,PyFlink是Apache Flink的一部分,它涉及執行時和API層。 PyFlink在這兩層將如何發展?在執行時方面,PyFlink將構建用於JVM和PyVM之間通訊的gRPC常規服務(例如控制元件,資料和狀態)。在此框架中,將抽象化Java Python使用者定義函式運算子,並構建Python執行容器以支援Python的多種執行方式。例如,PyFlink可以在Docker容器中甚至在外部服務叢集中作為程序執行。特別是在外部服務群集中執行時,將以套接字的形式啟用無限擴充套件功能。這一切在後續的Python整合中都起著至關重要的作用。 在API方面,我們將在Flink中啟用基於Python的API,以實現我們的使命。這也依賴於Py4J VM通訊框架。PyFlink將逐漸支援更多的API,包括Flink中的Java API(例如Python Table API,UDX,ML Pipeline,DataStream,CEP,Gelly和State API)以及在Python使用者中最受歡迎的Pandas API。基於這些API,PyFlink將繼續與其他生態系統整合以便於開發;例如Notebook,Zeppelin,Jupyter和Alink,這是阿里巴巴的Flink開源版本。到目前為止,PyAlink已完全整合了PyFlink的功能。PyFlink也將與現有的AI系統平臺整合,例如著名的TensorFlow。 為此,PyFlink將一直保持活力。同樣,PyFlink的任務是使Flink功能可供Python使用者使用,並在Flink上執行Python分析和計算功能。 更多實時資料分析相關博文與科技資訊,歡迎關注 “實時流式計算” 關注 “實時流式計算” 回覆 “電子書” 獲取Flink 300頁實戰電子書 ![](https://user-gold-cdn.xitu.io/2020/6/22/172d968252276fa6?w=908&h=341&f=png&s