/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Optional;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.Bundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.Clock;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.runners.direct.portable.BundleFactory;
import org.apache.beam.runners.direct.portable.CommittedBundle;
import org.apache.beam.runners.direct.portable.CommittedResult;
import org.apache.beam.runners.direct.portable.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.runners.direct.portable.DirectMetrics;
import org.apache.beam.runners.direct.portable.DirectStateAndTimers;
import org.apache.beam.runners.direct.portable.StepAndKey;
import org.apache.beam.runners.direct.portable.StepStateAndTimers;
import org.apache.beam.runners.direct.portable.TransformResult;
import org.apache.beam.runners.direct.portable.UncommittedBundle;
import org.apache.beam.runners.direct.portable.WatermarkCallbackExecutor;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * Shared evaluation state for a single pipeline execution.
 *
 * <p>An instance ties together the executable graph, the clock, the bundle factory, a
 * {@link WatermarkManager} built from that clock and graph, the committed per-step-and-key state,
 * the {@link DirectMetrics} container and a {@link WatermarkCallbackExecutor}. Results of
 * evaluating a bundle are folded back into this state through {@link #handleResult}.
 */
class EvaluationContext {
    /** The graph of executables; used to look up producers and to enumerate all steps. */
    private final ExecutableGraph<PipelineNode.PTransformNode, ? super PipelineNode.PCollectionNode> graph;
    /** Source of the current time, also handed to the watermark manager and to state/timers. */
    private final Clock clock;
    /** Creates the uncommitted bundles handed out by the {@code create*Bundle} methods. */
    private final BundleFactory bundleFactory;
    /** Tracks watermarks and timers for every executable in {@link #graph}. */
    private final WatermarkManager<PipelineNode.PTransformNode, ? super PipelineNode.PCollectionNode> watermarkManager;
    /** Holds callbacks registered via {@link #scheduleAfterOutputWouldBeProduced}. */
    private final WatermarkCallbackExecutor callbackExecutor;
    /** Most recently committed, non-empty state for each (step, key) pair. */
    private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals> applicationStateInternals;
    /** Metrics container that receives the logical metric updates of every handled result. */
    private final DirectMetrics metrics;
    /** The PCollections reported as keyed by {@link #isKeyed}. */
    private final Set<PipelineNode.PCollectionNode> keyedPValues;

    /**
     * Creates a new {@link EvaluationContext}.
     *
     * @param clock the clock used for {@link #now()} and for watermark tracking
     * @param bundleFactory the factory used to create bundles; must not be null
     * @param graph the graph being executed; must not be null
     * @param keyedPValues the set of PCollections that {@link #isKeyed} reports as keyed
     * @return the new context
     * @throws NullPointerException if {@code bundleFactory} or {@code graph} is null
     */
    public static EvaluationContext create(Clock clock, BundleFactory bundleFactory, ExecutableGraph<PipelineNode.PTransformNode, ? super PipelineNode.PCollectionNode> graph, Set<PipelineNode.PCollectionNode> keyedPValues) {
        return new EvaluationContext(clock, bundleFactory, graph, keyedPValues);
    }

    /** Use {@link #create}; see that method for the parameter contract. */
    private EvaluationContext(Clock clock, BundleFactory bundleFactory, ExecutableGraph<PipelineNode.PTransformNode, ? super PipelineNode.PCollectionNode> graph, Set<PipelineNode.PCollectionNode> keyedPValues) {
        // NOTE(review): clock and keyedPValues are stored without a null check, unlike
        // bundleFactory and graph - confirm whether callers may legitimately pass null.
        this.clock = clock;
        this.bundleFactory = Preconditions.checkNotNull(bundleFactory);
        this.graph = Preconditions.checkNotNull(graph);
        this.keyedPValues = keyedPValues;
        this.watermarkManager = WatermarkManager.create(clock, graph);
        this.applicationStateInternals = new ConcurrentHashMap<StepAndKey, CopyOnAccessInMemoryStateInternals>();
        this.metrics = new DirectMetrics();
        this.callbackExecutor = WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
    }

    /**
     * Passes the initial input bundles of each transform to the watermark manager.
     *
     * @param initialInputs the initial bundles, keyed by the transform that consumes them
     */
    public void initialize(Map<PipelineNode.PTransformNode, ? extends Iterable<CommittedBundle<?>>> initialInputs) {
        this.watermarkManager.initialize(initialInputs);
    }

    /**
     * Folds the result of evaluating a bundle into this context.
     *
     * <p>In order, this method: commits the result's output bundles (dropping empty ones), commits
     * the logical metric updates, builds the {@link CommittedResult}, stores or clears the
     * committed state for the (transform, key) pair, and finally updates the watermarks.
     *
     * @param completedBundle the input bundle that was evaluated; it is dereferenced when the
     *     result carries state
     * @param completedTimers the timers that were delivered while producing {@code result}
     * @param result the outcome of the evaluation
     * @return the committed form of {@code result}
     */
    public CommittedResult<PipelineNode.PTransformNode> handleResult(CommittedBundle<?> completedBundle, Iterable<TimerInternals.TimerData> completedTimers, TransformResult<?> result) {
        Iterable<? extends CommittedBundle<?>> committedBundles = this.commitBundles(result.getOutputBundles());
        this.metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates());

        // The BUNDLE output type must reflect whether any non-empty bundle was actually
        // committed, regardless of what the result itself reported.
        EnumSet<CommittedResult.OutputType> outputTypes = EnumSet.copyOf(result.getOutputTypes());
        if (Iterables.isEmpty(committedBundles)) {
            outputTypes.remove(CommittedResult.OutputType.BUNDLE);
        } else {
            outputTypes.add(CommittedResult.OutputType.BUNDLE);
        }
        CommittedResult<PipelineNode.PTransformNode> committedResult = CommittedResult.create(result, this.getUnprocessedInput(completedBundle, result), committedBundles, outputTypes);

        CopyOnAccessInMemoryStateInternals theirState = result.getState();
        if (theirState != null) {
            CopyOnAccessInMemoryStateInternals committedState = theirState.commit();
            // NOTE(review): completedBundle is dereferenced here although getUnprocessedInput
            // tolerates a null bundle - presumably results with state always have an input
            // bundle; confirm against callers.
            StepAndKey stepAndKey = StepAndKey.of(result.getTransform(), completedBundle.getKey());
            if (!committedState.isEmpty()) {
                this.applicationStateInternals.put(stepAndKey, committedState);
            } else {
                // Empty committed state is not retained.
                this.applicationStateInternals.remove(stepAndKey);
            }
        }

        // Watermarks are updated only after the state above has been stored.
        // NOTE(review): the raw (Bundle) cast is retained from the original; it may be removable
        // if the call type-checks against the wildcard-typed watermark manager - confirm.
        this.watermarkManager.updateWatermarks(completedBundle, result.getTimerUpdate().withCompletedTimers(completedTimers), committedResult.getExecutable(), (Bundle)committedResult.getUnprocessedInputs().orNull(), committedResult.getOutputs(), result.getWatermarkHold());
        return committedResult;
    }

    /**
     * Returns a bundle holding the elements of {@code completedBundle} that {@code result} reports
     * as unprocessed, or an absent value if the bundle is null or nothing was left unprocessed.
     */
    private Optional<? extends CommittedBundle<?>> getUnprocessedInput(CommittedBundle<?> completedBundle, TransformResult<?> result) {
        if (completedBundle == null || Iterables.isEmpty(result.getUnprocessedElements())) {
            return Optional.absent();
        }
        CommittedBundle<?> residual = completedBundle.withElements(result.getUnprocessedElements());
        return Optional.of(residual);
    }

    /**
     * Commits every bundle in {@code bundles} at the synchronized processing output time of its
     * producing transform and returns the committed bundles that contain at least one element.
     */
    private Iterable<? extends CommittedBundle<?>> commitBundles(Iterable<? extends UncommittedBundle<?>> bundles) {
        ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
        for (UncommittedBundle<?> inProgress : bundles) {
            PipelineNode.PTransformNode producing = this.graph.getProducer(inProgress.getPCollection());
            WatermarkManager.TransformWatermarks watermarks = this.watermarkManager.getWatermarks(producing);
            CommittedBundle<?> committed = inProgress.commit(watermarks.getSynchronizedProcessingOutputTime());
            // Empty bundles are dropped from the committed outputs.
            if (!Iterables.isEmpty(committed.getElements())) {
                completed.add(committed);
            }
        }
        return completed.build();
    }

    /** Fires the available callbacks of every executable in the graph. */
    private void fireAllAvailableCallbacks() {
        for (PipelineNode.PTransformNode transform : this.graph.getExecutables()) {
            this.fireAvailableCallbacks(transform);
        }
    }

    /** Notifies the callback executor of the current output watermark of {@code producingTransform}. */
    private void fireAvailableCallbacks(PipelineNode.PTransformNode producingTransform) {
        WatermarkManager.TransformWatermarks watermarks = this.watermarkManager.getWatermarks(producingTransform);
        Instant outputWatermark = watermarks.getOutputWatermark();
        this.callbackExecutor.fireForWatermark(producingTransform, outputWatermark);
    }

    /** Creates a root bundle by delegating to the bundle factory. */
    public <T> UncommittedBundle<T> createRootBundle() {
        return this.bundleFactory.createRootBundle();
    }

    /**
     * Creates a bundle for the given output PCollection by delegating to the bundle factory.
     *
     * @param output the PCollection the bundle's elements belong to
     */
    public <T> UncommittedBundle<T> createBundle(PipelineNode.PCollectionNode output) {
        return this.bundleFactory.createBundle(output);
    }

    /**
     * Creates a keyed bundle for the given output PCollection by delegating to the bundle factory.
     *
     * @param key the key shared by the elements of the bundle
     * @param output the PCollection the bundle's elements belong to
     */
    public <K, T> UncommittedBundle<T> createKeyedBundle(StructuralKey<K> key, PipelineNode.PCollectionNode output) {
        return this.bundleFactory.createKeyedBundle(key, output);
    }

    /**
     * Returns whether {@code pValue} is contained in the set of keyed PCollections this context
     * was created with.
     */
    public <T> boolean isKeyed(PipelineNode.PCollectionNode pValue) {
        return this.keyedPValues.contains(pValue);
    }

    /**
     * Registers {@code runnable} with the callback executor as a window-expiration callback for
     * the producer of {@code value}, then immediately notifies the executor of that producer's
     * current output watermark.
     *
     * @param value the PCollection whose producing transform the callback is attached to
     * @param window the window the callback is registered for
     * @param windowingStrategy the windowing strategy passed along with the window
     * @param runnable the callback to register
     */
    public void scheduleAfterOutputWouldBeProduced(PipelineNode.PCollectionNode value, BoundedWindow window, WindowingStrategy<?, ?> windowingStrategy, Runnable runnable) {
        PipelineNode.PTransformNode producing = this.graph.getProducer(value);
        this.callbackExecutor.callOnWindowExpiration(producing, window, windowingStrategy, runnable);
        this.fireAvailableCallbacks(producing);
    }

    /**
     * Returns the state and timers for {@code application} and {@code key}, built on top of the
     * state last committed for that pair (null if none is stored).
     */
    public <K> StepStateAndTimers<K> getStateAndTimers(PipelineNode.PTransformNode application, StructuralKey<K> key) {
        StepAndKey stepAndKey = StepAndKey.of(application, key);
        return new DirectStateAndTimers<K>(key, this.applicationStateInternals.get(stepAndKey), this.clock, this.watermarkManager.getWatermarks(application));
    }

    /** Returns all executables of the graph. */
    Collection<PipelineNode.PTransformNode> getSteps() {
        return this.graph.getExecutables();
    }

    /** Returns the metrics container of this context. */
    public DirectMetrics getMetrics() {
        return this.metrics;
    }

    /** Refreshes all watermarks and then fires the available callbacks of every executable. */
    @VisibleForTesting
    void forceRefresh() {
        this.watermarkManager.refreshAll();
        this.fireAllAvailableCallbacks();
    }

    /**
     * Refreshes watermarks and callbacks via {@link #forceRefresh()} and then extracts the fired
     * timers from the watermark manager.
     */
    public Collection<WatermarkManager.FiredTimers<PipelineNode.PTransformNode>> extractFiredTimers() {
        this.forceRefresh();
        return this.watermarkManager.extractFiredTimers();
    }

    /**
     * Returns true if the output watermark of {@code transform} is not before
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
     */
    public boolean isDone(PipelineNode.PTransformNode transform) {
        Instant stepWatermark = this.watermarkManager.getWatermarks(transform).getOutputWatermark();
        return !stepWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    /** Returns true if {@link #isDone(PipelineNode.PTransformNode)} holds for every executable. */
    public boolean isDone() {
        for (PipelineNode.PTransformNode transform : this.graph.getExecutables()) {
            if (!this.isDone(transform)) {
                return false;
            }
        }
        return true;
    }

    /** Returns the current time according to this context's clock. */
    public Instant now() {
        return this.clock.now();
    }

    /** Returns the clock this context was created with. */
    Clock getClock() {
        return this.clock;
    }
}

