/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.postpone;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.postpone.RemovePostponeBucketFilesOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.operator.ReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;

public class PostponeBucketCompactSplitSource
extends AbstractNonCoordinatedSource<Split> {
    private static final long serialVersionUID = 1L;
    private final FileStoreTable table;
    private final Map<String, String> partitionSpec;

    public PostponeBucketCompactSplitSource(FileStoreTable table, Map<String, String> partitionSpec) {
        this.table = table;
        this.partitionSpec = partitionSpec;
    }

    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    public SourceReader<Split, SimpleSourceSplit> createReader(SourceReaderContext readerContext) throws Exception {
        return new Reader();
    }

    public static Pair<DataStream<RowData>, DataStream<Committable>> buildSource(StreamExecutionEnvironment env, FileStoreTable table, Map<String, String> partitionSpec, @Nullable Integer parallelism) {
        SingleOutputStreamOperator source2 = env.fromSource((Source)new PostponeBucketCompactSplitSource(table, partitionSpec), WatermarkStrategy.noWatermarks(), String.format("Compact split generator: %s - %s", table.fullName(), partitionSpec), new JavaTypeInfo<Split>(Split.class)).forceNonParallel();
        FlinkStreamPartitioner<Split> partitioner = new FlinkStreamPartitioner<Split>(new SplitChannelComputer());
        PartitionTransformation partitioned = new PartitionTransformation(source2.getTransformation(), partitioner);
        if (parallelism != null) {
            partitioned.setParallelism(parallelism.intValue());
        }
        return Pair.of(new DataStream(source2.getExecutionEnvironment(), (Transformation)partitioned).transform(String.format("Compact split reader: %s - %s", table.fullName(), partitionSpec), (TypeInformation)InternalTypeInfo.of((RowType)LogicalTypeConversion.toLogicalType(table.rowType())), (OneInputStreamOperator)new ReadOperator(table::newRead, null, null)), source2.forward().transform("Remove new files", (TypeInformation)new CommittableTypeInfo(), (OneInputStreamOperator)new RemovePostponeBucketFilesOperator()).forceNonParallel());
    }

    private static class SplitChannelComputer
    implements ChannelComputer<Split> {
        private transient int numChannels;
        private transient Pattern pattern;

        private SplitChannelComputer() {
        }

        @Override
        public void setup(int numChannels) {
            this.numChannels = numChannels;
            this.pattern = Pattern.compile("-s-(\\d+?)-");
        }

        @Override
        public int channel(Split record) {
            DataSplit dataSplit = (DataSplit)record;
            String fileName = dataSplit.dataFiles().get(0).fileName();
            Matcher matcher = this.pattern.matcher(fileName);
            Preconditions.checkState(matcher.find(), "Data file name %s does not match the pattern. This is unexpected.", fileName);
            return ChannelComputer.select(dataSplit.partition(), Integer.parseInt(matcher.group(1)), this.numChannels);
        }
    }

    private class Reader
    extends AbstractNonCoordinatedSourceReader<Split> {
        private Reader() {
        }

        public InputStatus pollNext(ReaderOutput<Split> output) throws Exception {
            List<Split> splits = PostponeBucketCompactSplitSource.this.table.newSnapshotReader().withPartitionFilter(PostponeBucketCompactSplitSource.this.partitionSpec).withBucket(-2).read().splits();
            for (Split split : splits) {
                DataSplit dataSplit = (DataSplit)split;
                ArrayList<DataFileMeta> files = new ArrayList<DataFileMeta>(dataSplit.dataFiles());
                files.sort(Comparator.comparing(DataFileMeta::creationTime));
                for (DataFileMeta meta : files) {
                    DataSplit s = DataSplit.builder().withPartition(dataSplit.partition()).withBucket(dataSplit.bucket()).withBucketPath(dataSplit.bucketPath()).withTotalBuckets(dataSplit.totalBuckets()).withDataFiles(Collections.singletonList(meta)).isStreaming(false).build();
                    output.collect((Object)s);
                }
            }
            return InputStatus.END_OF_INPUT;
        }
    }
}

