/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketUnawareCompactSource
extends AbstractNonCoordinatedSource<UnawareAppendCompactionTask> {
    private static final Logger LOG = LoggerFactory.getLogger(BucketUnawareCompactSource.class);
    private static final String COMPACTION_COORDINATOR_NAME = "Compaction Coordinator";
    private final FileStoreTable table;
    private final boolean streaming;
    private final long scanInterval;
    private final Predicate filter;

    public BucketUnawareCompactSource(FileStoreTable table, boolean isStreaming, long scanInterval, @Nullable Predicate filter) {
        this.table = table;
        this.streaming = isStreaming;
        this.scanInterval = scanInterval;
        this.filter = filter;
    }

    public Boundedness getBoundedness() {
        return this.streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
    }

    public SourceReader<UnawareAppendCompactionTask, SimpleSourceSplit> createReader(SourceReaderContext readerContext) throws Exception {
        Preconditions.checkArgument((readerContext.currentParallelism() == 1 ? 1 : 0) != 0, (Object)"Compaction Operator parallelism in paimon MUST be one.");
        return new BucketUnawareCompactSourceReader(this.table, this.streaming, this.filter, this.scanInterval);
    }

    public static DataStreamSource<UnawareAppendCompactionTask> buildSource(StreamExecutionEnvironment env, BucketUnawareCompactSource source, String tableIdentifier) {
        return (DataStreamSource)env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "Compaction Coordinator : " + tableIdentifier, (TypeInformation)new CompactionTaskTypeInfo()).setParallelism(1).setMaxParallelism(1);
    }

    public static class BucketUnawareCompactSourceReader
    extends AbstractNonCoordinatedSourceReader<UnawareAppendCompactionTask> {
        private final UnawareAppendTableCompactionCoordinator compactionCoordinator;
        private final long scanInterval;

        public BucketUnawareCompactSourceReader(FileStoreTable table, boolean streaming, Predicate filter, long scanInterval) {
            this.scanInterval = scanInterval;
            this.compactionCoordinator = new UnawareAppendTableCompactionCoordinator(table, streaming, filter);
        }

        public InputStatus pollNext(ReaderOutput<UnawareAppendCompactionTask> readerOutput) throws Exception {
            boolean isEmpty;
            try {
                List<UnawareAppendCompactionTask> tasks = this.compactionCoordinator.run();
                isEmpty = tasks.isEmpty();
                tasks.forEach(arg_0 -> readerOutput.collect(arg_0));
            }
            catch (EndOfScanException esf) {
                LOG.info("Catching EndOfStreamException, the stream is finished.");
                return InputStatus.END_OF_INPUT;
            }
            if (isEmpty) {
                Thread.sleep(this.scanInterval);
            }
            return InputStatus.MORE_AVAILABLE;
        }
    }
}

