/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
import org.apache.paimon.flink.source.DynamicPartitionFilteringInfo;
import org.apache.paimon.flink.source.LogHybridSourceFactory;
import org.apache.paimon.flink.source.StaticFileStoreSource;
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
import org.apache.paimon.flink.source.operator.MonitorFunction;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.Preconditions;

/**
 * Fluent builder that assembles a Flink {@link DataStream} reading from a Paimon {@link Table}.
 *
 * <p>Configure the builder through the chained setters, then call {@link #build()} (for
 * {@link RowData}) or {@link #buildForRow()} (for external {@link Row}). An execution environment
 * must be supplied through {@link #env(StreamExecutionEnvironment)} before building.
 */
public class FlinkSourceBuilder {
    private final Table table;
    /** Table options parsed once at construction time. */
    private final Options conf;
    private String sourceName;
    /** {@code null} until set explicitly or derived from the environment in {@link #env}. */
    private Boolean sourceBounded;
    private StreamExecutionEnvironment env;
    @Nullable
    private int[][] projectedFields;
    @Nullable
    private Predicate predicate;
    @Nullable
    private LogSourceProvider logSourceProvider;
    @Nullable
    private Integer parallelism;
    @Nullable
    private Long limit;
    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;
    @Nullable
    private DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo;

    /**
     * Creates a builder for the given table. The source name defaults to the table name.
     *
     * @param table the table to read from
     */
    public FlinkSourceBuilder(Table table) {
        this.table = table;
        this.sourceName = table.name();
        this.conf = Options.fromMap(table.options());
    }

    /**
     * Sets the execution environment. If boundedness has not been set yet, it defaults to
     * "bounded" exactly when {@code FlinkSink.isStreaming(env)} is {@code false}.
     *
     * @param env the environment the source is registered with
     * @return this builder
     */
    public FlinkSourceBuilder env(StreamExecutionEnvironment env) {
        this.env = env;
        if (sourceBounded == null) {
            sourceBounded = !FlinkSink.isStreaming(env);
        }
        return this;
    }

    /**
     * Overrides the name given to the created source.
     *
     * @param name the source name
     * @return this builder
     */
    public FlinkSourceBuilder sourceName(String name) {
        this.sourceName = name;
        return this;
    }

    /**
     * Explicitly selects a bounded ({@code true}) or unbounded ({@code false}) source.
     *
     * @param bounded whether the source is bounded
     * @return this builder
     */
    public FlinkSourceBuilder sourceBounded(boolean bounded) {
        this.sourceBounded = bounded;
        return this;
    }

    /**
     * Sets a top-level projection; it is converted to nested indexes and delegated to
     * {@link #projection(int[][])}.
     *
     * @param projectedFields indexes of the fields to keep
     * @return this builder
     */
    public FlinkSourceBuilder projection(int[] projectedFields) {
        return projection(Projection.of(projectedFields).toNestedIndexes());
    }

    /**
     * Sets a projection expressed as nested index paths.
     *
     * @param projectedFields nested index paths of the fields to keep
     * @return this builder
     */
    public FlinkSourceBuilder projection(int[][] projectedFields) {
        this.projectedFields = projectedFields;
        return this;
    }

    /**
     * Sets the filter passed to the read builder.
     *
     * @param predicate the filter predicate
     * @return this builder
     */
    public FlinkSourceBuilder predicate(Predicate predicate) {
        this.predicate = predicate;
        return this;
    }

    /**
     * Sets an optional row limit.
     *
     * @param limit the limit, or {@code null} for none
     * @return this builder
     */
    public FlinkSourceBuilder limit(@Nullable Long limit) {
        this.limit = limit;
        return this;
    }

    /**
     * Sets an optional explicit parallelism for the source.
     *
     * @param parallelism the parallelism, or {@code null} to leave it unset
     * @return this builder
     */
    public FlinkSourceBuilder sourceParallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    /**
     * Sets an optional watermark strategy; when {@code null} no watermarks are generated.
     *
     * @param watermarkStrategy the strategy, or {@code null}
     * @return this builder
     */
    public FlinkSourceBuilder watermarkStrategy(@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
        return this;
    }

