/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.game;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.LeaderBoard;
import org.apache.beam.examples.complete.game.UserScore;
import org.apache.beam.examples.complete.game.utils.GameConstants;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

public class StatefulTeamScore
extends LeaderBoard {
    private static @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized WriteToBigQuery.FieldInfo<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>>> configureCompleteWindowedTableWrite() {
        HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
        tableConfigure.put("team", new WriteToBigQuery.FieldInfo("STRING", (c, w) -> ((KV)c.element()).getKey()));
        tableConfigure.put("total_score", new WriteToBigQuery.FieldInfo("INTEGER", (c, w) -> ((KV)c.element()).getValue()));
        tableConfigure.put("processing_time", new WriteToBigQuery.FieldInfo("STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print((ReadableInstant)Instant.now())));
        return tableConfigure;
    }

    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Options options = (Options)PipelineOptionsFactory.fromArgs((String[])args).withValidation().as(Options.class);
        options.setStreaming(true);
        ExampleUtils exampleUtils = new ExampleUtils(options);
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        ((PCollection)((PCollection)((PCollection)((PCollection)pipeline.apply((PTransform)PubsubIO.readStrings().withTimestampAttribute("timestamp_ms").fromTopic(options.getTopic()))).apply("ParseGameEvent", (PTransform)ParDo.of((DoFn)new UserScore.ParseEventFn()))).apply("MapTeamAsKey", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.kvs((TypeDescriptor)TypeDescriptors.strings(), (TypeDescriptor)TypeDescriptor.of(UserScore.GameActionInfo.class))).via((SerializableFunction & Serializable)gInfo -> KV.of((Object)gInfo.team, (Object)gInfo)))).apply("UpdateTeamScore", (PTransform)ParDo.of((DoFn)new UpdateTeamScoreFn(options.getThresholdScore())))).apply("WriteTeamLeaders", new WriteWindowedToBigQuery(((GcpOptions)options.as(GcpOptions.class)).getProject(), options.getDataset(), options.getLeaderBoardTableName() + "_team_leader", StatefulTeamScore.configureCompleteWindowedTableWrite()));
        PipelineResult result = pipeline.run();
        exampleUtils.waitToFinish(result);
    }

    @VisibleForTesting
    public static class UpdateTeamScoreFn
    extends DoFn<KV<String, UserScore.GameActionInfo>, KV<String, Integer>> {
        private static final @UnknownKeyFor @NonNull @Initialized String TOTAL_SCORE = "totalScore";
        private final @UnknownKeyFor @NonNull @Initialized int thresholdScore;
        @DoFn.StateId(value="totalScore")
        private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Integer>> totalScoreSpec = StateSpecs.value((Coder)VarIntCoder.of());

        public UpdateTeamScoreFn(@UnknownKeyFor @NonNull @Initialized int thresholdScore) {
            this.thresholdScore = thresholdScore;
        }

        @DoFn.ProcessElement
        public void processElement(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext c, @DoFn.StateId(value="totalScore") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Integer> totalScore) {
            String teamName = (String)((KV)c.element()).getKey();
            UserScore.GameActionInfo gInfo = (UserScore.GameActionInfo)((KV)c.element()).getValue();
            int oldTotalScore = (Integer)MoreObjects.firstNonNull((Object)((Integer)totalScore.read()), (Object)0);
            totalScore.write((Object)(oldTotalScore + gInfo.score));
            if (oldTotalScore / this.thresholdScore < (Integer)totalScore.read() / this.thresholdScore) {
                c.output((Object)KV.of((Object)teamName, (Object)((Integer)totalScore.read())));
            }
        }
    }

    public static interface Options
    extends LeaderBoard.Options {
        @Description(value="Numeric value, multiple of which is used as threshold for outputting team score.")
        @Default.Integer(value=5000)
        public @UnknownKeyFor @NonNull @Initialized Integer getThresholdScore();

        public void setThresholdScore(@UnknownKeyFor @NonNull @Initialized Integer var1);
    }
}

