1. 程式人生 > >通過Apache Beam官方例項User Score實驗流式join

通過Apache Beam官方例項User Score實驗流式join


不建議閱讀,太亂了

使用UserScore例子中的Input.UnboundedGenerator()

不使用window的時候,無界資料是不能和有界資料join的。

Join需要使用相同的視窗。不能一個是globalWindow一個是fixedWindow

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
	at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
	at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204)
	at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
	at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:126)
	at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
	at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:107)
	at org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(Join.java:59)
	at mobileGame.solution.StreamJoin.main(StreamJoin.java:238)

兩個流進行join,可行。

通過Input.UnboundedGenerator()和Input.BoundedGenerator()實現有界資料和無界資料的join,都使用FixedWindows.of(WINDOW_SIZE)),發現:結果有界檔案只join了第一個無界源視窗中的資料

程式碼:

package mobileGame.solution;

import mobileGame.GameActionInfo;
import mobileGame.utils.ExerciseOptions;
import mobileGame.utils.Input;
import mobileGame.utils.Output;
import myTest.JoinTest;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class StreamJoin {

    public static int num=1;
    // Extract user/score pairs from the event stream using processing time, via
    // global windowing.
    public static class UserLeaderBoard
            extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

        private Duration allowedLateness;
        private Duration updateFrequency;

        public UserLeaderBoard(Duration allowedLateness, Duration updateFrequency) {
            this.allowedLateness = allowedLateness;
            this.updateFrequency = updateFrequency;
        }

        @Override
        public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
            // [START EXERCISE 4 Part 1 - User Leaderboard]
            // JavaDoc: https://cloud.google.com/dataflow/java-sdk/JavaDoc
            // Developer Docs: https://cloud.google.com/dataflow/model/triggering
            //
            // Compute the user scores since the beginning of time. To do this we will
            // need:
            //
            // 1. Use the GlobalWindows WindowFn so that all events occur in the same
            // window.
            // 2. Specify the accumulation mode so that total score is constantly
            // increasing.
            // 3. Trigger every updateFrequency duration
            //
            return input
                    .apply(Window
                            // Since we want a globally increasing sum, use the GlobalWindows
                            // WindowFn
                            .<GameActionInfo>into(new GlobalWindows())
                            // We want periodic results every updateFrequency of processing
                            // time. We will be triggering repeatedly and forever, starting
                            // updateFrequency after the first element seen. Window.
                            .triggering(Repeatedly.forever(
                                    AfterProcessingTime
                                            .pastFirstElementInPane()
                                            .plusDelayOf(updateFrequency)))
                            // Specify the accumulation mode to ensure that each firing of the
                            // trigger produces monotonically increasing sums rather than just
                            // deltas.
                            .accumulatingFiredPanes())

                    // Extract and sum username/score pairs from the event data.
                    // You can use the ExtractAndSumScore transform again.
                    // Name the step -- look at overloads of apply().
                    .apply("ExtractUserScore", new ExtractAndSumScore(GameActionInfo.KeyField.USER));
            // [END EXERCISE 4 Part 1]
        }
    }

    // Extract user/score pairs from the event stream using processing time, via
    // global windowing.
    public static class TeamLeaderBoard
            extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

        private Duration allowedLateness;
        private Duration earlyUpdateFrequency;
        private Duration lateUpdateFrequency;
        private Duration windowSize;

        public TeamLeaderBoard(Duration allowedLateness, Duration earlyUpdateFrequency, Duration lateUpdateFrequency,
                               Duration windowSize) {
            this.allowedLateness = allowedLateness;
            this.earlyUpdateFrequency = earlyUpdateFrequency;
            this.lateUpdateFrequency = lateUpdateFrequency;
            this.windowSize = windowSize;
        }

        @Override
        public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
            // [START EXERCISE 4 Part 2 - Team Leaderboard]
            // JavaDoc: https://cloud.google.com/dataflow/java-sdk/JavaDoc
            // Developer Docs: https://cloud.google.com/dataflow/model/triggering
            //
            // We're going to produce windowed team score again, but this time we want
            // to get
            // early (speculative) results as well as occasional late updates.
            System.out.println("TeamLeaderBoard expand");
            return input
                    .apply("FixedWindows",
                            Window.<GameActionInfo>into(FixedWindows.of(windowSize))
                                    .triggering(AfterWatermark.pastEndOfWindow()
                                            // Specify .withEarlyFirings to produce speculative
                                            // results
                                            // with a delay of earlyUpdateFrequency
                                            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(earlyUpdateFrequency))
                                            // Specify .withLateFirings to produce late updates with a
                                            // delay
                                            // of lateUpdateFrequency
                                            .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(lateUpdateFrequency)))
                                    // Specify allowed lateness, and ensure that we get cumulative
                                    // results
                                    // across the window.
                                    .withAllowedLateness(allowedLateness).accumulatingFiredPanes())
                    // Extract and sum teamname/score pairs from the event data.
                    // You can use the ExtractAndSumScore transform again.
                    .apply("ExtractTeamScore", new ExtractAndSumScore(GameActionInfo.KeyField.TEAM));
            // [END EXERCISE 4 Part 2]
        }
    }

    /**
     * A transform to extract key/score information from GameActionInfo, and sum
     * the scores. The constructor arg determines whether 'team' or 'user' info is
     * extracted.
     */
    private static class ExtractAndSumScore
            extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

        private final GameActionInfo.KeyField field;

        ExtractAndSumScore(GameActionInfo.KeyField field) {
            this.field = field;
            System.out.println("ExtractAndSumScore field");
        }

        @Override
        public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> gameInfo) {
            System.out.println("ExtractAndSumScore expand");
            return gameInfo
                    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                            .via((GameActionInfo gInfo) -> KV.of(field.extract(gInfo), gInfo.getScore())))
                    .apply(Sum.<String>integersPerKey());
        }
    }

    //用於輸出到檔案
    public static class FormatAsTextFn extends SimpleFunction<KV<String,Integer>, String> {
        @Override
        public String apply(KV<String,Integer> input) {
            return "key:"+input.getKey()+"   value:"+input.getValue();
        }
    }

    //用於輸出Pcollection中的字串
    static class printStringInteger extends DoFn<KV<String ,Integer> ,KV<String ,Integer>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("c.element  StringInteger:" + c.element());
            c.output(c.element());
        }
    }

    static class printStringString extends DoFn<KV<String ,String> ,KV<String ,String>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("c.element:" + c.element());
            c.output(c.element());
        }
    }
    static class printStringIntegerString extends DoFn<KV<String,KV<Integer,String>> ,KV<String,KV<Integer,String>>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("joined.element:" + c.element());
            c.output(c.element());
        }
    }
    static class printStringIntegerInteger extends DoFn<KV<String,KV<Integer,Integer>> ,KV<String,KV<Integer,Integer>>> {
        public static int sum=0;
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("joined.StringIntegerInteger.element:" + c.element() +"sum:"+sum);
            sum++;
            c.output(c.element());
        }
    }

    //預處理,即將每行為 a,b De 的資料轉化為KV<a,b>
    public static class Preprocess extends PTransform<PCollection<GameActionInfo>,PCollection<KV<String,Integer>>>{
        @Override
        public PCollection<KV<String,Integer>> expand(PCollection<GameActionInfo> gameActionInfoPCollection){
            //String[] temps = lines.toString().split(",");
            PCollection<KV<String,Integer>> result = gameActionInfoPCollection.apply(ParDo.of(new DoFn<GameActionInfo, KV<String,Integer>>() {
                @ProcessElement
                public void processElement(ProcessContext c){
                    String name = c.element().getUser();
                    Integer score = c. element().getScore();
                    KV<String ,Integer> kv=KV.of(name,score);
                    c.output(kv);
                }
            }));
            return result;
        }
    }
    private static final Duration ALLOWED_LATENESS = Duration.standardMinutes(30);
    private static final Duration EARLY_UPDATE_FREQUENCY = Duration.standardSeconds(10);
    private static final Duration LATE_UPDATE_FREQUENCY = Duration.standardSeconds(20);
    private static final Duration WINDOW_SIZE = Duration.standardSeconds(20);

    public static void main(String[] args) throws Exception {

        ExerciseOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ExerciseOptions.class);
        options.setStreaming(false);
        Pipeline pipeline = Pipeline.create(options);

        // Read game events from the unbounded injector.
        //如果不加入window是不能執行的

        PCollection<GameActionInfo> gameEvents = pipeline.apply(new Input.UnboundedGenerator())
                .apply(Window.<GameActionInfo>into(FixedWindows.of(WINDOW_SIZE)));

        //得到了流的PCollection<KV<String,Integer>>
        PCollection<KV<String,Integer>> stream = gameEvents.apply(new Preprocess());


        PCollection<GameActionInfo> gameEvents2 = pipeline.apply(new Input.BoundedGenerator())
                .apply(Window.<GameActionInfo>into(FixedWindows.of(WINDOW_SIZE)));

        //得到了流的PCollection<KV<String,Integer>>
        PCollection<KV<String,Integer>> stream2 = gameEvents2.apply(new Preprocess());

        //Beam裡頭沒有CountWindow麼
        PCollection<String> lines2=pipeline.apply(TextIO.read().from("/home/maqy/桌面/output/streamJoin"));

        PCollection<KV<String,String>> batch=lines2.apply(new PTransform<PCollection<String>, PCollection<KV<String, String>>>() {
            @Override
            public PCollection<KV<String, String>> expand(PCollection<String> input) {
                PCollection<KV<String,String>> result = input.apply(ParDo.of(new DoFn<String, KV<String, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c){
                        String[] s =c.element().split("\\W+");
                        if(s.length==2){
                            KV<String,String> kv =KV.of(s[0],s[1]);
                            c.output(kv);
                        }
                    }
                }));
                return result;
            }
        });

        stream2.apply(ParDo.of(new printStringInteger()));
        //batch.apply(ParDo.of(new printStringString()));

        //stream.apply(ParDo.of(new printStringInteger()));
        //不允許  無界資料和有界資料進行join
        PCollection<KV<String,KV<Integer,Integer>>> joinedPcollection = Join.innerJoin(stream,stream2);

        joinedPcollection.apply(ParDo.of(new printStringIntegerInteger()));
        //PCollection<KV<String,Integer>> result =results.apply(ParDo.of(new printString()));

        pipeline.run();
    }
}