    /**
     * Enables dynamic partition filtering on the given fields. A {@code null} or empty list is
     * ignored.
     *
     * @param dynamicPartitionFilteringFields names of the fields to filter on
     * @return this builder
     * @throws IllegalStateException if fields are given but the table is not a
     *     {@link FileStoreTable}
     */
    public FlinkSourceBuilder dynamicPartitionFilteringFields(List<String> dynamicPartitionFilteringFields) {
        if (dynamicPartitionFilteringFields != null && !dynamicPartitionFilteringFields.isEmpty()) {
            Preconditions.checkState(
                    table instanceof FileStoreTable,
                    "Only Paimon FileStoreTable supports dynamic filtering but get %s.",
                    table.getClass().getName());
            this.dynamicPartitionFilteringInfo =
                    new DynamicPartitionFilteringInfo(
                            ((FileStoreTable) table).schema().logicalPartitionType(),
                            dynamicPartitionFilteringFields);
        }
        return this;
    }

    /**
     * Sets the log source provider used by {@link #build()} for non-file streaming reads.
     *
     * @param logSourceProvider the provider
     * @return this builder
     */
    @Deprecated
    FlinkSourceBuilder logSourceProvider(LogSourceProvider logSourceProvider) {
        this.logSourceProvider = logSourceProvider;
        return this;
    }

    /** Creates a read builder carrying the configured projection, filter and limit. */
    private ReadBuilder createReadBuilder() {
        ReadBuilder readBuilder = table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
        if (limit != null) {
            // withLimit takes an int: clamp instead of letting a large long wrap around
            // into a small or negative limit.
            readBuilder.withLimit((int) Math.min(limit, (long) Integer.MAX_VALUE));
        }
        return readBuilder;
    }

    /** Bucket mode of the table, or {@link BucketMode#FIXED} when it is not a FileStoreTable. */
    private BucketMode resolveBucketMode() {
        return table instanceof FileStoreTable ? ((FileStoreTable) table).bucketMode() : BucketMode.FIXED;
    }

