/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
import org.apache.paimon.flink.source.LogHybridSourceFactory;
import org.apache.paimon.flink.source.StaticFileStoreSource;
import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
import org.apache.paimon.flink.source.operator.MonitorFunction;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.Preconditions;

/**
 * Fluent builder that turns a Paimon {@link Table} into a Flink {@link DataStream} of
 * {@link RowData}.
 *
 * <p>Configure the builder with the {@code withXxx} methods and then call {@link #build()}.
 * A {@link StreamExecutionEnvironment} must be supplied through {@link #withEnv} before
 * building; every other setting is optional.
 *
 * <p>Which source is created depends on whether continuous mode is enabled, on whether a
 * {@link LogSourceProvider} was supplied, and on the table options (see {@link #build()}).
 */
public class FlinkSourceBuilder {
    private final ObjectIdentifier tableIdentifier;
    private final Table table;
    /** Table options parsed once in the constructor and reused for every lookup. */
    private final Options conf;
    private boolean isContinuous = false;
    private StreamExecutionEnvironment env;
    @Nullable
    private int[][] projectedFields;
    @Nullable
    private Predicate predicate;
    @Nullable
    private LogSourceProvider logSourceProvider;
    @Nullable
    private Integer parallelism;
    @Nullable
    private Long limit;
    @Nullable
    private WatermarkStrategy<RowData> watermarkStrategy;

