1. 程式人生 > >Apache Beam WordCount程式設計實戰及原始碼解讀

Apache Beam WordCount程式設計實戰及原始碼解讀

概述:Apache Beam WordCount程式設計實戰及原始碼解讀,並通過intellij IDEA和terminal兩種方式除錯執行WordCount程式,Apache Beam對大資料的批處理和流處理,提供一套先進的統一的程式設計模型,並可以執行大資料處理引擎上。完整專案Github原始碼

負責公司大資料處理相關架構,但是具有多樣性,極大的增加了開發成本,急需統一程式設計處理,Apache Beam,一處程式設計,處處執行,故將折騰成果分享出來。

1.Apache Beam程式設計實戰–前言,Apache Beam的特點與關鍵概念。
Apache Beam 於2017年1月10日成為Apache新的頂級專案。

1.1.Apache Beam 特點:
統一:對於批處理和流媒體用例使用單個程式設計模型。
方便:支援多個pipelines環境執行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
可擴充套件:編寫和分享新的SDKs,IO聯結器和transformation庫 
部分翻譯摘自官網:Apacher Beam 官網
1.2.Apache Beam關鍵概念:
1.2.1.Apache Beam SDKs
主要是開發API,為批處理和流處理提供統一的程式設計模型。目前(2017)支援JAVA語言,而Python正在緊張開發中。

1.2.2. Apache Beam Pipeline Runners(Beam的執行器/執行者們),支援Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow多個大資料計算框架。可謂是一處Apache Beam程式設計,多計算框架執行。
1.2.3. 他們的對如下的支援情況詳見


2.Apache Beam程式設計實戰–Apache Beam原始碼解讀
基於maven,intellij IDEA,pom.xm檢視 完整專案Github原始碼 。直接通過IDEA的專案匯入功能即可匯入完整專案,等待MAVEN下載依賴包,然後按照如下解讀步驟即可順利執行。

2.1.原始碼解析-Apache Beam 資料流處理原理解析:
關鍵步驟:

建立Pipeline
將轉換應用於Pipeline
讀取輸入檔案
應用ParDo轉換
應用SDK提供的轉換(例如:Count)
寫出輸出
執行Pipeline


2.2.原始碼解析,完整專案Github原始碼,附WordCount,pom.xml等
/**
 * MIT.
 * Author: wangxiaolei(王小雷).
 * Date:17-2-20.
 * Project:ApacheBeamWordCount.
 */


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


public class WordCount {

    /**
     *1.a.通過Dofn程式設計Pipeline使得程式碼很簡潔。b.對輸入的文字做單詞劃分,輸出。
     */
    static class ExtractWordsFn extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines =
                createAggregator("emptyLines", Sum.ofLongs());

        @ProcessElement
        public void processElement(ProcessContext c) {
            if (c.element().trim().isEmpty()) {
                emptyLines.addValue(1L);
            }

            // 將文字行劃分為單詞
            String[] words = c.element().split("[^a-zA-Z']+");
            // 輸出PCollection中的單詞
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /**
     *2.格式化輸入的文字資料,將轉換單詞為並計數的列印字串。
     */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        @Override
        public String apply(KV<String, Long> input) {
            return input.getKey() + ": " + input.getValue();
        }
    }
    /**
     *3.單詞計數,PTransform(PCollection Transform)將PCollection的文字行轉換成格式化的可計數單詞。
     */
    public static class CountWords extends PTransform<PCollection<String>,
            PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

            // 將文字行轉換成單個單詞
            PCollection<String> words = lines.apply(
                    ParDo.of(new ExtractWordsFn()));

            // 計算每個單詞次數
            PCollection<KV<String, Long>> wordCounts =
                    words.apply(Count.<String>perElement());

            return wordCounts;
        }
    }

    /**
     *4.可以自定義一些選項(Options),比如檔案輸入輸出路徑
     */
    public interface WordCountOptions extends PipelineOptions {

        /**
         * 檔案輸入選項,可以通過命令列傳入路徑引數,路徑預設為gs://apache-beam-samples/shakespeare/kinglear.txt
         */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        String getInputFile();
        void setInputFile(String value);

        /**
         * 設定結果檔案輸出路徑,在intellij IDEA的執行設定選項中或者在命令列中指定輸出檔案路徑,如./pom.xml
         */
        @Description("Path of the file to write to")
        @Required
        String getOutput();
        void setOutput(String value);
    }
    /**
     * 5.執行程式
     */
    public static void main(String[] args) {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
                .apply(new CountWords())
                .apply(MapElements.via(new FormatAsTextFn()))
                .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

        p.run().waitUntilFinish();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
3.支援Spark,Flink,Apex等大資料資料框架來執行該WordCount程式。完整專案Github原始碼(推薦,注意pom.xml模組載入是否成功,在工具中開發大資料程式,利於除錯,開發體驗較好)
3.1.intellij IDEA(社群版)中Spark大資料框架執行Pipeline計算程式
Spark執行

設定VM options

-DPspark-runner
1
設定Programe arguments

--inputFile=pom.xml --output=counts
1


3.2.intellij IDEA(社群版)中Apex,Flink等支援的大資料框架均可執行WordCount的Pipeline計算程式,完整專案Github原始碼
Apex執行

設定VM options

-DPapex-runner
1
設定Programe arguments

--inputFile=pom.xml --output=counts
1
Flink執行等等

設定VM options

-DPflink-runner
1
設定Programe arguments

--inputFile=pom.xml --output=counts
1
4.終端執行(Terminal)(不推薦,第一次下載過程很慢,開發體驗較差)
4.1.以下命令是下載官方示例原始碼,第一次執行下載較慢,如果失敗了就多執行幾次,(推薦下載,完整專案Github原始碼)直接用上述解讀在intellij IDEA中執行。
mvn archetype:generate       -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST       -DgroupId=org.example       -DartifactId=word-count-beam       -Dversion="0.1"       -Dpackage=org.apache.beam.examples       -DinteractiveMode=false
1
2


4.2.打包並執行
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
1


4.3.成功執行結果
4.3.1.顯示執行成功


4.3.2.WordCount輸出計算結果

-----------------