/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.direct.AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.RootInputProvider;
import org.apache.beam.runners.direct.SourceShard;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator;
import org.apache.beam.runners.direct.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

class UnboundedReadEvaluatorFactory
implements TransformEvaluatorFactory {
    /** Default reader-reuse chance; the two-argument constructor supplies this same value (0.95). */
    private static final @UnknownKeyFor @NonNull @Initialized double DEFAULT_READER_REUSE_CHANCE = 0.95;
    /** Evaluation context handed to every evaluator this factory creates. */
    private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
    /** Pipeline options handed to every evaluator this factory creates. */
    private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;
    /** Reader-reuse chance handed to every evaluator this factory creates. */
    private final @UnknownKeyFor @NonNull @Initialized double readerReuseChance;

    /**
     * Creates a factory that uses {@link #DEFAULT_READER_REUSE_CHANCE} as its reader-reuse chance.
     *
     * @param evaluationContext context passed on to the evaluators this factory creates
     * @param options pipeline options passed on to the evaluators this factory creates
     */
    UnboundedReadEvaluatorFactory(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        // Refer to the named default instead of repeating its literal value.
        this(evaluationContext, options, DEFAULT_READER_REUSE_CHANCE);
    }

    /**
     * Creates a factory with an explicit reader-reuse chance.
     *
     * @param evaluationContext context passed on to the evaluators this factory creates
     * @param options pipeline options passed on to the evaluators this factory creates
     * @param readerReuseChance threshold the evaluators compare against a random draw in
     *     {@code [0, 1)} after a successful read; the open reader is closed when the draw is
     *     greater than or equal to this value
     */
    @VisibleForTesting
    UnboundedReadEvaluatorFactory(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized double readerReuseChance) {
        this.readerReuseChance = readerReuseChance;
        this.options = options;
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates the evaluator for an unbounded-read application.
     *
     * <p>The input bundle is not consulted. The wildcard-typed application is narrowed with an
     * unchecked raw cast before it is handed to {@code createEvaluator}; without that cast the
     * call does not type-check.
     *
     * <p>NOTE: the type annotations in this signature were recovered by a decompiler and may be
     * inaccurate.
     *
     * @param application the applied transform to build an evaluator for
     * @param inputBundle ignored by this implementation
     * @return a new evaluator for {@code application}
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> @Nullable @UnknownKeyFor @Initialized TransformEvaluator<InputT> forApplication(
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized ?> application,
    @UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> inputBundle) {
        // The raw cast makes the generic call unchecked; the result is then viewed as the
        // caller's element type.
        return (TransformEvaluator<InputT>) this.createEvaluator((AppliedPTransform) application);
    }

    /**
     * Builds an {@code UnboundedReadEvaluator} for the given read application, passing along
     * this factory's evaluation context, pipeline options and reader-reuse chance.
     *
     * <p>NOTE: the type annotations in this signature were recovered by a decompiler and may be
     * inaccurate.
     *
     * @param application the applied unbounded-read transform
     * @return a new evaluator bound to {@code application}
     */
    private <OutputT>
    @UnknownKeyFor @NonNull @Initialized TransformEvaluator<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> createEvaluator(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<OutputT>,  @UnknownKeyFor @NonNull @Initialized SplittableParDo.PrimitiveUnboundedRead<OutputT>> application) {
        // Diamond instead of a raw type so the evaluator's type arguments are inferred.
        return new UnboundedReadEvaluator<>(application, this.evaluationContext, this.options, this.readerReuseChance);
    }

    /** No-op: this implementation performs no cleanup work. */
    @Override
    public void cleanup() {
    }

    /**
     * Produces the initial root bundles for an unbounded read: the source is split, and each
     * split is wrapped in an unstarted {@link UnboundedSourceShard} committed in its own bundle.
     *
     * <p>NOTE: the type annotations in this class were recovered by a decompiler and may be
     * inaccurate.
     */
    static class InputProvider<@UnknownKeyFor T>
    implements RootInputProvider<T, UnboundedSourceShard<T, ?>, PBegin> {
        /** Used to create the root bundles. */
        private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
        /** Passed to the source when it is split. */
        private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;

        /**
         * @param evaluationContext context used to create root bundles
         * @param options pipeline options passed to {@code UnboundedSource.split}
         */
        InputProvider(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
            this.evaluationContext = evaluationContext;
            this.options = options;
        }

        /**
         * Splits the transform's unbounded source and returns one committed root bundle per split.
         *
         * <p>All shards share a single deduplicator: a {@code CachedIdDeduplicator} when the
         * source reports {@code requiresDeduping()}, otherwise a {@code NeverDeduplicator}.
         *
         * @param transform the applied read transform whose source is split
         * @param targetParallelism desired number of splits, passed to the source
         * @return one committed bundle per split, each holding a single unstarted shard in the
         *     global window
         * @throws Exception if extracting or splitting the source fails
         */
        @Override
        public
        @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @NonNull @Initialized UnboundedSourceShard<T, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?>>> getInitialInputs(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<T>, @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @NonNull @Initialized PBegin, @UnknownKeyFor @NonNull @Initialized PCollection<T>>> transform, @UnknownKeyFor @NonNull @Initialized int targetParallelism) throws @UnknownKeyFor @NonNull @Initialized Exception {
            UnboundedSource<T, ?> source = ReadTranslation.unboundedSourceFromTransform(transform);
            List<? extends UnboundedSource<T, ?>> splits = source.split(targetParallelism, this.options);
            UnboundedReadDeduplicator deduplicator = source.requiresDeduping() ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() : UnboundedReadDeduplicator.NeverDeduplicator.create();
            ImmutableList.Builder<CommittedBundle<UnboundedSourceShard<T, ?>>> initialShards = ImmutableList.builder();
            for (UnboundedSource<T, ?> split : splits) {
                UnboundedSourceShard<T, ?> shard = UnboundedSourceShard.unstarted(split, deduplicator);
                // Explicit type witnesses keep the wildcard checkpoint type from being captured.
                initialShards.add(this.evaluationContext.<UnboundedSourceShard<T, ?>>createRootBundle().add(WindowedValue.<UnboundedSourceShard<T, ?>>valueInGlobalWindow(shard)).commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
            }
            return initialShards.build();
        }
    }

    /**
     * Value type describing one shard of an unbounded source: the source itself, the
     * deduplicator applied to its records, and optionally an already-open reader and the most
     * recent checkpoint.
     *
     * <p>NOTE: the type annotations on nested types in this class were recovered by a decompiler
     * and their placement may be inaccurate.
     */
    @AutoValue
    static abstract class UnboundedSourceShard<@UnknownKeyFor T, @UnknownKeyFor CheckpointT extends @UnknownKeyFor @NonNull @Initialized UnboundedSource.CheckpointMark>
    implements SourceShard<T> {
        UnboundedSourceShard() {
        }

        /**
         * Creates a shard that has neither an open reader nor a checkpoint.
         *
         * @param source the source this shard reads from
         * @param deduplicator the deduplicator applied to records read from the shard
         * @return a shard whose reader and checkpoint are both {@code null}
         */
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> @UnknownKeyFor @NonNull @Initialized UnboundedSourceShard<T, CheckpointT> unstarted(@UnknownKeyFor @NonNull @Initialized UnboundedSource<T, CheckpointT> source, @UnknownKeyFor @NonNull @Initialized UnboundedReadDeduplicator deduplicator) {
            UnboundedSource.UnboundedReader<T> noReader = null;
            CheckpointT noCheckpoint = null;
            return of(source, deduplicator, noReader, noCheckpoint);
        }

        /**
         * Creates a shard from all four components.
         *
         * @param source the source this shard reads from
         * @param deduplicator the deduplicator applied to records read from the shard
         * @param reader an already-open reader to keep using, or {@code null}
         * @param checkpoint the most recent checkpoint, or {@code null}
         * @return the generated AutoValue implementation holding the given values
         */
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> @UnknownKeyFor @NonNull @Initialized UnboundedSourceShard<T, CheckpointT> of(@UnknownKeyFor @NonNull @Initialized UnboundedSource<T, CheckpointT> source, @UnknownKeyFor @NonNull @Initialized UnboundedReadDeduplicator deduplicator,
        @Nullable @UnknownKeyFor @Initialized UnboundedSource.UnboundedReader<T> reader, @Nullable CheckpointT checkpoint) {
            return new AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard<>(source, deduplicator, reader, checkpoint);
        }

        /** Returns the source this shard reads from. */
        public abstract @UnknownKeyFor @NonNull @Initialized UnboundedSource<T, CheckpointT> getSource();

        /** Returns the deduplicator applied to records read from this shard. */
        abstract @UnknownKeyFor @NonNull @Initialized UnboundedReadDeduplicator getDeduplicator();

        /** Returns the reader carried by this shard, or {@code null} if there is none. */
        abstract
        @Nullable @UnknownKeyFor @Initialized UnboundedSource.UnboundedReader<T> getExistingReader();

        /** Returns the checkpoint carried by this shard, or {@code null} if there is none. */
        abstract @Nullable CheckpointT getCheckpoint();
    }

    private static class UnboundedReadEvaluator<@UnknownKeyFor OutputT, @UnknownKeyFor CheckpointMarkT extends // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized UnboundedSource.CheckpointMark>
    implements TransformEvaluator<UnboundedSourceShard<OutputT, CheckpointMarkT>> {
        /** Equals the maximum number of elements read in a single processElement call (10). */
        private static final @UnknownKeyFor @NonNull @Initialized int ARBITRARY_MAX_ELEMENTS = 10;
        /** The applied read transform; its single output PCollection receives the elements read. */
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized PCollection<OutputT>, @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized ?> transform;
        /** Used to create output bundles and to schedule deferred checkpoint finalization. */
        private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
        /** Passed to the source when a new reader is created. */
        private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;
        /** After a successful read the reader is closed when a random draw in [0, 1) is >= this value. */
        private final @UnknownKeyFor @NonNull @Initialized double readerReuseChance;
        /** Accumulates outputs and unprocessed (residual) shards; built in finishBundle. */
        private final @UnknownKeyFor @NonNull @Initialized StepTransformResult.Builder resultBuilder;

        /**
         * Creates an evaluator for one applied unbounded read and starts an empty result builder
         * (created without a hold) for it.
         *
         * <p>NOTE: the type annotations in this signature were recovered by a decompiler and may
         * be inaccurate.
         *
         * @param transform the applied read transform being evaluated
         * @param evaluationContext context used to create bundles and schedule callbacks
         * @param options pipeline options used when creating readers
         * @param readerReuseChance threshold compared against a random draw in {@code [0, 1)} to
         *     decide whether an open reader is kept after a read
         */
        public UnboundedReadEvaluator(
        @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized PCollection<OutputT>, @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized ?> transform, @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized double readerReuseChance) {
            this.resultBuilder = StepTransformResult.withoutHold(transform);
            this.readerReuseChance = readerReuseChance;
            this.options = options;
            this.evaluationContext = evaluationContext;
            this.transform = transform;
        }

        /**
         * Reads a batch of elements from the shard carried by {@code element} and records the
         * outcome on this evaluator's result builder.
         *
         * <p>One of three things happens:
         * <ul>
         *   <li><b>Elements available:</b> up to {@link #ARBITRARY_MAX_ELEMENTS} records are
         *       consumed (records rejected by the shard's deduplicator are skipped but still
         *       count toward the limit), the read is checkpointed via {@code finishRead}, the
         *       reader is closed when a random draw in {@code [0, 1)} is greater than or equal to
         *       {@code readerReuseChance}, and a residual shard is queued at the reader's
         *       watermark.
         *   <li><b>No elements, watermark before {@code TIMESTAMP_MAX_VALUE}:</b> the shard is
         *       queued again with the same reader and its existing checkpoint.
         *   <li><b>No elements, watermark at or after {@code TIMESTAMP_MAX_VALUE}:</b> the
         *       shard's checkpoint (if any) is finalized, the reader is closed, and nothing is
         *       queued.
         * </ul>
         *
         * <p>NOTE: the decompiler that produced this file reported removing a try/catch from this
         * method, so its exception handling may differ from the original source.
         *
         * @param element the shard to read from, wrapped in a windowed value
         * @throws IOException if reading, checkpoint finalization, or closing the reader fails; on
         *     the end-of-input path a finalization failure takes precedence and a subsequent close
         *     failure is attached to it as a suppressed exception
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws @UnknownKeyFor @NonNull @Initialized IOException {
            UncommittedBundle<OutputT> output = this.evaluationContext.createBundle((PCollection<OutputT>) Iterables.getOnlyElement(this.transform.getOutputs().values()));
            UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue();
            UnboundedSource.UnboundedReader<OutputT> reader = null;
            try {
                reader = this.getReader(shard);
                boolean elementAvailable = this.startReader(reader, shard);
                if (elementAvailable) {
                    UnboundedReadDeduplicator deduplicator = shard.getDeduplicator();
                    int numElements = 0;
                    do {
                        if (deduplicator.shouldOutput(reader.getCurrentRecordId())) {
                            output.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
                        }
                        // Deduplicated records still count toward the per-call limit.
                        numElements++;
                    } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
                    Instant watermark = reader.getWatermark();
                    CheckpointMarkT finishedCheckpoint = this.finishRead(reader, watermark, shard);
                    // Occasionally drop the reader; a later call then recreates it from the checkpoint.
                    if (ThreadLocalRandom.current().nextDouble(1.0) >= this.readerReuseChance) {
                        UnboundedSource.UnboundedReader<OutputT> toClose = reader;
                        // Null out first so the outer catch does not close a second time if close() throws.
                        reader = null;
                        toClose.close();
                    }
                    UnboundedSourceShard<OutputT, CheckpointMarkT> residual = UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), reader, finishedCheckpoint);
                    this.resultBuilder.addOutput(output, new UncommittedBundle[0]).addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(residual, watermark)));
                } else {
                    Instant watermark = reader.getWatermark();
                    if (watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                        // Nothing to read right now, but the shard is not finished: queue it again as-is.
                        this.resultBuilder.addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), reader, shard.getCheckpoint()), watermark)));
                    } else {
                        // End of input: finalize the last checkpoint (may be absent), then close the reader.
                        CheckpointMarkT checkpoint = shard.getCheckpoint();
                        IOException ioe = null;
                        try {
                            if (checkpoint != null) {
                                checkpoint.finalizeCheckpoint();
                            }
                        }
                        catch (IOException finalizeCheckpointException) {
                            ioe = finalizeCheckpointException;
                        }
                        finally {
                            try {
                                UnboundedSource.UnboundedReader<OutputT> toClose = reader;
                                // Null out first so the outer catch does not close a second time.
                                reader = null;
                                toClose.close();
                            }
                            catch (IOException closeEx) {
                                if (ioe != null) {
                                    // The finalize failure is the primary error; keep the close
                                    // failure attached to it instead of replacing it.
                                    ioe.addSuppressed(closeEx);
                                } else {
                                    throw closeEx;
                                }
                            }
                        }
                        if (ioe != null) {
                            throw ioe;
                        }
                    }
                }
            }
            catch (IOException e) {
                if (reader != null) {
                    reader.close();
                }
                throw e;
            }
        }

        /**
         * Returns the reader to use for {@code shard}: the shard's existing reader when it has
         * one, otherwise a new reader created by the shard's source.
         *
         * <p>A new reader is created from a copy of the shard's checkpoint made with the source's
         * checkpoint-mark coder, or from {@code null} when the shard has no checkpoint.
         *
         * <p>NOTE: the type annotations in this signature were recovered by a decompiler and their
         * placement on nested types may be inaccurate.
         *
         * @param shard the shard to obtain a reader for
         * @return the existing reader, or a newly created (not yet started) one
         * @throws IOException if copying the checkpoint or creating the reader fails
         */
        private
        @UnknownKeyFor @NonNull @Initialized UnboundedSource.UnboundedReader<OutputT> getReader(@UnknownKeyFor @NonNull @Initialized UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws @UnknownKeyFor @NonNull @Initialized IOException {
            UnboundedSource.UnboundedReader<OutputT> existing = shard.getExistingReader();
            if (existing != null) {
                return existing;
            }
            // Typed as CheckpointMarkT (not Object) so it can be passed to createReader.
            CheckpointMarkT checkpoint = shard.getCheckpoint();
            if (checkpoint != null) {
                checkpoint = CoderUtils.clone(shard.getSource().getCheckpointMarkCoder(), checkpoint);
            }
            return shard.getSource().createReader(this.options, checkpoint);
        }

        /**
         * Positions the reader on its next record: {@code start()} is called on {@code reader}
         * when the shard carries no existing reader, otherwise {@code advance()} is called on the
         * shard's existing reader.
         *
         * @param reader the reader to start when the shard has no existing reader
         * @param shard the shard being read
         * @return the result of {@code start()} or {@code advance()}
         * @throws IOException if the underlying call fails
         */
        private @UnknownKeyFor @NonNull @Initialized boolean startReader(
        @UnknownKeyFor @NonNull @Initialized UnboundedSource.UnboundedReader<OutputT> reader, @UnknownKeyFor @NonNull @Initialized UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws @UnknownKeyFor @NonNull @Initialized IOException {
            return shard.getExistingReader() == null
                ? reader.start()
                : shard.getExistingReader().advance();
        }

        /**
         * Takes a new checkpoint mark from {@code reader} and finalizes the shard's previous one.
         *
         * <p>The new mark is obtained before the previous mark is finalized. When
         * {@code watermark} is not before {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, finalization
         * of the new mark is additionally scheduled through the evaluation context, to run after
         * output for the global window would be produced.
         *
         * @param reader the reader to checkpoint
         * @param watermark the reader's watermark after the read
         * @param shard the shard whose previous checkpoint (if any) is finalized
         * @return the new checkpoint mark
         * @throws IOException if finalizing the previous checkpoint fails
         */
        private CheckpointMarkT finishRead(
        @UnknownKeyFor @NonNull @Initialized UnboundedSource.UnboundedReader<OutputT> reader, @UnknownKeyFor @NonNull @Initialized Instant watermark, @UnknownKeyFor @NonNull @Initialized UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws @UnknownKeyFor @NonNull @Initialized IOException {
            final CheckpointMarkT previousMark = shard.getCheckpoint();
            final UnboundedSource.CheckpointMark newMark = reader.getCheckpointMark();
            if (previousMark != null) {
                previousMark.finalizeCheckpoint();
            }
            if (watermark.isBefore((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                // More input may follow; the new mark is finalized by a later call.
                return (CheckpointMarkT)newMark;
            }
            PCollection outputPc = (PCollection)Iterables.getOnlyElement(this.transform.getOutputs().values());
            this.evaluationContext.scheduleAfterOutputWouldBeProduced(outputPc, (BoundedWindow)GlobalWindow.INSTANCE, outputPc.getWindowingStrategy(), () -> {
                try {
                    newMark.finalizeCheckpoint();
                }
                catch (IOException e) {
                    throw new RuntimeException("Couldn't finalize checkpoint after the end of the Global Window", e);
                }
            });
            return (CheckpointMarkT)newMark;
        }

        /**
         * Builds and returns the result accumulated on this evaluator's result builder.
         *
         * @return the transform result built from the recorded outputs and unprocessed shards
         * @throws IOException declared by the signature; not thrown by the code in this method
         */
        @Override
        public @UnknownKeyFor @NonNull @Initialized TransformResult<@UnknownKeyFor @NonNull @Initialized UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle() throws @UnknownKeyFor @NonNull @Initialized IOException {
            TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> result = this.resultBuilder.build();
            return result;
        }
    }
}

