/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkBoundedSourceReader<@UnknownKeyFor T>
extends FlinkSourceReaderBase<T, WindowedValue<T>> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
    @Nullable
    private // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Source.Reader<T> currentReader;
    private @UnknownKeyFor @NonNull @Initialized int currentSplitId = -1;

    public FlinkBoundedSourceReader(@UnknownKeyFor @NonNull @Initialized String stepName, @UnknownKeyFor @NonNull @Initialized SourceReaderContext context, @UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions, @Nullable @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Function<@UnknownKeyFor @NonNull @Initialized WindowedValue<T>, @UnknownKeyFor @NonNull @Initialized Long> timestampExtractor) {
        super(stepName, context, pipelineOptions, timestampExtractor);
    }

    @Override
    protected @UnknownKeyFor @NonNull @Initialized FlinkSourceSplit<T> getReaderCheckpoint(@UnknownKeyFor @NonNull @Initialized int splitId, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized FlinkSourceReaderBase. @UnknownKeyFor @NonNull @Initialized ReaderAndOutput readerAndOutput) throws @UnknownKeyFor @NonNull @Initialized CoderException {
        return new FlinkSourceSplit(splitId, readerAndOutput.reader.getCurrentSource());
    }

    @Override
    protected // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized Source.Reader<T> createReader(@Nonnull @UnknownKeyFor @NonNull @Initialized FlinkSourceSplit<T> sourceSplit) throws @UnknownKeyFor @NonNull @Initialized IOException {
        Source<T> beamSource = sourceSplit.getBeamSplitSource();
        return ((BoundedSource)beamSource).createReader(this.pipelineOptions);
    }

    @VisibleForTesting
    protected FlinkBoundedSourceReader(@UnknownKeyFor @NonNull @Initialized String stepName, @UnknownKeyFor @NonNull @Initialized SourceReaderContext context, @UnknownKeyFor @NonNull @Initialized PipelineOptions pipelineOptions, @UnknownKeyFor @NonNull @Initialized ScheduledExecutorService executor, @Nullable @UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Function<@UnknownKeyFor @NonNull @Initialized WindowedValue<T>, @UnknownKeyFor @NonNull @Initialized Long> timestampExtractor) {
        super(stepName, executor, context, pipelineOptions, timestampExtractor);
    }

    public @UnknownKeyFor @NonNull @Initialized InputStatus pollNext(@UnknownKeyFor @NonNull @Initialized ReaderOutput<@UnknownKeyFor @NonNull @Initialized WindowedValue<T>> output) throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.checkExceptionAndMaybeThrow();
        if (this.currentReader == null && this.currentSplitId == -1) {
            this.context.sendSplitRequest();
        }
        if (this.currentReader == null && !this.moveToNextNonEmptyReader()) {
            if (this.noMoreSplits()) {
                output.emitWatermark(Watermark.MAX_WATERMARK);
                if (this.checkIdleTimeoutAndMaybeStartCountdown()) {
                    LOG.info("All splits have finished reading, and idle time {} ms has passed.", (Object)this.idleTimeoutMs);
                    return InputStatus.END_OF_INPUT;
                }
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (this.currentReader != null) {
            Source.Reader<T> splitReader = this.currentReader;
            Object record = splitReader.getCurrent();
            WindowedValue windowedValue = WindowedValue.of((Object)record, (Instant)splitReader.getCurrentTimestamp(), (BoundedWindow)GlobalWindow.INSTANCE, (PaneInfo)PaneInfo.NO_FIRING);
            if (this.timestampExtractor == null) {
                output.collect((Object)windowedValue);
            } else {
                output.collect((Object)windowedValue, ((Long)this.timestampExtractor.apply(windowedValue)).longValue());
            }
            this.numRecordsInCounter.inc();
            if (!this.invocationUtil.invokeAdvance(splitReader)) {
                this.finishSplit(this.currentSplitId);
                LOG.debug("Finished reading from {}", (Object)this.currentSplitId);
                this.currentReader = null;
                this.currentSplitId = -1;
                this.context.sendSplitRequest();
            }
            return InputStatus.MORE_AVAILABLE;
        }
        throw new IllegalArgumentException("If we reach here, the current beam reader should not be null");
    }

    @Override
    protected @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @org.checkerframework.checker.nullness.qual.Nullable @Initialized Void> isAvailableForAliveReaders() {
        return AVAILABLE_NOW;
    }

    private @UnknownKeyFor @NonNull @Initialized boolean moveToNextNonEmptyReader() throws @UnknownKeyFor @NonNull @Initialized IOException {
        Optional<FlinkSourceReaderBase.ReaderAndOutput> readerAndOutput;
        while ((readerAndOutput = this.createAndTrackNextReader()).isPresent()) {
            FlinkSourceReaderBase.ReaderAndOutput rao = readerAndOutput.get();
            if (this.invocationUtil.invokeStart(rao.reader)) {
                this.currentSplitId = Integer.parseInt(rao.splitId);
                this.currentReader = rao.reader;
                return true;
            }
            this.finishSplit(Integer.parseInt(rao.splitId));
        }
        return false;
    }
}

