/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectTimerInternals;
import org.apache.beam.runners.direct.DoFnLifecycleManager;
import org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ParDoEvaluator;
import org.apache.beam.runners.direct.ParDoEvaluatorFactory;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;

/**
 * A {@link TransformEvaluatorFactory} for {@link ParDoMultiOverrideFactory.StatefulParDo}
 * applications.
 *
 * <p>Element and timer processing is handed to evaluators created by a delegate
 * {@link ParDoEvaluatorFactory}. In addition, when the {@link DoFn} declares state, a cleanup
 * task is registered for every (transform, key, window) combination seen in an input bundle;
 * the task is scheduled through {@code EvaluationContext.scheduleAfterWindowExpiration} and
 * clears the declared state cells for that window.
 *
 * @param <K> key type of the {@code KV<K, InputT>} elements
 * @param <InputT> value type of the {@code KV<K, InputT>} elements
 * @param <OutputT> output type parameter of the delegate {@link ParDoEvaluatorFactory}
 */
final class StatefulParDoEvaluatorFactory<K, InputT, OutputT>
implements TransformEvaluatorFactory {
    /**
     * Cleanup tasks keyed by (transform, key, window). Looking up an absent key runs the cache
     * loader, which builds the task and schedules it; the cache is built with weak values.
     */
    private final LoadingCache<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> cleanupRegistry;
    /** Factory that creates the underlying evaluators operating on {@code KV<K, InputT>} elements. */
    private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;

    /**
     * Creates the factory together with its delegate {@link ParDoEvaluatorFactory} and the
     * registry of state-cleanup tasks.
     *
     * @param evaluationContext context passed to the delegate factory and to the loader that
     *     builds and schedules cleanup tasks
     * @param options pipeline options forwarded to the delegate factory
     */
    StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
        this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext, ParDoEvaluator.defaultRunnerFactory(), new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>(){

            @Override
            public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> appliedStatefulParDo) throws Exception {
                // The applied transform is expected to be a StatefulParDo; any other transform
                // type fails here with a ClassCastException.
                ParDoMultiOverrideFactory.StatefulParDo<?, ?, ?> statefulParDo = (ParDoMultiOverrideFactory.StatefulParDo<?, ?, ?>)appliedStatefulParDo.getTransform();
                return DoFnLifecycleManager.of(statefulParDo.getDoFn());
            }
        }, options);
        // Values are held weakly, so the cache alone does not keep a cleanup task alive.
        this.cleanupRegistry = CacheBuilder.newBuilder().weakValues().build(new CleanupSchedulingLoader(evaluationContext));
    }

    /**
     * Creates an evaluator for the given stateful ParDo application and input bundle.
     *
     * <p>The public signature only carries wildcard types, while {@code createEvaluator} is
     * declared with fully parameterized types, so both arguments and the result go through
     * unchecked casts. NOTE(review): this assumes callers only route
     * {@link ParDoMultiOverrideFactory.StatefulParDo} applications with keyed-work-item bundles
     * to this factory — confirm against the evaluator registry.
     *
     * @param application the applied transform to evaluate
     * @param inputBundle the bundle whose elements the evaluator will process
     * @return the evaluator produced by {@code createEvaluator}, viewed as
     *     {@code TransformEvaluator<T>}
     * @throws Exception if creating the evaluator fails
     */
    @Override
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<T> evaluator = (TransformEvaluator<T>)this.createEvaluator((AppliedPTransform)application, (CommittedBundle)inputBundle);
        return evaluator;
    }

    /**
     * Forwards cleanup to the delegate {@link ParDoEvaluatorFactory}.
     *
     * @throws Exception if the delegate's cleanup fails
     */
    @Override
    public void cleanup() throws Exception {
        delegateFactory.cleanup();
    }

    /**
     * Builds the evaluator for one input bundle of a stateful ParDo.
     *
     * <p>If the {@link DoFn} declares any state, a cleanup task is first requested from
     * {@code cleanupRegistry} for every window of every element in the bundle (keyed by
     * transform, bundle key and window). The evaluator itself is obtained from the delegate
     * factory and wrapped in a {@link StatefulParDoEvaluator}.
     *
     * @param application the applied {@code StatefulParDo}
     * @param inputBundle the bundle of keyed work items to be processed
     * @return an evaluator consuming {@code KeyedWorkItem<K, KV<K, InputT>>} values
     * @throws Exception if loading a cleanup task or creating the delegate evaluator fails
     */
    private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> application, CommittedBundle<KeyedWorkItem<K, KV<K, InputT>>> inputBundle) throws Exception {
        ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT> statefulParDo = application.getTransform();
        DoFn<?, ?> doFn = statefulParDo.getDoFn();
        DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
        // The cast is a no-op at runtime; it only aligns the bundle key's static type with K.
        @SuppressWarnings("unchecked")
        StructuralKey<K> bundleKey = (StructuralKey<K>)inputBundle.getKey();
        if (!signature.stateDeclarations().isEmpty()) {
            for (WindowedValue<?> element : inputBundle.getElements()) {
                for (BoundedWindow window : element.getWindows()) {
                    // Only the lookup matters: a cache miss runs the loader, which schedules
                    // the cleanup task. The returned Runnable is intentionally unused.
                    this.cleanupRegistry.get(AppliedPTransformOutputKeyAndWindow.create(application, bundleKey, window));
                }
            }
        }
        // The delegate factory works on KV<K, InputT> elements while this application's input is
        // a collection of keyed work items, hence the raw casts on the first two arguments.
        @SuppressWarnings({"unchecked", "rawtypes"})
        DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator = this.delegateFactory.createEvaluator((AppliedPTransform)application, (PCollection)inputBundle.getPCollection(), bundleKey, statefulParDo.getSideInputs(), statefulParDo.getMainOutputTag(), statefulParDo.getAdditionalOutputTags().getAll(), statefulParDo.getSchemaInformation(), statefulParDo.getSideInputMapping());
        return new StatefulParDoEvaluator<>(delegateEvaluator);
    }

    /**
     * Adapts an evaluator of {@code KV<K, InputT>} elements to {@link KeyedWorkItem} input.
     *
     * <p>For each work item the contained elements are forwarded to the delegate first, then the
     * contained timers are fired in timestamp order. Timers that were not fired are collected
     * and reported as pushed back when the bundle is finished.
     *
     * @param <K> key type of the processed elements
     * @param <InputT> value type of the processed elements
     */
    private static class StatefulParDoEvaluator<K, InputT>
    implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
        private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;
        /** Timers received but not fired in this bundle; handed back in {@link #finishBundle()}. */
        private final List<TimerInternals.TimerData> pushedBackTimers = new ArrayList<>();
        private final DirectTimerInternals timerInternals;

        /**
         * @param delegateEvaluator evaluator that processes the individual elements and timers;
         *     its step context also supplies the timer internals used here
         */
        public StatefulParDoEvaluator(DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) {
            this.delegateEvaluator = delegateEvaluator;
            this.timerInternals = delegateEvaluator.getParDoEvaluator().getStepContext().timerInternals();
        }

        /**
         * Processes one keyed work item: all of its elements, then its timers in ascending
         * timestamp order.
         *
         * <p>After each fired timer, {@code timerInternals.containsUpdateForTimeBefore} is
         * queried with the input watermark read before any timer fired; once it returns
         * {@code true}, firing stops and every timer still queued is pushed back.
         *
         * @param gbkResult the work item holding elements and timers for a single key
         * @throws Exception if the delegate fails while processing an element or a timer
         * @throws IllegalStateException if a timer's namespace is not a window namespace
         */
        @Override
        public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult) throws Exception {
            KeyedWorkItem<K, KV<K, InputT>> workItem = gbkResult.getValue();
            for (WindowedValue<KV<K, InputT>> windowedValue : workItem.elementsIterable()) {
                this.delegateEvaluator.processElement(windowedValue);
            }
            Instant currentInputWatermark = this.timerInternals.currentInputWatermarkTime();
            PriorityQueue<TimerInternals.TimerData> toBeFiredTimers = new PriorityQueue<TimerInternals.TimerData>(Comparator.comparing(TimerInternals.TimerData::getTimestamp));
            workItem.timersIterable().forEach(toBeFiredTimers::add);
            while (!toBeFiredTimers.isEmpty()) {
                TimerInternals.TimerData timer = toBeFiredTimers.poll();
                Preconditions.checkState(timer.getNamespace() instanceof StateNamespaces.WindowNamespace, "Expected Timer %s to be in a %s, but got %s", timer, StateNamespaces.WindowNamespace.class.getSimpleName(), timer.getNamespace().getClass().getName());
                StateNamespaces.WindowNamespace<?> windowNamespace = (StateNamespaces.WindowNamespace<?>)timer.getNamespace();
                BoundedWindow timerWindow = windowNamespace.getWindow();
                this.delegateEvaluator.onTimer(timer, timerWindow);
                // NOTE(review): presumably true when the timer just fired registered a timer
                // update earlier than the watermark, which must run before the remaining
                // queued timers — confirm against DirectTimerInternals.
                if (this.timerInternals.containsUpdateForTimeBefore(currentInputWatermark)) {
                    break;
                }
            }
            // Whatever is still queued was not fired in this call.
            this.pushedBackTimers.addAll(toBeFiredTimers);
        }

        /**
         * Finishes the delegate's bundle and re-shapes its result for keyed-work-item input.
         *
         * <p>The delegate's timer update is extended with the pushed-back timers, and every
         * element the delegate left unprocessed is wrapped in its own single-element work item
         * under that element's key, keeping the original windowing metadata.
         *
         * @return the delegate's result expressed in terms of {@code KeyedWorkItem} input
         * @throws Exception if the delegate fails to finish its bundle
         */
        @Override
        public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
            TransformResult<KV<K, InputT>> delegateResult = this.delegateEvaluator.finishBundle();
            // NOTE(review): the list is cleared right after being handed over, which relies on
            // withPushedBackTimers not retaining it by reference — confirm in WatermarkManager.
            WatermarkManager.TimerUpdate timerUpdate = delegateResult.getTimerUpdate().withPushedBackTimers(this.pushedBackTimers);
            this.pushedBackTimers.clear();
            StepTransformResult.Builder<KeyedWorkItem<K, KV<K, InputT>>> regroupedResult = StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(delegateResult.getTransform(), delegateResult.getWatermarkHold()).withTimerUpdate(timerUpdate).withState(delegateResult.getState()).withMetricUpdates(delegateResult.getLogicalMetricUpdates()).addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
            for (WindowedValue<?> untypedUnprocessed : delegateResult.getUnprocessedElements()) {
                @SuppressWarnings("unchecked")
                WindowedValue<KV<K, InputT>> windowedKv = (WindowedValue<KV<K, InputT>>)untypedUnprocessed;
                WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> pushedBack = windowedKv.withValue(KeyedWorkItems.elementsWorkItem(windowedKv.getValue().getKey(), Collections.singleton(windowedKv)));
                regroupedResult.addUnprocessedElements(pushedBack);
            }
            return regroupedResult.build();
        }
    }

    /**
     * Identifies one (applied transform, key, window) combination; used as the key of the
     * cleanup-task cache.
     *
     * <p>Instances are obtained through {@link #create}, which instantiates the generated
     * {@code AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow}.
     * The accessors are declared in the same order in which {@code create} passes its arguments.
     */
    @AutoValue
    abstract static class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
        /** Returns the applied stateful ParDo this entry belongs to. */
        abstract AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> getTransform();

        /** Returns the key this entry belongs to. */
        abstract StructuralKey<K> getKey();

        /** Returns the window this entry belongs to. */
        abstract BoundedWindow getWindow();

        /**
         * Creates the value for the given transform, key and window.
         *
         * @param transform the applied stateful ParDo
         * @param key the key
         * @param w the window
         * @return a new value holding the three components
         */
        static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> transform, StructuralKey<K> key, BoundedWindow w) {
            return new AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow<>(transform, key, w);
        }
    }

    private class CleanupSchedulingLoader
    extends CacheLoader<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> {
        private final EvaluationContext evaluationContext;

        public CleanupSchedulingLoader(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        public Runnable load(AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> transformOutputWindow) {
            String stepName = this.evaluationContext.getStepName(transformOutputWindow.getTransform());
            HashMap<TupleTag, PCollection> taggedValues = new HashMap<TupleTag, PCollection>();
            for (Map.Entry pv : transformOutputWindow.getTransform().getOutputs().entrySet()) {
                taggedValues.put((TupleTag)pv.getKey(), (PCollection)pv.getValue());
            }
            PCollection pc = (PCollection)taggedValues.get(((ParDoMultiOverrideFactory.StatefulParDo)transformOutputWindow.getTransform().getTransform()).getMainOutputTag());
            WindowingStrategy windowingStrategy = pc.getWindowingStrategy();
            BoundedWindow window = transformOutputWindow.getWindow();
            DoFn doFn = ((ParDoMultiOverrideFactory.StatefulParDo)transformOutputWindow.getTransform().getTransform()).getDoFn();
            DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
            DirectExecutionContext.DirectStepContext stepContext = this.evaluationContext.getExecutionContext(transformOutputWindow.getTransform(), transformOutputWindow.getKey()).getStepContext(stepName);
            StateNamespace namespace = StateNamespaces.window(windowingStrategy.getWindowFn().windowCoder(), window);
            Runnable cleanup = () -> {
                for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
                    StateTag tag;
                    try {
                        tag = StateTags.tagForSpec(stateDecl.id(), (StateSpec)stateDecl.field().get(doFn));
                    }
                    catch (IllegalAccessException e) {
                        throw new RuntimeException(String.format("Error accessing %s for %s", StateSpec.class.getName(), doFn.getClass().getName()), e);
                    }
                    stepContext.stateInternals().state(namespace, tag).clear();
                }
                StatefulParDoEvaluatorFactory.this.cleanupRegistry.invalidate((Object)transformOutputWindow);
            };
            this.evaluationContext.scheduleAfterWindowExpiration(transformOutputWindow.getTransform(), window, windowingStrategy, cleanup);
            return cleanup;
        }
    }
}

