1. 程式人生 > >《Flink官方文件》Python 程式設計指南測試版

《Flink官方文件》Python 程式設計指南測試版

原文連結  譯者:hjjxd    校對:清英

Flink中的分析程式實現了對資料集的某些操作 (例如,資料過濾,對映,合併,分組)。這些資料最初來源於特定的資料來源(例如來自於讀檔案或資料集合)。操作執行的結果通過資料池以寫入資料到(分散式)檔案系統或標準輸出(例如命令列終端)的形式返回。Flink程式可以執行在不同的環境中,既能夠獨立執行,也可以嵌入到其他程式中執行。程式可以執行在本地的JVM上,也可以執行在伺服器叢集中。

為了建立你自己的Flink程式,我們鼓勵你從program skeleton(程式框架)開始,並逐漸增加你自己的transformations(變化)。以下是更多的用法和高階特性的索引。

示例程式

以下程式是一段完整可執行的WordCount示例程式。你可以複製貼上這些程式碼並在本地執行。

from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count +=
sum([x[0] for x in iterator]) collector.collect((count, word)) env = get_environment() data = env.from_elements("Who's there?", "I think I hear them. Stand, ho! Who's there?") data \ .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \ .group_by(1) \ .reduce_group
(Adder(), combinable=True) \ .output() env.execute(local=True)

程式框架

從示例程式可以看出,Flink程式看起來就像普通的python程式一樣。每個程式都包含相同的基本組成部分:

1.獲取一個執行環境
2.載入/建立初始資料
3.指定對這些資料的操作
4.指定計算結果的存放位置
5.執行程式

接下來,我們將對每個步驟給出概述,更多細節可以參考與之對應的小節。
Environment(執行環境)是所有Flink程式的基礎。你可以通過呼叫Environment類中的一些靜態方法來建立一個環境:

get_environment()

執行環境可通過多種讀檔案的方式來指定資料來源。如果是簡單的按行讀取文字檔案,你可以採用:

env = get_environment()
text = env.read_text("file:///path/to/file")

這樣,你就獲得了可以進行操作(apply transformations)的資料集。關於資料來源和輸入格式的更多資訊,請參考 Data Sources
一旦你獲得了一個數據集DataSet,你就可以通過transformations來建立一個新的資料集,並把它寫入到檔案,再次transform,或者與其他資料集相結合。你可以通過對資料集呼叫自己個性化定製的函式來進行資料操作。例如,一個類似這樣的資料對映操作:

data.map(lambda x: x*2)

這將會建立一個新的資料集,其中的每個資料都是原來資料集中的2倍。若要獲取關於所有transformations的更多資訊,及所有資料操作的列表,請參考Transformations

當你需要將所獲得的資料集寫入到磁碟時,呼叫下面三種函式的其中一個即可。

data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output()

其中,最後一種方法僅適用於在本機上進行開發/除錯,它會將資料集的內容輸出到標準輸出。(請注意,當函式在叢集上執行時,結果將會輸出到整個叢集節點的標準輸出流,即輸出到workers的.out檔案。)前兩種方法,能夠將資料集寫入到對應的檔案中。關於寫入到檔案的更多資訊,請參考Data Sinks

當你設計好了程式之後,你需要在環境中執行execute命令來執行程式。可以選擇在本機執行,也可以提交到叢集執行,這取決於Flink的建立方式。你可以通過設定execute(local=True)強制程式在本機執行。

建立專案

除了搭建好Flink執行環境,就無需進行其他準備工作了。Python包可以從你的Flink版本對應的/resource資料夾找到。在執行工作任務時,Flink 包,plan包和optional包均可以通過HDFS自動分發。

Python API已經在安裝了Python2.7或3.4的Linux/Windows系統上測試過。

預設情況下,Flink通過呼叫”python”或”python3″來啟動python程序,這取決於使用了哪種啟動指令碼。通過在 flink-conf.yaml 中設定 “python.binary.python[2/3]”對應的值,來設定你所需要的啟動方式。

延遲(惰性)求值

所有的Flink程式都是延遲執行的。當程式的主函式執行時,資料的載入和操作並沒有在當時發生。與此相反,每一個被創建出來的操作都被加入到程式的計劃中。當程式環境中的某個物件呼叫了execute()函式時,這些操作才會被真正的執行。不論該程式是在本地執行還是叢集上執行。

延遲求值能夠讓你建立複雜的程式,並在Flink上以一個整體的計劃單元來執行。

資料變換

資料變換(Data transformations)可以將一個或多個數據集對映為一個新的資料集。程式能夠將多種變換結合到一起來進行復雜的整合變換。

該小節將概述各種可以實現的資料變換。transformations documentation資料變換文件中,有關於所有資料變換和示例的全面介紹。