    /**
     * Creates a builder for the given table.
     *
     * @param tableIdentifier identifier whose summary string is used as the source name
     * @param table table to read from; its options are parsed into the builder configuration
     */
    public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) {
        this.tableIdentifier = tableIdentifier;
        this.table = table;
        this.conf = Options.fromMap(table.options());
    }

    /** Selects continuous reading ({@code true}) or static reading ({@code false}, the default). */
    public FlinkSourceBuilder withContinuousMode(boolean isContinuous) {
        this.isContinuous = isContinuous;
        return this;
    }

    /** Sets the execution environment the source is added to; required by {@link #build()}. */
    public FlinkSourceBuilder withEnv(StreamExecutionEnvironment env) {
        this.env = env;
        return this;
    }

    /** Sets the projection passed to the read builder and used to derive the produced row type. */
    public FlinkSourceBuilder withProjection(int[][] projectedFields) {
        this.projectedFields = projectedFields;
        return this;
    }

    /** Sets the filter passed to the read builder. */
    public FlinkSourceBuilder withPredicate(Predicate predicate) {
        this.predicate = predicate;
        return this;
    }

    /** Sets the optional row limit handed to the file store sources. */
    public FlinkSourceBuilder withLimit(@Nullable Long limit) {
        this.limit = limit;
        return this;
    }

    /** Sets the optional log source provider consulted in continuous mode. */
    public FlinkSourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider) {
        this.logSourceProvider = logSourceProvider;
        return this;
    }

    /** Sets the optional parallelism applied to the created source; {@code null} leaves it unset. */
    public FlinkSourceBuilder withParallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    /** Sets the optional watermark strategy; {@code null} means no watermarks are assigned. */
    public FlinkSourceBuilder withWatermarkStrategy(@Nullable WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
        return this;
    }

    /** Creates a read builder for the table with the configured projection and filter applied. */
    private ReadBuilder createReadBuilder() {
        return this.table.newReadBuilder()
                .withProjection(this.projectedFields)
                .withFilter(this.predicate);
    }

    /**
     * Returns the bucket mode handed to the continuous sources: the table's own mode when it is
     * a {@link FileStoreTable}, otherwise {@link BucketMode#FIXED}.
     */
    private BucketMode bucketMode() {
        if (this.table instanceof FileStoreTable) {
            return ((FileStoreTable) this.table).bucketMode();
        }
        return BucketMode.FIXED;
    }

    /** Builds the stream for a non-continuous read backed by {@link StaticFileStoreSource}. */
    private DataStream<RowData> buildStaticFileSource() {
        // conf already holds the parsed table options; no need to parse them a second time.
        StaticFileStoreSource source = new StaticFileStoreSource(
                this.createReadBuilder(),
                this.limit,
                this.conf.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
                this.conf.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE));
        return this.toDataStream(source);
    }

    /** Builds the stream for a continuous read backed by {@link ContinuousFileStoreSource}. */
    private DataStream<RowData> buildContinuousFileSource() {
        ContinuousFileStoreSource source = new ContinuousFileStoreSource(
                this.createReadBuilder(), this.table.options(), this.limit, this.bucketMode());
        return this.toDataStream(source);
    }

    /**
     * Builds the stream for a continuous read backed by {@link AlignedContinuousFileStoreSource},
     * after validating the checkpoint configuration of the environment.
     */
    private DataStream<RowData> buildAlignedContinuousFileSource() {
        this.assertStreamingConfigurationForAlignMode(this.env);
        AlignedContinuousFileStoreSource source = new AlignedContinuousFileStoreSource(
                this.createReadBuilder(), this.table.options(), this.limit, this.bucketMode());
        return this.toDataStream(source);
    }

    /**
     * Registers {@code source} on the environment under the table's summary string, using the
     * configured watermark strategy (or none) and applying the parallelism when one was set.
     */
    private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
        WatermarkStrategy<RowData> strategy = this.watermarkStrategy;
        if (strategy == null) {
            strategy = WatermarkStrategy.noWatermarks();
        }
        DataStreamSource<RowData> dataStream = this.env.fromSource(
                source, strategy, this.tableIdentifier.asSummaryString(), this.produceTypeInfo());
        if (this.parallelism != null) {
            dataStream.setParallelism(this.parallelism);
        }
        return dataStream;
    }

    /**
     * Derives the type information of the produced rows: the table row type, narrowed by the
     * projection when one was configured.
     */
    private TypeInformation<RowData> produceTypeInfo() {
        RowType rowType = LogicalTypeConversion.toLogicalType(this.table.rowType());
        LogicalType produceType = Optional.ofNullable(this.projectedFields)
                .map(Projection::of)
                .map(p -> p.project(rowType))
                .orElse(rowType);
        return InternalTypeInfo.of(produceType);
    }

    /**
     * Builds the data stream according to the current configuration.
     *
     * <p>When continuous mode is off, a static file source is used. In continuous mode the
     * candidates are checked in this order:
     * <ol>
     *   <li>a log source provider is set and the streaming read mode is not {@code FILE}: the
     *       log source alone, or a hybrid of a first file source followed by the log source when
     *       the startup mode is {@code LATEST_FULL};</li>
     *   <li>{@link FlinkConnectorOptions#SOURCE_CHECKPOINT_ALIGN_ENABLED} is true: the aligned
     *       continuous file source;</li>
     *   <li>{@link CoreOptions#CONSUMER_ID} is present: the {@link MonitorFunction} based
     *       operator;</li>
     *   <li>otherwise: the continuous file source.</li>
     * </ol>
     *
     * @return the stream of rows read from the table
     * @throws IllegalArgumentException if no execution environment was set
     */
    public DataStream<RowData> build() {
        if (this.env == null) {
            throw new IllegalArgumentException("StreamExecutionEnvironment should not be null.");
        }
        if (!this.isContinuous) {
            return this.buildStaticFileSource();
        }

        TableScanUtils.streamingReadingValidate(this.table);
        CoreOptions.StartupMode startupMode = CoreOptions.startupMode(this.conf);
        CoreOptions.StreamingReadMode streamingReadMode = CoreOptions.streamReadType(this.conf);

        if (this.logSourceProvider != null && streamingReadMode != CoreOptions.StreamingReadMode.FILE) {
            if (startupMode != CoreOptions.StartupMode.LATEST_FULL) {
                return this.toDataStream(this.logSourceProvider.createSource(null));
            }
            // LATEST_FULL: read the first (file based) source, then switch over to the log source.
            return this.toDataStream((Source<RowData, ?, ?>)HybridSource.builder((Source)LogHybridSourceFactory.buildHybridFirstSource(this.table, this.projectedFields, this.predicate)).addSource((HybridSource.SourceFactory)new LogHybridSourceFactory(this.logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED).build());
        }
        if (this.conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) {
            return this.buildAlignedContinuousFileSource();
        }
        if (this.conf.contains(CoreOptions.CONSUMER_ID)) {
            return this.buildContinuousStreamOperator();
        }
        return this.buildContinuousFileSource();
    }

    /**
     * Builds the continuous stream through {@link MonitorFunction#buildSource}, then applies the
     * configured parallelism and watermark strategy.
     *
     * @throws IllegalArgumentException if a limit was configured
     */
    private DataStream<RowData> buildContinuousStreamOperator() {
        if (this.limit != null) {
            throw new IllegalArgumentException("Cannot limit streaming source, please use batch execution mode.");
        }
        // The last argument is true only when no watermark strategy was configured.
        DataStream<RowData> dataStream = MonitorFunction.buildSource(
                this.env,
                this.tableIdentifier.asSummaryString(),
                this.produceTypeInfo(),
                this.createReadBuilder(),
                this.conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                this.watermarkStrategy == null);
        if (this.parallelism != null) {
            dataStream.getTransformation().setParallelism(this.parallelism);
        }
        if (this.watermarkStrategy != null) {
            dataStream = dataStream.assignTimestampsAndWatermarks(this.watermarkStrategy);
        }
        return dataStream;
    }

    /**
     * Verifies that the checkpoint configuration of {@code env} satisfies the requirements of
     * the aligned source: checkpointing enabled, at most one concurrent checkpoint, a checkpoint
     * timeout greater than the source alignment timeout, no unaligned checkpoints, and
     * {@code EXACTLY_ONCE} checkpointing mode.
     *
     * @param env environment whose checkpoint configuration is inspected
     * @throws IllegalArgumentException if any of the requirements is violated
     */
    public void assertStreamingConfigurationForAlignMode(StreamExecutionEnvironment env) {
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        Preconditions.checkArgument(
                checkpointConfig.isCheckpointingEnabled(),
                "The align mode of paimon source is only supported when checkpoint enabled. Please set "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL.key()
                        + " larger than 0");
        Preconditions.checkArgument(
                checkpointConfig.getMaxConcurrentCheckpoints() == 1,
                "The align mode of paimon source supports at most one ongoing checkpoint at the same time. Please set "
                        + ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.key()
                        + " to 1");
        Preconditions.checkArgument(
                checkpointConfig.getCheckpointTimeout()
                        > this.conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
                "The align mode of paimon source requires that the timeout of checkpoint is greater than the timeout of the source's snapshot alignment. Please increase "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()
                        + " or decrease "
                        + FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT.key());
        Preconditions.checkArgument(
                !checkpointConfig.isUnalignedCheckpointsEnabled(),
                "The align mode of paimon source currently does not support unaligned checkpoints. Please set "
                        + ExecutionCheckpointingOptions.ENABLE_UNALIGNED.key()
                        + " to false.");
        Preconditions.checkArgument(
                checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE,
                "The align mode of paimon source currently only supports EXACTLY_ONCE checkpoint mode. Please set "
                        + ExecutionCheckpointingOptions.CHECKPOINTING_MODE.key()
                        + " to exactly-once");
    }
}

