/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.portable.BundleFactory;
import org.apache.beam.runners.direct.portable.CommittedBundle;
import org.apache.beam.runners.direct.portable.DirectGroupByKey;
import org.apache.beam.runners.direct.portable.StepTransformResult;
import org.apache.beam.runners.direct.portable.TransformEvaluator;
import org.apache.beam.runners.direct.portable.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.portable.TransformResult;
import org.apache.beam.runners.direct.portable.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;

/**
 * A {@link TransformEvaluatorFactory} whose evaluators group the elements of a single input bundle
 * by key and emit one {@link KeyedWorkItem} per distinct key.
 *
 * <p>NOTE(review): judging by the precondition message in {@code getKeyCoder}, this factory is
 * presumably registered for {@link DirectGroupByKey.DirectGroupByKeyOnly}; confirm against the
 * evaluator registry.
 */
class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
    private final RunnerApi.Components components;
    private final BundleFactory bundleFactory;
    private final ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph;

    /**
     * @param graph the executable graph used to look up the inputs and outputs of a transform
     * @param components the pipeline components from which the input wire coder is rehydrated
     * @param bundleFactory the factory used to create the keyed output bundles
     */
    GroupByKeyOnlyEvaluatorFactory(
            ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph,
            RunnerApi.Components components,
            BundleFactory bundleFactory) {
        this.components = components;
        this.bundleFactory = bundleFactory;
        this.graph = graph;
    }

    /**
     * Creates a fresh grouping evaluator for {@code application}.
     *
     * @param application the transform node to evaluate
     * @param inputBundle unused; a new evaluator is created regardless of the input bundle
     * @return a new evaluator that groups the elements it is given by key
     */
    @Override
    public <InputT> TransformEvaluator<InputT> forApplication(
            PipelineNode.PTransformNode application, CommittedBundle<?> inputBundle) {
        // The evaluator is typed on KV<K, V>, which cannot be expressed through InputT; the raw
        // cast bridges the two and is the only unchecked step in this method.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator = (TransformEvaluator) createEvaluator(application);
        return evaluator;
    }

    /** No resources are held by this factory, so there is nothing to clean up. */
    @Override
    public void cleanup() {
    }

    /** Instantiates the typed evaluator for {@code application}. */
    private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(
            PipelineNode.PTransformNode application) {
        return new GroupByKeyOnlyEvaluator<>(application);
    }

    /**
     * Buffers every element it receives, grouped by the structural value of its key, and outputs
     * one keyed bundle per distinct key when the bundle is finished.
     *
     * @param <K> the key type of the input elements
     * @param <V> the value type of the input elements
     */
    private class GroupByKeyOnlyEvaluator<K, V> implements TransformEvaluator<KV<K, V>> {
        private final Coder<K> keyCoder;
        private final Map<StructuralKey<K>, List<WindowedValue<V>>> groupingMap;
        private final PipelineNode.PCollectionNode outputPCollection;
        private final StepTransformResult.Builder<KV<K, V>> resultBuilder;

        private GroupByKeyOnlyEvaluator(PipelineNode.PTransformNode application) {
            this.keyCoder = getKeyCoder(application);
            this.groupingMap = new HashMap<>();
            // The transform is required to produce exactly one PCollection; getOnlyElement
            // throws otherwise.
            this.outputPCollection = Iterables.getOnlyElement(graph.getProduced(application));
            this.resultBuilder = StepTransformResult.withoutHold(application);
        }

        /**
         * Resolves the key coder from the runner wire coder of the transform's single input
         * PCollection.
         *
         * @param application the transform whose input key coder is wanted
         * @return the key coder of the input's {@link KvCoder}
         * @throws IllegalArgumentException if the wire coder is not a
         *     {@link WindowedValue.WindowedValueCoder} or its value coder is not a {@link KvCoder}
         * @throws RuntimeException wrapping any {@link IOException} raised while building or
         *     rehydrating the wire coder
         */
        @SuppressWarnings("unchecked") // Coder<?> is narrowed only after the instanceof checks below.
        private Coder<K> getKeyCoder(PipelineNode.PTransformNode application) {
            PipelineNode.PCollectionNode inputPCollection =
                    Iterables.getOnlyElement(graph.getPerElementInputs(application));
            try {
                // Work on a copy of the components so that registering the wire coder does not
                // touch the factory's own components.
                RunnerApi.Components.Builder builder = components.toBuilder();
                String wireCoderId = WireCoders.addRunnerWireCoder(inputPCollection, builder);
                Coder<WindowedValue<KV<K, V>>> wireCoder =
                        (Coder<WindowedValue<KV<K, V>>>)
                                RehydratedComponents.forComponents(builder.build()).getCoder(wireCoderId);

                Preconditions.checkArgument(
                        wireCoder instanceof WindowedValue.WindowedValueCoder,
                        "Wire %s must be a %s",
                        Coder.class.getSimpleName(),
                        WindowedValue.WindowedValueCoder.class.getSimpleName());
                WindowedValue.WindowedValueCoder<KV<K, V>> windowedValueCoder =
                        (WindowedValue.WindowedValueCoder<KV<K, V>>) wireCoder;

                Preconditions.checkArgument(
                        windowedValueCoder.getValueCoder() instanceof KvCoder,
                        "Input elements to %s must be encoded with a %s",
                        DirectGroupByKey.DirectGroupByKeyOnly.class.getSimpleName(),
                        KvCoder.class.getSimpleName());
                KvCoder<K, V> kvCoder = (KvCoder<K, V>) windowedValueCoder.getValueCoder();

                return kvCoder.getKeyCoder();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Appends the element's value, with the element's windowing metadata preserved via
         * {@code withValue}, to the list kept for the element's key.
         *
         * @param element the keyed element to buffer
         */
        @Override
        public void processElement(WindowedValue<KV<K, V>> element) {
            KV<K, V> kv = element.getValue();
            K key = kv.getKey();
            // Keys are compared through StructuralKey rather than directly.
            StructuralKey<K> groupingKey = StructuralKey.of(key, keyCoder);
            List<WindowedValue<V>> values =
                    groupingMap.computeIfAbsent(groupingKey, unused -> new ArrayList<>());
            values.add(element.withValue(kv.getValue()));
        }

        /**
         * Emits one keyed output bundle per buffered key, each holding a single
         * {@link KeyedWorkItem} with all values seen for that key.
         *
         * @return the transform result containing every produced bundle
         */
        @Override
        public TransformResult<KV<K, V>> finishBundle() {
            for (Map.Entry<StructuralKey<K>, List<WindowedValue<V>>> groupedEntry :
                    groupingMap.entrySet()) {
                K key = groupedEntry.getKey().getKey();
                KeyedWorkItem<K, V> groupedKv =
                        KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
                UncommittedBundle<KeyedWorkItem<K, V>> bundle =
                        bundleFactory.createKeyedBundle(
                                StructuralKey.of(key, keyCoder), outputPCollection);
                // The work item itself is output in the global window; the buffered values
                // inside it are the WindowedValues stored by processElement.
                bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
                resultBuilder.addOutput(bundle);
            }
            return resultBuilder.build();
        }
    }
}