    /** Builds the bounded source. */
    private DataStream<RowData> buildStaticFileSource() {
        return toDataStream(
                new StaticFileStoreSource(
                        createReadBuilder(),
                        limit,
                        conf.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
                        conf.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE),
                        dynamicPartitionFilteringInfo));
    }

    /** Builds the default unbounded file source. */
    private DataStream<RowData> buildContinuousFileSource() {
        return toDataStream(
                new ContinuousFileStoreSource(createReadBuilder(), table.options(), limit, resolveBucketMode()));
    }

    /** Builds the aligned unbounded file source after validating the checkpoint configuration. */
    private DataStream<RowData> buildAlignedContinuousFileSource() {
        assertStreamingConfigurationForAlignMode(env);
        return toDataStream(
                new AlignedContinuousFileStoreSource(
                        createReadBuilder(), table.options(), limit, resolveBucketMode()));
    }

    /** Registers the source with the environment and applies the optional parallelism. */
    private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
        DataStreamSource<RowData> dataStream =
                env.fromSource(
                        source,
                        watermarkStrategy == null ? WatermarkStrategy.noWatermarks() : watermarkStrategy,
                        sourceName,
                        produceTypeInfo());
        if (parallelism != null) {
            dataStream.setParallelism(parallelism);
        }
        return dataStream;
    }

    /** Logical type of the emitted rows: the table row type with the projection applied, if any. */
    private LogicalType produceLogicalType() {
        RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());
        if (projectedFields == null) {
            return rowType;
        }
        return Projection.of(projectedFields).project(rowType);
    }

    /** Type information matching {@link #produceLogicalType()}. */
    private TypeInformation<RowData> produceTypeInfo() {
        return InternalTypeInfo.of(produceLogicalType());
    }

    /**
     * Builds the source and converts every {@link RowData} to an external {@link Row}.
     *
     * <p>The converter and the declared result type are derived from the projected row type, so
     * that they match the rows actually emitted by {@link #build()}.
     *
     * @return a stream of {@link Row} with the same parallelism as the underlying source
     * @throws IllegalArgumentException if no environment has been set
     */
    public DataStream<Row> buildForRow() {
        DataType rowType = TypeConversions.fromLogicalToDataType(produceLogicalType());
        DataType[] fieldDataTypes = rowType.getChildren().toArray(new DataType[0]);
        DataFormatConverters.RowConverter converter = new DataFormatConverters.RowConverter(fieldDataTypes);
        DataStream<RowData> source = build();
        return source.map(rowData -> converter.toExternal(rowData))
                .setParallelism(source.getParallelism())
                .returns(ExternalTypeInfo.of(rowType));
    }

    /**
     * Builds the source stream. The variant is chosen in this order:
     *
     * <ol>
     *   <li>bounded: static file source;
     *   <li>a log source provider is set and the streaming read mode is not {@code FILE}: the log
     *       source alone, or a hybrid source (files first, then log) for {@code LATEST_FULL};
     *   <li>checkpoint alignment enabled: aligned continuous file source;
     *   <li>consumer id configured with {@code EXACTLY_ONCE} consistency: monitor-function based
     *       operator;
     *   <li>otherwise: continuous file source.
     * </ol>
     *
     * @return the source stream
     * @throws IllegalArgumentException if no environment has been set
     */
    public DataStream<RowData> build() {
        if (env == null) {
            throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
        }
        // env(...) is the only way to set the environment and it always initialises sourceBounded.
        if (sourceBounded) {
            return buildStaticFileSource();
        }

        TableScanUtils.streamingReadingValidate(table);
        CoreOptions.StartupMode startupMode = CoreOptions.startupMode(conf);
        CoreOptions.StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf);

        if (logSourceProvider != null && streamingReadMode != CoreOptions.StreamingReadMode.FILE) {
            if (startupMode != CoreOptions.StartupMode.LATEST_FULL) {
                return toDataStream(logSourceProvider.createSource(null));
            }
            // Raw types are kept on purpose: naming the enumerator type parameter would require
            // an import this file does not have.
            @SuppressWarnings({"unchecked", "rawtypes"})
            Source<RowData, ?, ?> hybridSource =
                    (Source<RowData, ?, ?>)
                            HybridSource.builder(
                                            (Source)
                                                    LogHybridSourceFactory.buildHybridFirstSource(
                                                            table, projectedFields, predicate))
                                    .addSource(
                                            (HybridSource.SourceFactory) new LogHybridSourceFactory(logSourceProvider),
                                            Boundedness.CONTINUOUS_UNBOUNDED)
                                    .build();
            return toDataStream(hybridSource);
        }

        if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
            return buildAlignedContinuousFileSource();
        }
        if (conf.contains(CoreOptions.CONSUMER_ID)
                && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
            return buildContinuousStreamOperator();
        }
        return buildContinuousFileSource();
    }

    /**
     * Builds the unbounded source through {@link MonitorFunction}.
     *
     * @throws IllegalArgumentException if a limit is configured
     */
    private DataStream<RowData> buildContinuousStreamOperator() {
        if (limit != null) {
            throw new IllegalArgumentException("Cannot limit streaming source, please use batch execution mode.");
        }
        DataStream<RowData> dataStream =
                MonitorFunction.buildSource(
                        env,
                        sourceName,
                        produceTypeInfo(),
                        createReadBuilder(),
                        conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                        watermarkStrategy == null);
        if (parallelism != null) {
            dataStream.getTransformation().setParallelism(parallelism);
        }
        if (watermarkStrategy != null) {
            dataStream = dataStream.assignTimestampsAndWatermarks(watermarkStrategy);
        }
        return dataStream;
    }

    /**
     * Verifies that the checkpoint configuration of {@code env} is compatible with the aligned
     * source.
     *
     * @throws IllegalArgumentException if any requirement is violated
     */
    private void assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) {
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        Preconditions.checkArgument(
                checkpointConfig.isCheckpointingEnabled(),
                "The align mode of paimon source is only supported when checkpoint enabled. Please set "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key()
                        + " larger than 0");
        Preconditions.checkArgument(
                checkpointConfig.getMaxConcurrentCheckpoints() == 1,
                "The align mode of paimon source supports at most one ongoing checkpoint at the same time. Please set "
                        + ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.key()
                        + " to 1");
        Preconditions.checkArgument(
                checkpointConfig.getCheckpointTimeout()
                        > conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
                "The align mode of paimon source requires that the timeout of checkpoint is greater than the timeout of the source's snapshot alignment. Please increase "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()
                        + " or decrease "
                        + FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT.key());
        Preconditions.checkArgument(
                !checkpointConfig.isUnalignedCheckpointsEnabled(),
                "The align mode of paimon source currently does not support unaligned checkpoints. Please set "
                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
                        + " to false.");
        Preconditions.checkArgument(
                checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
                "The align mode of paimon source currently only supports EXACTLY_ONCE checkpoint mode. Please set "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                        + " to exactly-once");
    }
}

