/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.OutputWindowedValue;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.ReduceFnRunner;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SystemReduceFn;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.UnsupportedSideInputReader;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.TriggerTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectGroupByKey;
import org.apache.beam.runners.direct.DirectTimerInternals;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

class GroupAlsoByWindowEvaluatorFactory
implements TransformEvaluatorFactory {
    private final EvaluationContext evaluationContext;
    private final PipelineOptions options;

    GroupAlsoByWindowEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
        this.evaluationContext = evaluationContext;
        this.options = options;
    }

    @Override
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
        TransformEvaluator evaluator = this.createEvaluator(application, inputBundle);
        return evaluator;
    }

    @Override
    public void cleanup() {
    }

    private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application, CommittedBundle<KeyedWorkItem<K, V>> inputBundle) {
        return new GroupAlsoByWindowEvaluator<K, V>(this.evaluationContext, this.options, inputBundle, application);
    }

    private static class OutputWindowedValueToBundle<K, V>
    implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final UncommittedBundle<KV<K, Iterable<V>>> bundle;

        private OutputWindowedValueToBundle(UncommittedBundle<KV<K, Iterable<V>>> bundle) {
            this.bundle = bundle;
        }

        @Override
        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.bundle.add(WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane));
        }

        @Override
        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            throw new UnsupportedOperationException(String.format("%s should not use tagged outputs", DirectGroupByKey.DirectGroupAlsoByWindow.class.getSimpleName()));
        }
    }

    private static class GroupAlsoByWindowEvaluator<K, V>
    implements TransformEvaluator<KeyedWorkItem<K, V>> {
        private final EvaluationContext evaluationContext;
        private final PipelineOptions options;
        private final AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application;
        private final DirectExecutionContext.DirectStepContext stepContext;
        private final WindowingStrategy<?, BoundedWindow> windowingStrategy;
        private final StructuralKey<?> structuralKey;
        private final Collection<UncommittedBundle<?>> outputBundles;
        private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements;
        private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
        private final Counter droppedDueToLateness;

        public GroupAlsoByWindowEvaluator(EvaluationContext evaluationContext, PipelineOptions options, CommittedBundle<KeyedWorkItem<K, V>> inputBundle, AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application) {
            this.evaluationContext = evaluationContext;
            this.options = options;
            this.application = application;
            this.structuralKey = inputBundle.getKey();
            this.stepContext = evaluationContext.getExecutionContext(application, inputBundle.getKey()).getStepContext(evaluationContext.getStepName(application));
            this.windowingStrategy = ((DirectGroupByKey.DirectGroupAlsoByWindow)application.getTransform()).getInputWindowingStrategy();
            this.outputBundles = new ArrayList();
            this.unprocessedElements = ImmutableList.builder();
            Coder valueCoder = ((DirectGroupByKey.DirectGroupAlsoByWindow)application.getTransform()).getValueCoder(inputBundle.getPCollection().getCoder());
            this.reduceFn = SystemReduceFn.buffering(valueCoder);
            this.droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class, (String)"DroppedDueToLateness");
        }

        @Override
        public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
            KeyedWorkItem workItem = (KeyedWorkItem)element.getValue();
            Object key = workItem.key();
            UncommittedBundle bundle = this.evaluationContext.createKeyedBundle(this.structuralKey, (PCollection)Iterables.getOnlyElement(this.application.getOutputs().values()));
            this.outputBundles.add(bundle);
            StateInternals stateInternals = this.stepContext.stateInternals();
            DirectTimerInternals timerInternals = this.stepContext.timerInternals();
            RunnerApi.Trigger runnerApiTrigger = TriggerTranslation.toProto(this.windowingStrategy.getTrigger());
            ReduceFnRunner reduceFnRunner = new ReduceFnRunner(key, this.windowingStrategy, ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(runnerApiTrigger)), stateInternals, timerInternals, new OutputWindowedValueToBundle(bundle), new UnsupportedSideInputReader(DirectGroupByKey.DirectGroupAlsoByWindow.class.getSimpleName()), this.reduceFn, this.options);
            reduceFnRunner.processElements(this.dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
            reduceFnRunner.onTimers(workItem.timersIterable());
            reduceFnRunner.persist();
        }

        @Override
        public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
            CopyOnAccessInMemoryStateInternals state = this.stepContext.commitState();
            return StepTransformResult.withHold(this.application, state.getEarliestWatermarkHold()).withState(state).addOutput(this.outputBundles).withTimerUpdate(this.stepContext.getTimerUpdate()).addUnprocessedElements(this.unprocessedElements.build()).build();
        }

        Iterable<WindowedValue<V>> dropExpiredWindows(K key, Iterable<WindowedValue<V>> elements, TimerInternals timerInternals) {
            return StreamSupport.stream(elements.spliterator(), false).flatMap(wv -> StreamSupport.stream(wv.explodeWindows().spliterator(), false)).filter(input -> {
                BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement(input.getWindows());
                boolean expired = window.maxTimestamp().plus((ReadableDuration)this.windowingStrategy.getAllowedLateness()).isBefore((ReadableInstant)timerInternals.currentInputWatermarkTime());
                if (expired) {
                    this.droppedDueToLateness.inc();
                    WindowTracing.debug((String)"{}: Dropping element at {} for key: {}; window: {} since it is too far behind inputWatermark: {}", (Object[])new Object[]{DirectGroupByKey.DirectGroupAlsoByWindow.class.getSimpleName(), input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime()});
                }
                return !expired;
            }).collect(Collectors.toList());
        }
    }
}

