1. 程式人生 > 其它 >JS構建圖——資料結構

JS構建圖——資料結構

宣告:本系列部落格為原創,最先發表在拉勾教育,其中一部分為免費閱讀部分。被讀者各種搬運至各大網站。所有其他的來源均為抄襲。

 

Flink入門程式WordCount 和 SQL實現

一、WordCount

1、首先建立好專案,然後新增相關依賴

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
   <!--<scope>provided</scope>-->
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <!--<scope>provided</scope>-->
</dependency>

2、DataSet WordCount

      wordcount程式是大資料處理框架的入門程式,統計一段檔案每個單詞出現次數。該程式主要分為兩個部分:一部分是將文字拆分成單詞;另一部分是將單詞進行分組計數不能給列印輸出結果。
      整體程式碼如下:

    public static void main(String[] args) throws Exception {

      // 建立Flink執行的上下文環境
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 建立DataSet,這裡我們的輸入是一行一行的文字
      DataSet<String> text = env.fromElements(
            "Flink Spark Storm",
            "Flink Flink Flink",
            "Spark Spark Spark",
            "Storm Storm Storm"
      );
      // 通過Flink內建的轉換函式進行計算
      DataSet<Tuple2<String, Integer>> counts =
            text.flatMap(new LineSplitter())
                  .groupBy(0)
                  .sum(1);
      //結果列印
      counts.printToErr();

   }

   public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

      @Override
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // 將文字分割
         String[] tokens = value.toLowerCase().split("\\W+");

         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<String, Integer>(token, 1));
            }
         }
      }
    }

      實現的整個過程中分為一下幾個步驟。
(1)我們需要建立Flink的上下文執行環境:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

(2)使用fromElements函式建立一個DataSet物件,該物件中包含了我們的輸入,使用FlatMap、GroupBy、Sum函式進行轉換
(3)直接執行解僱

3、DataStream WordCount

      為了模仿一個流式計算環境,我們選擇監聽一個本地的socket埠,並且使用Flink中的滾動視窗,每5s列印一次計算結果,程式碼如下:

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        // 建立Flink的流式計算環境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 監聽本地9000埠
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

        // 將接收的資料進行拆分,分組,視窗計算並且進行聚合輸出
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // 列印結果
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

整個流式計算的過程分為以下幾步:
(1)首先建立一個流式計算環境:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

(2)進行本地9000埠監聽,將接受的資料進行拆分、分組、視窗計算並且進行聚合輸出,程式碼中使用了Flink的視窗函式,後面會進行詳解。
(3)在本地使用netcat命令啟動一個視窗:

nc -lk 9000

(4)執行程式,得到結果

輸入:

$ nc -lk 9000
Flink Flink Flink 
Flink Spark Storm

結果:

Flink : 4
Spark : 1
Storm : 1

4、Flink Table & SQL WordCount

      Flink SQL 是Flink實時計算為簡化計算模型,降低使用者使用實時計算的門檻而設計的一套符合標準SQL語義的開發語言。
      一個完整的Flink SQL便攜的程式包括以下三個部分:

  • Source Operator:是對外部資料來源的抽象,目前Flink內建了很多常用的資料來源實現,如MySQL、Kafka等
  • Transformation Operator:運算元操作主要完成比如查詢、聚合操作等。目前Flink SQL支援Union、join、Projection、Difference、intersection及window等操作
  • Sink Operator:是對外結果表的抽象,內建了比如MySQL、Kafka等

(1)首先在pom中增加依賴

<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-java</artifactId>
         <version>1.10.0</version>
     </dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java_2.11
         <version>1.10.0</version>
</dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-api-java-bridge_2.11</artifactId>
         <version>1.10.0</version>
</dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-planner-blink_2.11</artifactId>
         <version>1.10.0</version>
</dependency>
<dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-planner_2.11</artifactId>
         <version>1.10.0</version>
</dependency>
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
         <version>1.10.0</version>
</dependency>

(2)建立上下文環境

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

(3)讀取一行資料作為輸入

String words = "hello flink hello lagou";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();

for(String word : split){
    WC wc = new WC(word,1);
    list.add(wc);
}
DataSet<WC> input = fbEnv.fromCollection(list);

(4)註冊成表,執行SQL,然後輸出

//DataSet 轉sql, 指定欄位名
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();

//註冊為一個表
fbTableEnv.createTemporaryView("WordCount", table);

Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

//將錶轉換DataSet
DataSet<WC> ds3  = fbTableEnv.toDataSet(table02, WC.class);
ds3.printToErr();

整體程式碼如下:

public class WordCountSQL {

    public static void main(String[] args) throws Exception{

        //獲取執行環境
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        //建立一個tableEnvironment
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        String words = "hello flink hello lagou";

        String[] split = words.split("\\W+");
        ArrayList<WC> list = new ArrayList<>();

        for(String word : split){
            WC wc = new WC(word,1);
            list.add(wc);
        }
        DataSet<WC> input = fbEnv.fromCollection(list);

        //DataSet 轉sql, 指定欄位名
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        //註冊為一個表
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

        //將錶轉換DataSet
        DataSet<WC> ds3  = fbTableEnv.toDataSet(table02, WC.class);
        ds3.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return  word + ", " + frequency;
        }
    }
}

5、總結

這篇文章主要以wordcount場景用Flink來演示,讓大家體驗Flink SQL的強大之處,為後續內容打好基礎。