Transformation  Description    變換描述
Map 輸入一個元素,輸出一個元素
data.map(lambda x: x * 2)
FlatMap 輸入一個元素,輸出0,1,或多個元素
data.flat_map(
  lambda x,c: [(1,word) for word in line.lower().split() for line 
in x])
MapPartition 通過一次函式呼叫實現並行的分割操作。該函式將分割變換作為一個”迭代器”,並且能夠產生任意數量的輸出值。每次分割變換的元素數量取決於變換的並行性和之前的操作結果。
data.map_partition(lambda x,c: [value * 2 for value in x])
Filter 對每一個元素,計算一個布林表示式的值,保留函式計算結果為true的元素。
data.filter(lambda x: x > 1000)
Reduce 通過不斷的將兩個元素組合為一個,來將一組元素結合為一個單一的元素。這種縮減變換可以應用於整個資料集,也可以應用於已分組的資料集。
data.reduce(lambda x,y : x + y)
ReduceGroup 將一組元素縮減為1個或多個元素。縮減分組變換可以被應用於一個完整的資料集,或者一個分組資料集。
lass Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator)      
    collector.collect((count, word))

data.reduce_group(Adder())
Aggregate 對一個數據集包含所有元組的一個域,或者資料集的每個資料組,執行某項built-in操作(求和,求最小值,求最大值)。聚集變換可以被應用於一個完整的資料集,或者一個分組資料集。
# This code finds the sum of all of the values in the first field
 and the maximum of all of the values in the second field
data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)

# min(), max(), and sum() syntactic sugar functions are also available
data.sum(0).and_agg(Aggregation.Max, 1)
Join 對兩個資料集進行聯合變換,將得到一個新的資料集,其中包含在兩個資料集中擁有相等關鍵字的所有元素對。也可通過JoinFunction來把成對的元素變為單獨的元素。關於join keys的更多資訊請檢視 keys 。
# In this case tuple fields are used as keys.
# "0" is the join field on the first tuple
# "1" is the join field on the second tuple.
result = input1.join(input2).where(0).equal_to(1)
CoGroup 是Reduce變換在二維空間的一個變體。將來自一個或多個域的資料加入資料組。變換函式transformation function將被每一對資料組呼叫。關於定義coGroup keys的更多資訊,請檢視 keys 。
data1.co_group(data2).where(0).equal_to(1)
Cross 計算兩個輸入資料集的笛卡爾乘積(向量叉乘),得到所有元素對。也可通過CrossFunction實現將一對元素轉變為一個單獨的元素。
result = data1.cross(data2)
Union 將兩個資料集進行合併。
data.union(data2)
ZipWithIndex 為資料組中的元素逐個分配連續的索引。瞭解更多資訊,請參考 [Zip Elements Guide](zip_elements_guide.html#zip-with-a-dense-index).
data.zip_with_index()

指定Keys

一些變換(例如Join和CoGroup),需要在進行變換前,為作為輸入引數的資料集指定一個關鍵字,而另一些變換(例如Reduce和GroupReduce),則允許在變換操作之前,對資料集根據某個關鍵字進行分組。

資料集可通過如下方式分組

reduced = data \
  .group_by(<define key here>) \
  .reduce_group(<do something>)

Flink中的資料模型並不是基於鍵-值對。你無需將資料集整理為keys和values的形式。鍵是”虛擬的”:它們被定義為在真實資料之上,引導分組操作的函式。

為元組定義keys

最簡單的情形是對一個數據集中的元組按照一個或多個域進行分組:

reduced = data \
  .group_by(0) \
  .reduce_group(<do something>)

資料集中的元組被按照第一個域分組。對於接下來的group-reduce函式,輸入的資料組中,每個元組的第一個域都有相同的值。

grouped = data \
  .group_by(0,1) \
  .reduce(/*do something*/)

在上面的例子中,資料集的分組基於第一個和第二個域形成的複合關鍵字,因此,reduce函式輸入資料組中,每個元組兩個域的值均相同。
關於巢狀元組需要注意:如果你有一個使用了巢狀元組的資料集,指定group_by(<index of tuple>)操作,系統將把整個元組作為關鍵字使用。

向Flink傳遞函式

一些特定的操作需要採用使用者自定義的函式,因此它們都接受lambda表示式和rich functions作為輸入引數。

data.filter(lambda x: x > 5)
class Filter(FilterFunction):
    def filter(self, value):
        return value > 5

data.filter(Filter())

Rich functions可以將函式作為輸入引數,允許使用broadcast-variables(廣播變數),能夠由init()函式引數化,是複雜函式的一個可考慮的實現方式。它們也是在reduce操作中,定義一個可選的combine function的唯一方式。
Lambda表示式可以讓函式在一行程式碼上實現,非常便捷。需要注意的是,如果某個操作會返回多個數值,則其使用的lambda表示式應當返回一個迭代器。(所有函式將接收一個collector輸入 引數)。

資料型別

Flink的Python API目前僅支援python中的基本資料型別(int,float,bool,string)以及byte arrays。
執行環境對資料型別的支援,包括序列化器serializer,反序列化器deserializer,以及自定義型別的類。

class MyObj(object):
    def __init__(self, i):
        self.value = i


class MySerializer(object):
    def serialize(self, value):
        return struct.pack(">i", value.value)


class MyDeserializer(object):
    def _deserialize(self, read):
        i = struct.unpack(">i", read(4))[0]
        return MyObj(i)


env.register_custom_type(MyObj, MySerializer(), MyDeserializer())

Tuples/Lists

你可以使用元組(或列表)來表示複雜型別。Python中的元組可以轉換為Flink中的Tuple型別,它們包含數量固定的不同型別的域(最多25個)。每個域的元組可以是基本資料型別,也可以是其他的元組型別,從而形成巢狀元組型別。

word_counts = env.from_elements(("hello", 1), ("world",2))

counts = word_counts.map(lambda x: x[1])

當進行一些要求指定關鍵字的操作時,例如對資料記錄進行分組或配對。通過設定關鍵字,可以非常便捷地指定元組中各個域的位置。你可以指定多個位置,從而實現複合關鍵字(更多資訊,查閱Section Data Transformations)。

wordCounts \
    .group_by(0) \
    .reduce(MyReduceFunction())

資料來源

資料來源建立了初始的資料集,包括來自檔案,以及來自資料介面/集合兩種方式。

基於檔案的:

  • read_text(path) – 按行讀取檔案,並將每一行以String形式返回。
  • read_csv(path,type) – 解析以逗號(或其他字元)劃分資料域的檔案。
    返回一個包含若干元組的資料集。支援基本的java資料型別作為欄位型別。

基於資料集合的:

  • from_elements(*args) – 基於一系列資料建立一個數據集,包含所有元素。
  • generate_sequence(from, to) – 按照指定的間隔,生成一系列資料。

Examples

env  = get_environment

\# read text file from local files system
localLiens = env.read_text("file:#/path/to/my/textfile")

\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")

\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))

