/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Supplier;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.Clock;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineOptions;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.KeyedResourcePool;
import com.google.cloud.dataflow.sdk.runners.inprocess.LockedKeyedResourcePool;
import com.google.cloud.dataflow.sdk.runners.inprocess.PTransformOverrideFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluatorFactory;
import com.google.cloud.dataflow.sdk.testing.TestStream;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;

class TestStreamEvaluatorFactory
implements TransformEvaluatorFactory {
    private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators = LockedKeyedResourcePool.create();

    TestStreamEvaluatorFactory() {
    }

    @Override
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, @Nullable InProcessPipelineRunner.CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) throws Exception {
        return this.createEvaluator(application, evaluationContext);
    }

    private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application, InProcessEvaluationContext evaluationContext) throws ExecutionException {
        return this.evaluators.tryAcquire(application, new CreateEvaluator<OutputT>(application, evaluationContext, this.evaluators)).orNull();
    }

    private static class CreateEvaluator<OutputT>
    implements Callable<Evaluator<?>> {
        private final AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application;
        private final InProcessEvaluationContext evaluationContext;
        private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators;

        public CreateEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application, InProcessEvaluationContext evaluationContext, KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators) {
            this.application = application;
            this.evaluationContext = evaluationContext;
            this.evaluators = evaluators;
        }

        @Override
        public Evaluator<?> call() throws Exception {
            return new Evaluator(this.application, this.evaluationContext, this.evaluators);
        }
    }

    static class InProcessTestStreamFactory
    implements PTransformOverrideFactory {
        InProcessTestStreamFactory() {
        }

        @Override
        public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
            if (transform instanceof TestStream) {
                return new DirectTestStream((TestStream)transform);
            }
            return transform;
        }

        private static class DirectTestStream<T>
        extends PTransform<PBegin, PCollection<T>> {
            private final TestStream<T> original;

            private DirectTestStream(TestStream transform) {
                this.original = transform;
            }

            @Override
            public PCollection<T> apply(PBegin input) {
                PipelineRunner<?> runner = input.getPipeline().getRunner();
                Preconditions.checkState(runner instanceof InProcessPipelineRunner, "%s can only be used when running with the %s", this.getClass().getSimpleName(), InProcessPipelineRunner.class.getSimpleName());
                input.getPipeline().getOptions().as(InProcessPipelineOptions.class).setShutdownUnboundedProducersWithMaxWatermark(true);
                ((InProcessPipelineRunner)runner).setClockSupplier(new TestClockSupplier());
                return PCollection.createPrimitiveOutputInternal(input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED).setCoder((Coder)this.original.getValueCoder());
            }
        }
    }

    private static class TestClockSupplier
    implements Supplier<Clock> {
        private TestClockSupplier() {
        }

        @Override
        public Clock get() {
            return new TestClock();
        }
    }

    private static class TestClock
    implements Clock {
        private final AtomicReference<Instant> currentTime = new AtomicReference<Instant>(BoundedWindow.TIMESTAMP_MIN_VALUE);

        private TestClock() {
        }

        public void advance(Duration amount) {
            Instant now = this.currentTime.get();
            this.currentTime.compareAndSet(now, now.plus((ReadableDuration)amount));
        }

        @Override
        public Instant now() {
            return this.currentTime.get();
        }
    }

    private static class Evaluator<T>
    implements TransformEvaluator<Object> {
        private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
        private final InProcessEvaluationContext context;
        private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache;
        private final List<TestStream.Event<T>> events;
        private int index;
        private Instant currentWatermark;

        private Evaluator(AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application, InProcessEvaluationContext context, KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> cache) {
            this.application = application;
            this.context = context;
            this.cache = cache;
            this.events = application.getTransform().getEvents();
            this.index = 0;
            this.currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        }

        @Override
        public void processElement(WindowedValue<Object> element) throws Exception {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public InProcessTransformResult finishBundle() throws Exception {
            try {
                if (this.index >= this.events.size()) {
                    StepTransformResult stepTransformResult = StepTransformResult.withoutHold(this.application).build();
                    return stepTransformResult;
                }
                TestStream.Event<T> event = this.events.get(this.index);
                if (event.getType().equals((Object)TestStream.EventType.WATERMARK)) {
                    this.currentWatermark = ((TestStream.WatermarkEvent)event).getWatermark();
                }
                StepTransformResult.Builder result = StepTransformResult.withHold(this.application, this.currentWatermark);
                if (event.getType().equals((Object)TestStream.EventType.ELEMENT)) {
                    InProcessPipelineRunner.UncommittedBundle bundle = this.context.createRootBundle(this.application.getOutput());
                    for (TimestampedValue elem : ((TestStream.ElementEvent)event).getElements()) {
                        bundle.add(WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
                    }
                    result.addOutput(bundle, new InProcessPipelineRunner.UncommittedBundle[0]);
                }
                if (event.getType().equals((Object)TestStream.EventType.PROCESSING_TIME)) {
                    ((TestClock)this.context.getClock()).advance(((TestStream.ProcessingTimeEvent)event).getProcessingTimeAdvance());
                }
                ++this.index;
                StepTransformResult stepTransformResult = result.build();
                return stepTransformResult;
            }
            finally {
                this.cache.release(this.application, this);
            }
        }
    }
}

