/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.DirectGroupByKey;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

class GroupByKeyOnlyEvaluatorFactory
implements TransformEvaluatorFactory {
    private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;

    GroupByKeyOnlyEvaluatorFactory(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    @Override
    public <InputT> @UnknownKeyFor @NonNull @Initialized TransformEvaluator<InputT> forApplication(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized ?> application, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> inputBundle) {
        TransformEvaluator evaluator = this.createEvaluator(application);
        return evaluator;
    }

    @Override
    public void cleanup() {
    }

    private <K, V> @UnknownKeyFor @NonNull @Initialized TransformEvaluator<@UnknownKeyFor @NonNull @Initialized KV<K, V>> createEvaluator(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>>, @UnknownKeyFor @NonNull @Initialized DirectGroupByKey.DirectGroupByKeyOnly<K, V>> application) {
        return new GroupByKeyOnlyEvaluator<K, V>(this.evaluationContext, application);
    }

    private static class GroupByKeyOnlyEvaluator<@UnknownKeyFor K, @UnknownKeyFor V>
    implements TransformEvaluator<KV<K, V>> {
        private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
        private final @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>>, @UnknownKeyFor @NonNull @Initialized DirectGroupByKey.DirectGroupByKeyOnly<K, V>> application;
        private final @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder;
        private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized StructuralKey<K>, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized WindowedValue<V>>> groupingMap;

        public GroupByKeyOnlyEvaluator(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, V>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>>, @UnknownKeyFor @NonNull @Initialized DirectGroupByKey.DirectGroupByKeyOnly<K, V>> application) {
            this.evaluationContext = evaluationContext;
            this.application = application;
            this.keyCoder = this.getKeyCoder(((PCollection)Iterables.getOnlyElement(application.getInputs().values())).getCoder());
            this.groupingMap = new HashMap<StructuralKey<K>, List<WindowedValue<V>>>();
        }

        private @UnknownKeyFor @NonNull @Initialized Coder<K> getKeyCoder(@UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized KV<K, V>> coder) {
            Preconditions.checkState((boolean)(coder instanceof KvCoder), (String)"%s requires a coder of class %s. This is an internal error; this is checked during pipeline construction but became corrupted.", (Object)this.getClass().getSimpleName(), (Object)KvCoder.class.getSimpleName());
            Coder keyCoder = ((KvCoder)coder).getKeyCoder();
            return keyCoder;
        }

        @Override
        public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, V>> element) {
            KV kv = (KV)element.getValue();
            Object key = kv.getKey();
            StructuralKey<Object> groupingKey = StructuralKey.of(key, this.keyCoder);
            List values = this.groupingMap.computeIfAbsent(groupingKey, k -> new ArrayList());
            values.add(element.withValue(kv.getValue()));
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized TransformResult<@UnknownKeyFor @NonNull @Initialized KV<K, V>> finishBundle() {
            StepTransformResult.Builder resultBuilder = StepTransformResult.withoutHold(this.application);
            for (Map.Entry<StructuralKey<K>, List<WindowedValue<V>>> groupedEntry : this.groupingMap.entrySet()) {
                K key = groupedEntry.getKey().getKey();
                KeyedWorkItem groupedKv = KeyedWorkItems.elementsWorkItem(key, (Iterable)groupedEntry.getValue());
                UncommittedBundle bundle = this.evaluationContext.createKeyedBundle(StructuralKey.of(key, this.keyCoder), (PCollection)Iterables.getOnlyElement(this.application.getOutputs().values()));
                bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
                resultBuilder.addOutput(bundle, new UncommittedBundle[0]);
            }
            return resultBuilder.build();
        }
    }
}

