Converting Flink's Batch WordCount into a Streaming WordCount
Goal: convert the batch WordCount in Flink into a streaming WordCount.
Usefulness: honestly, it feels completely useless.
Approach: re-create every operator registered in the batch environment (envBatch) in a streaming environment (envStream).
The code is as follows:
package org.apache.flink.examples.java.maqy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.AggregationFunction;
import org.apache.flink.api.java.aggregation.SumAggregationFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * Converts a Flink batch WordCount into a streaming WordCount.
 * Note: with the same logic, the streaming WordCount emits a result for every
 * arriving element, so its output differs from the batch output.
 *
 * Flink version: 1.4.2
 * @author maqy
 * @date 2018.08.11
 */
public class BatchToStream {

    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // read the input file
        DataSet<String> a = env.readTextFile("F:\\test.txt");
        DataSet<Tuple2<String, Integer>> b = a.flatMap(new LineSplitter());
        DataSet<Tuple2<String, Integer>> c = b.groupBy(0)
                .sum(1);

        // the sink must be a separate statement: chaining it onto the line above
        // fails to compile because writeAsText returns a DataSink, not a DataSet
        c.writeAsText("F:\\output\\batchToStream");

        StreamExecutionEnvironment envStream = batchToStream(env);
        // it is the streaming environment that executes the program,
        // not env.execute(...)
        envStream.execute("StreamJob~~~~~~~~~~~~~");
    }

    // Converts the batch environment into a streaming one: takes envBatch and
    // returns a StreamExecutionEnvironment.
    public static StreamExecutionEnvironment batchToStream(ExecutionEnvironment envBatch) throws Exception {
        // create a new streaming environment to return
        StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
        // the parallelism must be set right here, otherwise it has no effect
        envStream.setParallelism(1);

        // get the sinks registered in the batch environment
        // (starting from the DataSets would be another option)
        List<DataSink<?>> batchSinks = envBatch.getSinks();
        for (DataSink dataSink : batchSinks) {
            // the stream under construction
            DataStream first = null;
            // walk each sink back to its source
            DataSet dataSetLast = dataSink.getDataSet(); // Operator extends DataSet
            DataSet p = dataSetLast;
            // first has to be reassigned from the return value: inside preVisit
            // it is rebound to a new object when the source is created, so the
            // argument reference alone would stay null
            first = preVisit(p, envStream, first);

            // translate the sink
            OutputFormat dataSinkOutputFormat = dataSink.getFormat();
            if (dataSinkOutputFormat instanceof TextOutputFormat) {
                System.out.println("dataSinkOutputFormat is a TextOutputFormat");
                Path path = ((TextOutputFormat) dataSinkOutputFormat).getOutputFilePath();
                first.writeUsingOutputFormat(new TextOutputFormat(path));
            }
        }
        // return the streaming environment; later it could be merged into an
        // already existing streaming environment, or the individual DataStreams
        // could even be returned for reuse
        return envStream;
    }

    // Traverses the plan from the sink backwards and translates each node.
    public static DataStream preVisit(DataSet dataSet, StreamExecutionEnvironment envStream, DataStream first) {
        if (!(dataSet instanceof DataSource)) {
            // not at the DataSource yet, so recurse towards the source
            first = preVisit(getPre(dataSet), envStream, first);
        }
        // translate the current node
        if (dataSet == null) {
            System.out.println("source is null");
        } else if (dataSet instanceof DataSource) {
            // at the source: check which kind it is, then add it to the stream;
            // the DataSource also exposes the produced data type, but it is not
            // clear yet how to carry that over to the DataStream
            InputFormat inputFormat = ((DataSource) dataSet).getInputFormat();
            if (inputFormat instanceof TextInputFormat) {
                // translating the operator directly might be possible later
                System.out.println("inputFormat is TextInputFormat");
                String filePath = ((TextInputFormat) inputFormat).getFilePath().toString();
                System.out.println("input file path: " + filePath);
                // this rebinds first to a new object
                first = envStream.readTextFile(filePath);
            }
        } else if (dataSet instanceof SingleInputOperator) {
            System.out.println("SingleInputOperator yes");
            // a SingleInputOperator carries a DataSet-typed input;
            // check the concrete type next
            if (dataSet instanceof AggregateOperator) {
                System.out.println("AggregateOperator yes");
                // there can be several aggregation functions, each with a matching
                // entry in List<Integer> fields (the default size seems to be 4)
                List<AggregationFunction<?>> aggregationFunctions = ((AggregateOperator) dataSet).getAggregationFunctions();
                List<Integer> fields = ((AggregateOperator) dataSet).getFields();
                // first find out whether a groupBy was applied, i.e. whether a
                // grouping can be obtained; it is null otherwise
                Grouping grouping = ((AggregateOperator) dataSet).getGrouping();
                // a non-null grouping means the input was grouped, so translate it
                if (grouping != null) {
                    int position = 0; // only the single-key case is handled for now
                    if (grouping instanceof SortedGrouping) {
                        System.out.println("SortedGrouping yes");
                    } else if (grouping instanceof UnsortedGrouping) {
                        System.out.println("UnsortedGrouping yes");
                        // Keys holds keyFields and originalKeyTypes
                        // (the latter is String here)
                        Keys keys = grouping.getKeys();
                        if (keys instanceof Keys.ExpressionKeys) {
                            System.out.println("Keys.ExpressionKeys yes");
                            // the meaning of multiple key fields is still unclear
                            int numOfKeyFields = keys.getNumberOfKeyFields();
                            int[] positions = keys.computeLogicalKeyPositions();
                            if (numOfKeyFields == 1) {
                                position = positions[0];
                            }
                            // conveniently, the streaming side has no notion of
                            // UnsortedGrouping: keyBy is enough
                            first = first.keyBy(position);
                        } else if (keys instanceof Keys.SelectorFunctionKeys) {
                            System.out.println("Keys.SelectorFunctionKeys yes");
                        }
                    }
                }
                // handle the size-1 case first, since keyBy must precede sum
                if (aggregationFunctions.size() == 1 && fields.size() == 1) {
                    if (aggregationFunctions.get(0) instanceof SumAggregationFunction) {
                        if (first instanceof KeyedStream) {
                            first = ((KeyedStream) first).sum(fields.get(0));
                        } else {
                            System.out.println("sum on a stream seems to require keyBy first...");
                        }
                    }
                }
            } else if (dataSet instanceof SingleInputUdfOperator) {
                System.out.println("SingleInputUdfOperator yes");
                if (dataSet instanceof FlatMapOperator) {
                    System.out.println("FlatMapOperator yes");
                    FlatMapFunction flatMapFunction = ((FlatMapOperator) dataSet).getFlatMapFunction();
                    first = first.flatMap(flatMapFunction);
                }
            } else {
                System.out.println("Not sure what SingleInputOperator");
            }
        } else if (dataSet instanceof TwoInputOperator) {
            System.out.println("TwoInputOperator yes");
        } else {
            System.out.println("not sure what Operator");
        }
        return first;
    }

    // Returns an operator's single predecessor.
    public static DataSet getPre(DataSet dataSet) {
        if (dataSet instanceof Operator) {
            System.out.println("Operator yes");
            if (dataSet instanceof DataSource) {
                System.out.println("DataSource yes");
                return (DataSource) dataSet;
            } else if (dataSet instanceof SingleInputOperator) {
                System.out.println("SingleInputOperator yes");
                // a SingleInputOperator carries a DataSet-typed input
                return ((SingleInputOperator) dataSet).getInput();
            } else if (dataSet instanceof TwoInputOperator) {
                System.out.println("TwoInputOperator yes"); // not handled yet
            } else {
                System.out.println("not sure what Operator");
            }
        } else {
            System.out.println("no Operator");
        }
        return null;
    }

    // *************************************************************************
    // User Functions
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)"
     * (Tuple2<String, Integer>).
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
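For reference, the pipeline that batchToStream reconstructs for this job is the same one you would write by hand with the streaming API. A minimal sketch (assuming the same local paths as above and reusing the LineSplitter from BatchToStream; the class name is mine):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HandWrittenStreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // the same source -> flatMap -> keyBy -> sum -> sink chain that
        // batchToStream() rebuilds by walking the batch plan
        DataStream<Tuple2<String, Integer>> counts = env
                .readTextFile("F:\\test.txt")              // DataSource with TextInputFormat
                .flatMap(new BatchToStream.LineSplitter()) // FlatMapOperator
                .keyBy(0)                                  // UnsortedGrouping -> keyBy
                .sum(1);                                   // SumAggregationFunction -> sum

        counts.writeAsText("F:\\output\\batchToStream");   // TextOutputFormat sink
        env.execute("hand-written stream WordCount");
    }
}

Running this directly yields the same incremental output shown below, which is a quick way to check that the translated plan behaves as expected.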
Input text:
a b c d a a b
a a a
Streaming output (you can see that every newly arriving word produces another result):
(a,1)
(b,1)
(c,1)
(d,1)
(a,2)
(a,3)
(b,2)
(a,4)
(a,5)
(a,6)
The normal batch output:
(a,6)
(b,2)
(c,1)
(d,1)
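The outputs differ because a keyed sum on a stream emits an updated count for every record, while the batch job only emits the final totals. To make the streaming job approximate the batch output, you would have to aggregate over a window first. This is not part of the code above, just a minimal sketch of the idea; the 5-second tumbling processing-time window is an arbitrary choice for illustration:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedStreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // one result per key per window, instead of one result per record
        DataStream<Tuple2<String, Integer>> counts = env
                .readTextFile("F:\\test.txt")
                .flatMap(new BatchToStream.LineSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        counts.print();
        env.execute("windowed stream WordCount");
    }
}

For a bounded input file that is fully read within a single window, this collapses the running counts into one total per key, which resembles the batch result.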