/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UncommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;

class MultiStepCombine<K, InputT, AccumT, OutputT>
extends PTransformTranslation.RawPTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
    private final Coder<KV<K, OutputT>> outputCoder;
    static final String DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN = "urn:beam:directrunner:transforms:merge_accumulators_extract_output:v1";

    /**
     * Builds the matcher that decides which applied transforms this override may replace.
     *
     * <p>An application matches only when all of the following hold:
     * <ul>
     *   <li>its URN equals {@code PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN};</li>
     *   <li>its combine function is a {@code Combine.CombineFn};</li>
     *   <li>it has exactly one input;</li>
     *   <li>the input's window function reports {@code isNonMerging()};</li>
     *   <li>the input's trigger equals {@code DefaultTrigger.of()};</li>
     *   <li>the input is coded with a {@code KvCoder} and the combine function yields a
     *       non-null accumulator coder for the value coder.</li>
     * </ul>
     *
     * @return a new matcher instance
     */
    public static PTransformMatcher matcher() {
        return new PTransformMatcher(){

            @Override
            public boolean matches(AppliedPTransform<?, ?, ?> application) {
                String urn = PTransformTranslation.urnForTransformOrNull(application.getTransform());
                if (!PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN.equals(urn)) {
                    return false;
                }
                CombineFnBase.GlobalCombineFn perKeyFn = ((Combine.PerKey)application.getTransform()).getFn();
                return this.isApplicable(application.getInputs(), perKeyFn);
            }

            /**
             * Checks the input-side conditions listed on {@code matcher()}.
             *
             * @throws RuntimeException if the combine function throws
             *     {@code CannotProvideCoderException} instead of returning a (possibly null) coder
             */
            private <K, InputT> boolean isApplicable(Map<TupleTag<?>, PValue> inputs, CombineFnBase.GlobalCombineFn<InputT, ?, ?> fn) {
                if (!(fn instanceof Combine.CombineFn) || inputs.size() != 1) {
                    return false;
                }
                PCollection mainInput = (PCollection)Iterables.getOnlyElement(inputs.values());
                WindowingStrategy strategy = mainInput.getWindowingStrategy();
                boolean nonMergingWindows = strategy.getWindowFn().isNonMerging();
                boolean defaultTrigger = DefaultTrigger.of().equals(strategy.getTrigger());

                // The coder lookup runs even when the window/trigger checks already failed, so a
                // misbehaving combine function is always reported.
                boolean hasAccumulatorCoder = false;
                try {
                    if (mainInput.getCoder() instanceof KvCoder) {
                        KvCoder kvCoder = (KvCoder)mainInput.getCoder();
                        Coder accumulatorCoder = fn.getAccumulatorCoder(mainInput.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
                        hasAccumulatorCoder = accumulatorCoder != null;
                    }
                }
                catch (CannotProvideCoderException e) {
                    throw new RuntimeException(String.format("Could not construct an accumulator %s for %s. Accumulator %s for a %s may be null, but may not throw an exception", Coder.class.getSimpleName(), fn, Coder.class.getSimpleName(), Combine.class.getSimpleName()), e);
                }
                return nonMergingWindows && defaultTrigger && hasAccumulatorCoder;
            }
        };
    }

    /**
     * Static factory for a {@code MultiStepCombine}.
     *
     * @param combineFn the combine function to apply per key
     * @param outputCoder the coder for the resulting key/output pairs
     * @return a new transform wrapping the two arguments
     */
    public static <K, InputT, AccumT, OutputT> MultiStepCombine<K, InputT, AccumT, OutputT> of(Combine.CombineFn<InputT, AccumT, OutputT> combineFn, Coder<KV<K, OutputT>> outputCoder) {
        MultiStepCombine<K, InputT, AccumT, OutputT> combine = new MultiStepCombine<>(combineFn, outputCoder);
        return combine;
    }

    /**
     * Stores the combine function and output coder; instances are obtained through {@code of}
     * or the nested {@code Factory}.
     *
     * @param combineFn the combine function to apply per key
     * @param outputCoder the coder for the resulting key/output pairs
     */
    private MultiStepCombine(Combine.CombineFn<InputT, AccumT, OutputT> combineFn, Coder<KV<K, OutputT>> outputCoder) {
        this.outputCoder = outputCoder;
        this.combineFn = combineFn;
    }

    /**
     * Returns the URN identifying this transform.
     *
     * @return the fixed string {@code urn:beam:directrunner:transforms:multistepcombine:v1}
     */
    @Override
    @Nonnull
    public String getUrn() {
        return "urn:beam:directrunner:transforms:multistepcombine:v1";
    }

    /**
     * Returns the function spec for this transform.
     *
     * @return always {@code null}; this transform supplies no {@code FunctionSpec}
     */
    @Override
    @Nullable
    public RunnerApi.FunctionSpec getSpec() {
        return null;
    }

    /**
     * Expands the combine into three stages: a per-bundle pre-combine {@code ParDo}, a
     * {@code GroupByKey} over the partial accumulators, and a final merge-and-extract step.
     *
     * @param input the keyed input; its coder must be a {@code KvCoder}
     * @return the combined output, coded with the output coder given at construction
     * @throws IllegalArgumentException if the input coder is not a {@code KvCoder}
     * @throws IllegalStateException if the combine function cannot provide an accumulator coder
     */
    @Override
    public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
        Preconditions.checkArgument(input.getCoder() instanceof KvCoder, "Expected input to have a %s of type %s, got %s", Coder.class.getSimpleName(), KvCoder.class.getSimpleName(), input.getCoder());
        KvCoder<K, InputT> kvCoder = (KvCoder<K, InputT>) input.getCoder();
        Coder<InputT> valueCoder = kvCoder.getValueCoder();

        Coder<AccumT> accumulatorCoder;
        try {
            accumulatorCoder = this.combineFn.getAccumulatorCoder(input.getPipeline().getCoderRegistry(), valueCoder);
        }
        catch (CannotProvideCoderException e) {
            throw new IllegalStateException(String.format("Could not construct an Accumulator Coder with the provided %s %s", Combine.CombineFn.class.getSimpleName(), this.combineFn), e);
        }

        // Stage 1: fold each bundle's inputs into one accumulator per (key, window).
        DoFn<KV<K, InputT>, KV<K, AccumT>> preCombineFn = new CombineInputs<K, InputT, AccumT>(this.combineFn, input.getWindowingStrategy().getTimestampCombiner(), kvCoder.getKeyCoder());
        PCollection<KV<K, AccumT>> partials = input.apply(ParDo.of(preCombineFn)).setCoder(KvCoder.of(kvCoder.getKeyCoder(), accumulatorCoder));

        // Stage 2: bring all partial accumulators for a key together.
        PCollection<KV<K, Iterable<AccumT>>> grouped = partials.apply(GroupByKey.<K, AccumT>create());

        // Stage 3: merge the grouped accumulators and extract the final output.
        return grouped.apply(new MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>(this.combineFn, this.outputCoder));
    }

    /**
     * Evaluator for {@code MergeAndExtractAccumulatorOutput}: for each grouped element it merges
     * the accumulators of one key and emits the extracted output into a single output bundle.
     *
     * @param <K> key type
     * @param <AccumT> accumulator type
     * @param <OutputT> extracted output type
     */
    private static class MergeAccumulatorsAndExtractOutputEvaluator<K, AccumT, OutputT>
    implements TransformEvaluator<KV<K, Iterable<AccumT>>> {
        private final AppliedPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>, MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>> application;
        private final Combine.CombineFn<?, AccumT, OutputT> combineFn;
        private final UncommittedBundle<KV<K, OutputT>> output;

        /**
         * @param ctxt context used to create the output bundle
         * @param application the applied transform being evaluated; its only output receives
         *     the results
         */
        public MergeAccumulatorsAndExtractOutputEvaluator(EvaluationContext ctxt, AppliedPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>, MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>> application) {
            this.application = application;
            this.combineFn = application.getTransform().getCombineFn();
            // The outputs map is not statically typed per element, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            PCollection<KV<K, OutputT>> outputCollection = (PCollection<KV<K, OutputT>>) Iterables.getOnlyElement(application.getOutputs().values());
            this.output = ctxt.createBundle(outputCollection);
        }

        /**
         * Merges the element's accumulators and adds the extracted output, keeping the element's
         * windowing metadata via {@code withValue}.
         *
         * @throws IllegalStateException if the element is not in exactly one window
         * @throws Exception any failure in the combine function, wrapped by
         *     {@code UserCodeException.wrap}
         */
        @Override
        public void processElement(WindowedValue<KV<K, Iterable<AccumT>>> element) throws Exception {
            Preconditions.checkState(element.getWindows().size() == 1, "Expected inputs to %s to be in exactly one window. Got %s", MergeAccumulatorsAndExtractOutputEvaluator.class.getSimpleName(), element.getWindows().size());
            Iterable<AccumT> inputAccumulators = element.getValue().getValue();
            try {
                // A freshly created accumulator is placed both before and after the grouped
                // accumulators, so mergeAccumulators always sees at least two inputs.
                AccumT first = this.combineFn.createAccumulator();
                AccumT merged = this.combineFn.mergeAccumulators(Iterables.concat(Collections.singleton(first), inputAccumulators, Collections.singleton(this.combineFn.createAccumulator())));
                OutputT extracted = this.combineFn.extractOutput(merged);
                this.output.add(element.withValue(KV.of(element.getValue().getKey(), extracted)));
            }
            catch (Exception e) {
                throw UserCodeException.wrap(e);
            }
        }

        /**
         * @return a result without a watermark hold that carries the single output bundle
         */
        @Override
        public TransformResult<KV<K, Iterable<AccumT>>> finishBundle() throws Exception {
            // The explicit type witness is required: the target type does not flow through the
            // chained builder calls.
            return StepTransformResult.<KV<K, Iterable<AccumT>>>withoutHold(this.application).addOutput(this.output).build();
        }
    }

    /**
     * Creates {@code MergeAccumulatorsAndExtractOutputEvaluator} instances for applications of
     * {@code MergeAndExtractAccumulatorOutput}.
     */
    static class MergeAndExtractAccumulatorOutputEvaluatorFactory
    implements TransformEvaluatorFactory {
        private final EvaluationContext ctxt;

        /**
         * @param ctxt the evaluation context handed to every evaluator this factory creates
         */
        public MergeAndExtractAccumulatorOutputEvaluatorFactory(EvaluationContext ctxt) {
            this.ctxt = ctxt;
        }

        /**
         * Creates an evaluator for the given application.
         *
         * <p>The wildcard-typed arguments are narrowed through raw types; nothing here verifies
         * that {@code application} really is a {@code MergeAndExtractAccumulatorOutput}
         * application, so callers must only route matching applications to this factory.
         *
         * @param application the applied transform to evaluate
         * @param inputBundle the bundle about to be processed (not inspected)
         * @return a new evaluator
         */
        @Override
        @Nullable
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
            return (TransformEvaluator<InputT>) this.createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle);
        }

        /** Typed construction helper; {@code inputBundle} only pins the type parameters. */
        private <K, AccumT, OutputT> TransformEvaluator<KV<K, Iterable<AccumT>>> createEvaluator(AppliedPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>, MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>> application, CommittedBundle<KV<K, Iterable<AccumT>>> inputBundle) {
            return new MergeAccumulatorsAndExtractOutputEvaluator<>(this.ctxt, application);
        }

        /** No-op: this factory holds no resources to release. */
        @Override
        public void cleanup() throws Exception {
        }
    }

    /**
     * Raw transform marking the final stage of the multi-step combine: it consumes grouped
     * accumulators per key and produces one output value per key. Its expansion only declares
     * the output collection.
     *
     * @param <K> key type
     * @param <AccumT> accumulator type
     * @param <OutputT> extracted output type
     */
    static class MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>
    extends PTransformTranslation.RawPTransform<PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>> {
        private final Combine.CombineFn<?, AccumT, OutputT> combineFn;
        private final Coder<KV<K, OutputT>> outputCoder;

        /**
         * @param combineFn combine function used to merge accumulators and extract output
         * @param outputCoder coder assigned to the output collection
         */
        private MergeAndExtractAccumulatorOutput(Combine.CombineFn<?, AccumT, OutputT> combineFn, Coder<KV<K, OutputT>> outputCoder) {
            this.outputCoder = outputCoder;
            this.combineFn = combineFn;
        }

        /** @return the combine function supplied at construction */
        Combine.CombineFn<?, AccumT, OutputT> getCombineFn() {
            return this.combineFn;
        }

        /**
         * Declares the output collection, copying pipeline, windowing strategy and boundedness
         * from the input and using the stored output coder.
         */
        @Override
        public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, Iterable<AccumT>>> input) {
            Pipeline pipeline = input.getPipeline();
            WindowingStrategy<?, ?> windowing = input.getWindowingStrategy();
            PCollection.IsBounded boundedness = input.isBounded();
            return PCollection.createPrimitiveOutputInternal(pipeline, windowing, boundedness, this.outputCoder);
        }

        /** @return {@code MultiStepCombine.DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN} */
        @Override
        @Nonnull
        public String getUrn() {
            return MultiStepCombine.DIRECT_MERGE_ACCUMULATORS_EXTRACT_OUTPUT_URN;
        }

        /** @return always {@code null}; no {@code FunctionSpec} is supplied */
        @Override
        @Nullable
        public RunnerApi.FunctionSpec getSpec() {
            return null;
        }
    }

    /**
     * Map key pairing a {@code StructuralKey} with a window. Equality and hashing delegate to
     * both components.
     *
     * @param <K> the user key type
     */
    static class WindowedStructuralKey<K> {
        private final StructuralKey<K> key;
        private final BoundedWindow window;

        /**
         * Wraps {@code key} in a {@code StructuralKey} built with {@code keyCoder} and pairs it
         * with {@code window}.
         *
         * @throws NullPointerException if the structural key or the window is null
         */
        public static <K> WindowedStructuralKey<K> create(Coder<K> keyCoder, K key, BoundedWindow window) {
            StructuralKey<K> structuralKey = StructuralKey.of(key, keyCoder);
            return new WindowedStructuralKey<>(structuralKey, window);
        }

        private WindowedStructuralKey(StructuralKey<K> key, BoundedWindow window) {
            // The key is checked before the window, which decides the message when both are null.
            this.key = Preconditions.checkNotNull(key, "key cannot be null");
            this.window = Preconditions.checkNotNull(window, "Window cannot be null");
        }

        /** @return the user key held by the structural key */
        public K getKey() {
            return this.key.getKey();
        }

        /** @return the window component */
        public BoundedWindow getWindow() {
            return this.window;
        }

        /** Two instances are equal when both their windows and their structural keys are equal. */
        @Override
        public boolean equals(Object other) {
            if (other instanceof WindowedStructuralKey) {
                WindowedStructuralKey<?> that = (WindowedStructuralKey<?>) other;
                return this.window.equals(that.window) && this.key.equals(that.key);
            }
            return false;
        }

        /** Hash over the window and the structural key, in that order. */
        @Override
        public int hashCode() {
            return Objects.hash(this.window, this.key);
        }
    }

    /**
     * Pre-combining {@code DoFn}: within a bundle it folds every input into one accumulator per
     * (key, window) and emits the compacted accumulators when the bundle finishes.
     *
     * <p>The two maps are bundle-scoped: created in {@code startBundle}, cleared in
     * {@code outputAccumulators}, and marked {@code transient}.
     *
     * @param <K> key type
     * @param <InputT> input value type
     * @param <AccumT> accumulator type
     */
    private static class CombineInputs<K, InputT, AccumT>
    extends DoFn<KV<K, InputT>, KV<K, AccumT>> {
        private final Combine.CombineFn<InputT, AccumT, ?> combineFn;
        private final TimestampCombiner timestampCombiner;
        private final Coder<K> keyCoder;
        // Insertion-ordered, so accumulators are emitted in the order keys were first seen.
        private transient Map<WindowedStructuralKey<K>, AccumT> accumulators;
        private transient Map<WindowedStructuralKey<K>, Instant> timestamps;

        /**
         * @param combineFn combine function used to create, update and compact accumulators
         * @param timestampCombiner decides the output timestamp for each (key, window)
         * @param keyCoder coder used to build structural keys for the maps
         */
        private CombineInputs(Combine.CombineFn<InputT, AccumT, ?> combineFn, TimestampCombiner timestampCombiner, Coder<K> keyCoder) {
            this.combineFn = combineFn;
            this.timestampCombiner = timestampCombiner;
            this.keyCoder = keyCoder;
        }

        /** Allocates fresh per-bundle state. */
        @DoFn.StartBundle
        public void startBundle() {
            this.accumulators = new LinkedHashMap<>();
            this.timestamps = new LinkedHashMap<>();
        }

        /**
         * Adds the element's value to the accumulator for its (key, window) and folds its
         * assigned timestamp into the stored timestamp for that entry.
         *
         * <p>NOTE(review): the context parameter is declared with the parameterized inherited
         * type rather than raw {@code DoFn.ProcessContext}; Beam's reflective DoFn signature
         * analysis is believed to require this, so confirm against the Beam version in use.
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context, BoundedWindow window) {
            KV<K, InputT> element = context.element();
            WindowedStructuralKey<K> key = WindowedStructuralKey.create(this.keyCoder, element.getKey(), window);
            AccumT accumulator = this.accumulators.get(key);
            Instant assignedTs = this.timestampCombiner.assign(window, context.timestamp());
            if (accumulator == null) {
                // First element for this (key, window): seed both maps so the combine below
                // always has a stored timestamp to fold into.
                accumulator = this.combineFn.createAccumulator();
                this.accumulators.put(key, accumulator);
                this.timestamps.put(key, assignedTs);
            }
            this.accumulators.put(key, this.combineFn.addInput(accumulator, element.getValue()));
            this.timestamps.put(key, this.timestampCombiner.combine(assignedTs, this.timestamps.get(key)));
        }

        /**
         * Emits one compacted accumulator per (key, window) at the combined timestamp and in the
         * entry's window, then drops the bundle state.
         */
        @DoFn.FinishBundle
        public void outputAccumulators(FinishBundleContext context) {
            for (Map.Entry<WindowedStructuralKey<K>, AccumT> preCombineEntry : this.accumulators.entrySet()) {
                WindowedStructuralKey<K> windowedKey = preCombineEntry.getKey();
                context.output(KV.of(windowedKey.getKey(), this.combineFn.compact(preCombineEntry.getValue())), this.timestamps.get(windowedKey), windowedKey.getWindow());
            }
            this.accumulators = null;
            this.timestamps = null;
        }
    }

    /**
     * Override factory that swaps a matched {@code Combine.PerKey} application for a
     * {@code MultiStepCombine} built from the same combine function and output coder.
     */
    static class Factory<K, InputT, AccumT, OutputT>
    extends SingleInputOutputOverrideFactory<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
        /** @return a new factory, exposed through the raw override-factory type */
        public static PTransformOverrideFactory create() {
            return new Factory<>();
        }

        /** Instances are obtained through {@code create()}. */
        private Factory() {
        }

        /**
         * Builds the replacement for a matched application.
         *
         * @param transform the applied {@code Combine.PerKey}; it must have exactly one input
         *     and one output
         * @return the original input paired with the replacing {@code MultiStepCombine}
         * @throws IllegalStateException if the transform's function is not a
         *     {@code Combine.CombineFn}
         */
        @Override
        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> getReplacementTransform(AppliedPTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>, PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> transform) {
            CombineFnBase.GlobalCombineFn<?, ?, ?> globalFn = ((Combine.PerKey) transform.getTransform()).getFn();
            Preconditions.checkState(globalFn instanceof Combine.CombineFn, "%s.matcher() should only match %s instances using %s, got %s", MultiStepCombine.class.getSimpleName(), Combine.PerKey.class.getSimpleName(), Combine.CombineFn.class.getSimpleName(), globalFn.getClass().getName());

            // The three casts below only recover the element types already implied by the
            // signature of the applied transform.
            @SuppressWarnings("unchecked")
            Combine.CombineFn<InputT, ?, OutputT> perKeyFn = (Combine.CombineFn<InputT, ?, OutputT>) globalFn;
            @SuppressWarnings("unchecked")
            PCollection<KV<K, InputT>> mainInput = (PCollection<KV<K, InputT>>) Iterables.getOnlyElement(transform.getInputs().values());
            @SuppressWarnings("unchecked")
            PCollection<KV<K, OutputT>> mainOutput = (PCollection<KV<K, OutputT>>) Iterables.getOnlyElement(transform.getOutputs().values());

            return PTransformOverrideFactory.PTransformReplacement.of(mainInput, new MultiStepCombine<>(perKeyFn, mainOutput.getCoder()));
        }
    }
}