\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")

\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)

資料池

資料池可以接收資料集,並被用來儲存或返回它們:

  • write_text() – 按行以String形式寫入資料。可通過對每個資料項呼叫str()函式獲取String。
  • write_csv(…) – 將元組寫入逗號分隔數值檔案。行數和資料欄位均可配置。每個欄位的值可通過對資料項呼叫str()方法得到。
  • output() – 在標準輸出上列印每個資料項的str()字串。

一個數據集可以同時作為多個操作的輸入資料。程式可以在寫入或列印一個數據集的同時,對其進行其他的變換操作。

Examples

標準資料池相關方法示例如下:

write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")

 write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")

 write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)

 tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")

 this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")

廣播變數

使用廣播變數,能夠在使用普通輸入引數的基礎上,使得一個數據集同時被多個並行的操作所使用。這對於實現輔助資料集,或者是基於資料的引數化法非常有用。這樣,資料集就可以以集合的形式被訪問。

  • 註冊廣播變數:廣播資料集可通過呼叫with_broadcast_set(DataSet,String)函式,按照名字註冊廣播變數。
  • 訪問廣播變數:通過對呼叫self.context.get_broadcast_variable(String)可獲取廣播變數。
class MapperBcv(MapFunction):
    def map(self, value):
        factor = self.context.get_broadcast_variable("bcv")[0][0]
        return value * factor

# 1. The DataSet to be broadcasted
toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")

# 2. Broadcast the DataSet
data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)

確保在進行廣播變數的註冊和訪問時,應當採用相同的名字(示例中的”bcv”)。
注意:由於廣播變數的內容被儲存在每個節點的內部儲存中,不適合包含過多內容。一些簡單的引數,例如標量值,可簡單地通過引數化rich function來實現。

並行執行

該章節將描述如何在Flink中配置程式的並行執行。一個Flink程式可以包含多個任務(操作,資料來源和資料池)。一個任務可以被劃分為多個可並行執行的部分,每個部分處理輸入資料的一個子集。並行執行的例項數量被稱作它的並行性或並行度degree of parallelism (DOP)。
在Flink中可以為任務指定不同等級的並行度。

執行環境級

Flink程式可在一個執行環境execution environment的上下文中執行。一個執行環境為其中執行的所有操作,資料來源和資料池定義了一個預設的並行度。執行環境的並行度可通過對某個操作的並行度進行配置來修改。
一個執行環境的並行度可通過呼叫set_parallelism()方法來指定。例如,為了將WordCount示例程式中的所有操作,資料來源和資料池的並行度設定為3,可以通過如下方式設定執行環境的預設並行度。

env = get_environment()
env.set_parallelism(3)

text.flat_map(lambda x,c: x.lower().split()) \
    .group_by(1) \
    .reduce_group(Adder(), combinable=True) \
    .output()

env.execute()

系統級

通過設定位於./conf/flink-conf.yaml.檔案的parallelism.default屬性,改變系統級的預設並行度,可設定所有執行環境的預設並行度。具體細節可查閱Configuration文件。

執行方法

為了在Flink中執行計劃任務,到Flink目錄下,執行/bin資料夾下的pyflink.sh指令碼。對於python2.7版本,執行pyflink2.sh;對於python3.4版本,執行pyflink3.sh。包含計劃任務的指令碼應當作為第一個輸入引數,其後可新增一些另外的python包,最後,在“-”之後,輸入其他附加引數。

./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]