/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.game;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.HourlyTeamScore;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.examples.complete.game.utils.GameConstants;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * Streaming pipeline that reads game events from a Pub/Sub topic and maintains two running
 * score tables in BigQuery.
 *
 * <ul>
 *   <li>Team scores, summed per fixed window, written to {@code <tableName>_team}.
 *   <li>User scores, summed in the global window, written to {@code <tableName>_user}.
 * </ul>
 *
 * <p>The dataset, topic, window duration, allowed lateness and table-name prefix are supplied
 * through {@link Options}.
 */
public class LeaderBoard extends HourlyTeamScore {
    /** Delay used by the early-firing trigger of the team-score window. */
    static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
    /** Delay used by the late-firing team trigger and by the repeating user-score trigger. */
    static final Duration TEN_MINUTES = Duration.standardMinutes(10);

    /**
     * Builds the column configuration for the windowed team-score table.
     *
     * <p>The columns are {@code team}, {@code total_score}, {@code window_start},
     * {@code processing_time} and {@code timing}.
     *
     * @return a new mutable map from column name to the field type and value extractor
     */
    protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> configureWindowedTableWrite() {
        Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
        tableConfigure.put(
            "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
        tableConfigure.put(
            "total_score",
            new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
        tableConfigure.put(
            "window_start",
            new WriteToBigQuery.FieldInfo<>(
                "STRING",
                (c, w) -> {
                    // Fails with ClassCastException if the element's window is not an
                    // IntervalWindow; main() feeds this table from CalculateTeamScores.
                    IntervalWindow window = (IntervalWindow) w;
                    return GameConstants.DATE_TIME_FORMATTER.print(window.start());
                }));
        tableConfigure.put(
            "processing_time",
            new WriteToBigQuery.FieldInfo<>(
                "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
        tableConfigure.put(
            "timing",
            new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.pane().getTiming().toString()));
        return tableConfigure;
    }

    /**
     * Builds the base column configuration for the user-score table: {@code user} and
     * {@code total_score}.
     *
     * @return a new mutable map from column name to the field type and value extractor
     */
    protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> configureBigQueryWrite() {
        Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<>();
        tableConfigure.put(
            "user", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey()));
        tableConfigure.put(
            "total_score",
            new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue()));
        return tableConfigure;
    }

    /**
     * Builds the column configuration for the globally windowed user-score table.
     *
     * <p>Starts from {@link #configureBigQueryWrite()} and adds a {@code processing_time}
     * column holding the formatted wall-clock time at which the row was produced.
     *
     * @return a new mutable map from column name to the field type and value extractor
     */
    protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> configureGlobalWindowBigQueryWrite() {
        // configureBigQueryWrite() creates a fresh HashMap on every call, so adding to it is safe.
        Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
            configureBigQueryWrite();
        tableConfigure.put(
            "processing_time",
            new WriteToBigQuery.FieldInfo<>(
                "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now())));
        return tableConfigure;
    }

    /**
     * Builds and runs the leaderboard pipeline.
     *
     * @param args command-line arguments parsed into {@link Options}; validation is enabled, so
     *     the required options must be present
     * @throws Exception if pipeline setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        // Force streaming mode regardless of what was passed on the command line.
        options.setStreaming(true);
        ExampleUtils exampleUtils = new ExampleUtils(options);
        Pipeline pipeline = Pipeline.create(options);

        // Read raw event strings from Pub/Sub, taking the timestamp from the "timestamp_ms"
        // message attribute, and parse them into GameActionInfo records.
        PCollection<UserScore.GameActionInfo> gameEvents =
            pipeline
                .apply(
                    PubsubIO.readStrings()
                        .withTimestampAttribute("timestamp_ms")
                        .fromTopic(options.getTopic()))
                .apply("ParseGameEvent", ParDo.of(new UserScore.ParseEventFn()));

        // Branch 1: per-team sums over fixed windows, written to the "_team" table.
        gameEvents
            .apply(
                "CalculateTeamScores",
                new CalculateTeamScores(
                    Duration.standardMinutes(options.getTeamWindowDuration()),
                    Duration.standardMinutes(options.getAllowedLateness())))
            .apply(
                "WriteTeamScoreSums",
                new WriteWindowedToBigQuery<>(
                    options.as(GcpOptions.class).getProject(),
                    options.getDataset(),
                    options.getLeaderBoardTableName() + "_team",
                    configureWindowedTableWrite()));

        // Branch 2: per-user sums in the global window, written to the "_user" table.
        gameEvents
            .apply(
                "CalculateUserScores",
                new CalculateUserScores(Duration.standardMinutes(options.getAllowedLateness())))
            .apply(
                "WriteUserScoreSums",
                new WriteToBigQuery<>(
                    options.as(GcpOptions.class).getProject(),
                    options.getDataset(),
                    options.getLeaderBoardTableName() + "_user",
                    configureGlobalWindowBigQueryWrite()));

        PipelineResult result = pipeline.run();
        exampleUtils.waitToFinish(result);
    }

    /**
     * Sums scores per user in the global window, re-firing on a repeating processing-time
     * trigger delayed by {@link #TEN_MINUTES} after the first element in each pane.
     */
    @VisibleForTesting
    static class CalculateUserScores
        extends PTransform<PCollection<UserScore.GameActionInfo>, PCollection<KV<String, Integer>>> {
        private final Duration allowedLateness;

        /**
         * @param allowedLateness allowed lateness applied to the global windowing strategy
         */
        CalculateUserScores(Duration allowedLateness) {
            this.allowedLateness = allowedLateness;
        }

        /**
         * Re-windows the events into the global window and sums scores keyed by {@code "user"}.
         *
         * @param input parsed game events
         * @return user / summed-score pairs, with fired panes accumulating
         */
        @Override
        public PCollection<KV<String, Integer>> expand(PCollection<UserScore.GameActionInfo> input) {
            return input
                .apply(
                    "LeaderboardUserGlobalWindow",
                    Window.<UserScore.GameActionInfo>into(new GlobalWindows())
                        .triggering(
                            Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(TEN_MINUTES)))
                        .accumulatingFiredPanes()
                        .withAllowedLateness(allowedLateness))
                .apply("ExtractUserScore", new UserScore.ExtractAndSumScore("user"));
        }
    }

    /**
     * Sums scores per team over fixed windows. The trigger fires at the end of the window per
     * the watermark, with early firings delayed by {@link #FIVE_MINUTES} and late firings
     * delayed by {@link #TEN_MINUTES} (both in processing time after the first element in a
     * pane).
     */
    @VisibleForTesting
    static class CalculateTeamScores
        extends PTransform<PCollection<UserScore.GameActionInfo>, PCollection<KV<String, Integer>>> {
        private final Duration teamWindowDuration;
        private final Duration allowedLateness;

        /**
         * @param teamWindowDuration size of each fixed window
         * @param allowedLateness allowed lateness applied to the windowing strategy
         */
        CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
            this.teamWindowDuration = teamWindowDuration;
            this.allowedLateness = allowedLateness;
        }

        /**
         * Windows the events into fixed windows and sums scores keyed by {@code "team"}.
         *
         * @param infos parsed game events
         * @return team / summed-score pairs, with fired panes accumulating
         */
        @Override
        public PCollection<KV<String, Integer>> expand(PCollection<UserScore.GameActionInfo> infos) {
            return infos
                .apply(
                    "LeaderboardTeamFixedWindows",
                    Window.<UserScore.GameActionInfo>into(FixedWindows.of(teamWindowDuration))
                        .triggering(
                            AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(
                                    AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(FIVE_MINUTES))
                                .withLateFirings(
                                    AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(TEN_MINUTES)))
                        .withAllowedLateness(allowedLateness)
                        .accumulatingFiredPanes())
                .apply("ExtractTeamScore", new UserScore.ExtractAndSumScore("team"));
        }
    }

    /** Command-line options accepted by the {@link LeaderBoard} pipeline. */
    public interface Options extends ExampleOptions, StreamingOptions {
        /** Returns the BigQuery dataset that receives the output tables (required). */
        @Description("BigQuery Dataset to write tables to. Must already exist.")
        @Validation.Required
        String getDataset();

        /** Sets the BigQuery dataset that receives the output tables. */
        void setDataset(String value);

        /** Returns the Pub/Sub topic the game events are read from (required). */
        @Description("Pub/Sub topic to read from")
        @Validation.Required
        String getTopic();

        /** Sets the Pub/Sub topic the game events are read from. */
        void setTopic(String value);

        /** Returns the team fixed-window duration in minutes; defaults to 60. */
        @Description("Numeric value of fixed window duration for team analysis, in minutes")
        @Default.Integer(60)
        Integer getTeamWindowDuration();

        /** Sets the team fixed-window duration in minutes. */
        void setTeamWindowDuration(Integer value);

        /** Returns the allowed data lateness in minutes; defaults to 120. */
        @Description("Numeric value of allowed data lateness, in minutes")
        @Default.Integer(120)
        Integer getAllowedLateness();

        /** Sets the allowed data lateness in minutes. */
        void setAllowedLateness(Integer value);

        /** Returns the prefix of the BigQuery table names; defaults to {@code leaderboard}. */
        @Description("Prefix used for the BigQuery table names")
        @Default.String("leaderboard")
        String getLeaderBoardTableName();

        /** Sets the prefix of the BigQuery table names. */
        void setLeaderBoardTableName(String value);
    }
}

