/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.cluster;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.cluster.RemoveClusterBeforeFilesOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Pair;

public class IncrementalClusterSplitSource
extends AbstractNonCoordinatedSource<Split> {
    private static final long serialVersionUID = 2L;
    private final List<Split> splits;

    public IncrementalClusterSplitSource(List<Split> splits) {
        this.splits = splits;
    }

    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    public SourceReader<Split, SimpleSourceSplit> createReader(SourceReaderContext readerContext) throws Exception {
        return new Reader();
    }

    public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource(StreamExecutionEnvironment env, FileStoreTable table, Map<String, String> partitionSpec, List<DataSplit> splits, @Nullable CommitMessage dvCommitMessage, @Nullable Integer parallelism) {
        SingleOutputStreamOperator source = env.fromSource((Source)new IncrementalClusterSplitSource(splits), WatermarkStrategy.noWatermarks(), String.format("Incremental-cluster split generator: %s - %s", table.fullName(), partitionSpec), new JavaTypeInfo<Split>(Split.class)).forceNonParallel();
        PartitionTransformation partitioned = new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new RebalancePartitioner());
        if (parallelism != null) {
            partitioned.setParallelism(parallelism.intValue());
        }
        return Pair.of(new DataStream(source.getExecutionEnvironment(), (Transformation)partitioned).transform(String.format("Incremental-cluster reader: %s - %s", table.fullName(), partitionSpec), (TypeInformation)InternalTypeInfo.of((RowType)LogicalTypeConversion.toLogicalType(table.rowType())), (OneInputStreamOperator)new ReadOperator(table::newRead, null, null)), source.forward().transform("Remove files to be clustered", (TypeInformation)new CommittableTypeInfo(), (OneInputStreamOperator)new RemoveClusterBeforeFilesOperator(dvCommitMessage)).forceNonParallel());
    }

    private class Reader
    extends AbstractNonCoordinatedSourceReader<Split> {
        private Reader() {
        }

        public InputStatus pollNext(ReaderOutput<Split> output) {
            for (Split split : IncrementalClusterSplitSource.this.splits) {
                DataSplit dataSplit = (DataSplit)split;
                output.collect((Object)dataSplit);
            }
            return InputStatus.END_OF_INPUT;
        }
    }
}

