/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.flink.source;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.core.io.InputStatus;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.flink.source.FlinkRowCollector;
import org.apache.seatunnel.translation.flink.source.FlinkSourceReaderContext;
import org.apache.seatunnel.translation.flink.source.NoMoreElementEvent;
import org.apache.seatunnel.translation.flink.source.SourceEventWrapper;
import org.apache.seatunnel.translation.flink.source.SplitWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSourceReader<SplitT extends SourceSplit>
implements SourceReader<SeaTunnelRow, SplitWrapper<SplitT>> {
    private final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceReader.class);
    private final org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader;
    private final SourceReader.Context context;
    private final FlinkRowCollector flinkRowCollector;
    private InputStatus inputStatus = InputStatus.MORE_AVAILABLE;
    private volatile CompletableFuture<Void> availabilityFuture;
    private static final long DEFAULT_WAIT_TIME_MILLIS = 1000L;
    private final ScheduledExecutorService scheduledExecutor;

    public FlinkSourceReader(org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader, SourceReader.Context context, Config envConfig) {
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat(String.format("source-reader-scheduler-%d", context.getIndexOfSubtask())).build());
        this.sourceReader = sourceReader;
        this.context = context;
        this.flinkRowCollector = new FlinkRowCollector(envConfig, context.getMetricsContext());
    }

    public void start() {
        try {
            this.sourceReader.open();
            this.context.getEventListener().onEvent((Event)new ReaderOpenEvent());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public InputStatus pollNext(ReaderOutput<SeaTunnelRow> output) throws Exception {
        if (!((FlinkSourceReaderContext)this.context).isSendNoMoreElementEvent()) {
            this.sourceReader.pollNext((Collector)this.flinkRowCollector.withReaderOutput(output));
            if (this.flinkRowCollector.isEmptyThisPollNext()) {
                FlinkSourceReader flinkSourceReader = this;
                synchronized (flinkSourceReader) {
                    if (this.availabilityFuture == null || this.availabilityFuture.isDone()) {
                        this.availabilityFuture = new CompletableFuture();
                        this.scheduleComplete(this.availabilityFuture);
                        this.LOGGER.debug("No data available, wait for next poll.");
                    }
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        } else {
            Thread.sleep(1000L);
        }
        return this.inputStatus;
    }

    public List<SplitWrapper<SplitT>> snapshotState(long checkpointId) {
        try {
            List splitTS = this.sourceReader.snapshotState(checkpointId);
            return splitTS.stream().map(SplitWrapper::new).collect(Collectors.toList());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public CompletableFuture<Void> isAvailable() {
        CompletableFuture<Void> future = this.availabilityFuture;
        return future != null ? future : CompletableFuture.completedFuture(null);
    }

    public void addSplits(List<SplitWrapper<SplitT>> splits) {
        this.sourceReader.addSplits(splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()));
    }

    public void notifyNoMoreSplits() {
        this.sourceReader.handleNoMoreSplits();
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            this.inputStatus = InputStatus.END_OF_INPUT;
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            this.sourceReader.handleSourceEvent(((SourceEventWrapper)sourceEvent).getSourceEvent());
        }
    }

    public void close() throws Exception {
        CompletableFuture<Void> future = this.availabilityFuture;
        if (future != null && !future.isDone()) {
            future.complete(null);
        }
        this.sourceReader.close();
        this.context.getEventListener().onEvent((Event)new ReaderCloseEvent());
        this.scheduledExecutor.shutdown();
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.sourceReader.notifyCheckpointComplete(checkpointId);
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.sourceReader.notifyCheckpointAborted(checkpointId);
    }

    private void scheduleComplete(CompletableFuture<Void> future) {
        this.scheduledExecutor.schedule(() -> future.complete(null), 1000L, TimeUnit.MILLISECONDS);
    }
}

