/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.flink.source.FlinkTableSource;
import org.apache.paimon.flink.source.NumberSequenceRowSource;
import org.apache.paimon.flink.source.WatermarkAlignUtils;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;

/**
 * Base table source for Paimon data tables in Flink.
 *
 * <p>On top of what {@link FlinkTableSource} provides, this class builds the scan runtime
 * provider, serves lookup joins, accepts a pushed-down watermark strategy and can answer a
 * plain {@code COUNT(*)} from split metadata when aggregate push-down is applied.
 */
public abstract class BaseDataTableSource extends FlinkTableSource
        implements LookupTableSource, SupportsWatermarkPushDown, SupportsAggregatePushDown {

    /** Option keys stripped from the table options before the table is used for lookups. */
    private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
            Arrays.asList(
                    CoreOptions.SCAN_TIMESTAMP,
                    CoreOptions.SCAN_TIMESTAMP_MILLIS,
                    CoreOptions.SCAN_WATERMARK,
                    CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
                    CoreOptions.SCAN_SNAPSHOT_ID,
                    CoreOptions.SCAN_TAG_NAME,
                    CoreOptions.SCAN_VERSION);

    protected final ObjectIdentifier tableIdentifier;
    protected final boolean streaming;
    protected final DynamicTableFactory.Context context;

    /** Factory of the optional log store; {@code null} when no log store is attached. */
    @Nullable
    protected final LogStoreTableFactory logStoreTableFactory;

    /** Watermark strategy handed over through {@link #applyWatermark}, if any. */
    @Nullable
    protected WatermarkStrategy<RowData> watermarkStrategy;

    /** Row count computed by {@link #applyAggregates}; {@code null} when nothing was pushed. */
    @Nullable
    protected Long countPushed;

    /**
     * Creates the source.
     *
     * @param tableIdentifier identifier used to name the Flink source
     * @param table the table to read
     * @param streaming whether the source runs unbounded
     * @param context factory context, forwarded to the log store factory
     * @param logStoreTableFactory optional log store factory
     * @param predicate optional pushed-down filter
     * @param projectFields optional pushed-down projection
     * @param limit optional pushed-down limit
     * @param watermarkStrategy optional pushed-down watermark strategy
     * @param countPushed optional pre-computed {@code COUNT(*)} result
     */
    public BaseDataTableSource(
            ObjectIdentifier tableIdentifier,
            Table table,
            boolean streaming,
            DynamicTableFactory.Context context,
            @Nullable LogStoreTableFactory logStoreTableFactory,
            @Nullable Predicate predicate,
            @Nullable int[][] projectFields,
            @Nullable Long limit,
            @Nullable WatermarkStrategy<RowData> watermarkStrategy,
            @Nullable Long countPushed) {
        super(table, predicate, projectFields, limit);
        this.tableIdentifier = tableIdentifier;
        this.streaming = streaming;
        this.context = context;
        this.logStoreTableFactory = logStoreTableFactory;
        // NOTE(review): these three are also passed to super(...) above; the explicit
        // re-assignment is kept as-is because the super constructor is not visible here.
        this.predicate = predicate;
        this.projectFields = projectFields;
        this.limit = limit;
        this.watermarkStrategy = watermarkStrategy;
        this.countPushed = countPushed;
    }

    /**
     * Decides which changelog mode this source reports.
     *
     * @return insert-only for non-streaming reads, tables without primary keys and the
     *     first-row merge engine; otherwise {@code all} or {@code upsert} depending on the
     *     table options
     */
    @Override
    public ChangelogMode getChangelogMode() {
        if (!this.streaming || this.table.primaryKeys().isEmpty()) {
            return ChangelogMode.insertOnly();
        }

        Options tableOptions = Options.fromMap(this.table.options());
        if (new CoreOptions(tableOptions).mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
            return ChangelogMode.insertOnly();
        }

        if (tableOptions.get(FlinkConnectorOptions.SCAN_REMOVE_NORMALIZE).booleanValue()) {
            return ChangelogMode.all();
        }

        // Without a log store, any configured changelog producer yields the full changelog.
        if (this.logStoreTableFactory == null
                && tableOptions.get(CoreOptions.CHANGELOG_PRODUCER)
                        != CoreOptions.ChangelogProducer.NONE) {
            return ChangelogMode.all();
        }

        if (tableOptions.get(CoreOptions.LOG_CONSISTENCY) == CoreOptions.LogConsistency.TRANSACTIONAL
                && tableOptions.get(CoreOptions.LOG_CHANGELOG_MODE)
                        == CoreOptions.LogChangelogMode.ALL) {
            return ChangelogMode.all();
        }
        return ChangelogMode.upsert();
    }

    /**
     * Builds the runtime provider used to scan the table.
     *
     * <p>When a count has been pushed down, a dedicated single-row source is returned instead
     * of a real table scan.
     *
     * @param scanContext planner-provided context, forwarded to the log store factory
     * @return the scan runtime provider
     */
    @Override
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(
            ScanTableSource.ScanContext scanContext) {
        if (this.countPushed != null) {
            return this.createCountStarScan();
        }

        LogSourceProvider logProvider =
                this.logStoreTableFactory == null
                        ? null
                        : this.logStoreTableFactory.createSourceProvider(
                                this.context,
                                (DynamicTableSource.Context) scanContext,
                                this.projectFields);

        Options tableOptions = Options.fromMap(this.table.options());
        WatermarkStrategy<RowData> effectiveStrategy = this.decorateWatermarkStrategy(tableOptions);

        FlinkSourceBuilder builder =
                new FlinkSourceBuilder(this.table)
                        .sourceName(this.tableIdentifier.asSummaryString())
                        .sourceBounded(!this.streaming)
                        .logSourceProvider(logProvider)
                        .projection(this.projectFields)
                        .predicate(this.predicate)
                        .limit(this.limit)
                        .watermarkStrategy(effectiveStrategy)
                        .dynamicPartitionFilteringFields(this.dynamicPartitionFilteringFields());

        // Parallelism is inferred lazily, once the execution environment is known.
        return new PaimonDataStreamScanProvider(
                !this.streaming,
                execEnv ->
                        builder.sourceParallelism(
                                        this.inferSourceParallelism(
                                                (StreamExecutionEnvironment) execEnv))
                                .env((StreamExecutionEnvironment) execEnv)
                                .build());
    }

    /**
     * Applies the watermark-related table options to the pushed-down strategy.
     *
     * @param options the table options
     * @return the decorated strategy, or {@code null} when no strategy was pushed down
     */
    @Nullable
    private WatermarkStrategy<RowData> decorateWatermarkStrategy(Options options) {
        WatermarkStrategy<RowData> strategy = this.watermarkStrategy;
        if (strategy == null) {
            return null;
        }

        FlinkConnectorOptions.WatermarkEmitStrategy emitMode =
                options.get(FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY);
        if (emitMode == FlinkConnectorOptions.WatermarkEmitStrategy.ON_EVENT) {
            // NOTE(review): the decompiler emitted the interface type here; presumably the
            // original wraps the strategy in a concrete on-event implementation - confirm
            // against the original sources before compiling.
            strategy = new WatermarkStrategy<RowData>(strategy);
        }

        Duration idleness = options.get(FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT);
        if (idleness != null) {
            strategy = strategy.withIdleness(idleness);
        }

        String alignmentGroup = options.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP);
        if (alignmentGroup != null) {
            strategy =
                    WatermarkAlignUtils.withWatermarkAlignment(
                            strategy,
                            alignmentGroup,
                            options.get(FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT),
                            options.get(
                                    FlinkConnectorOptions
                                            .SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL));
        }
        return strategy;
    }

    /**
     * Creates the bounded, parallelism-1 provider that serves a pushed-down count.
     *
     * @return a provider backed by a {@link NumberSequenceRowSource}
     */
    private ScanTableSource.ScanRuntimeProvider createCountStarScan() {
        Preconditions.checkNotNull(this.countPushed);
        // Both bounds are the count itself - presumably yielding one row carrying that value;
        // confirm against NumberSequenceRowSource.
        final NumberSequenceRowSource countSource =
                new NumberSequenceRowSource(this.countPushed, this.countPushed);
        return new SourceProvider() {

            public Source<RowData, ?, ?> createSource() {
                return countSource;
            }

            public boolean isBounded() {
                return true;
            }

            public Optional<Integer> getParallelism() {
                return Optional.of(1);
            }
        };
    }

    /** Returns the fields handed to the source builder for dynamic partition filtering. */
    protected abstract List<String> dynamicPartitionFilteringFields();

    /**
     * Remembers the watermark strategy pushed down by the planner.
     *
     * @param watermarkStrategy the strategy to use when the scan source is built
     */
    @Override
    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    /**
     * Builds the runtime provider used for lookup joins.
     *
     * @param context planner-provided lookup context carrying the join keys
     * @return a lookup provider, asynchronous or not depending on the table options
     * @throws UnsupportedOperationException if the table is not a {@link FileStoreTable}
     * @throws RuntimeException if a limit has been pushed down to this source
     */
    @Override
    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(
            LookupTableSource.LookupContext context) {
        if (!(this.table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException(
                    "Currently, lookup dim table only support FileStoreTable but is "
                            + this.table.getClass().getName());
        }
        if (this.limit != null) {
            throw new RuntimeException(
                    "Limit push down should not happen in Lookup source, but it is " + this.limit);
        }

        // No projection pushed down means every top-level field is read.
        int[] projection;
        if (this.projectFields == null) {
            projection = IntStream.range(0, this.table.rowType().getFieldCount()).toArray();
        } else {
            projection = Projection.of(this.projectFields).toTopLevelIndexes();
        }
        int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();

        Options tableOptions = new Options(this.table.options());
        boolean asyncEnabled = tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC);
        int asyncThreads = tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER);

        FileStoreTable lookupTable = this.timeTravelDisabledTable((FileStoreTable) this.table);
        FileStoreLookupFunction lookupFunction =
                this.getFileStoreLookupFunction(context, lookupTable, projection, joinKey);
        return LookupRuntimeProviderFactory.create(lookupFunction, asyncEnabled, asyncThreads);
    }

    /**
     * Creates the lookup function; subclasses may override to customise it.
     *
     * @param context planner-provided lookup context (unused by this implementation)
     * @param table the table to look up
     * @param projection top-level indexes of the projected fields
     * @param joinKey top-level indexes of the join key fields
     * @return the lookup function, carrying the pushed-down predicate
     */
    protected FileStoreLookupFunction getFileStoreLookupFunction(
            LookupTableSource.LookupContext context, Table table, int[] projection, int[] joinKey) {
        return new FileStoreLookupFunction(table, projection, joinKey, this.predicate);
    }

    /**
     * Copies the table with every time-travel option removed and the scan mode forced to a
     * full startup mode.
     *
     * @param table the table to copy
     * @return the copy with the adjusted options
     */
    private FileStoreTable timeTravelDisabledTable(FileStoreTable table) {
        HashMap<String, String> adjusted = new HashMap<String, String>(table.options());
        for (ConfigOption<?> option : TIME_TRAVEL_OPTIONS) {
            adjusted.remove(option.key());
        }

        // COMPACTED_FULL is kept as configured; any other startup mode becomes LATEST_FULL.
        CoreOptions.StartupMode configured = CoreOptions.fromMap(adjusted).startupMode();
        CoreOptions.StartupMode effective =
                configured == CoreOptions.StartupMode.COMPACTED_FULL
                        ? configured
                        : CoreOptions.StartupMode.LATEST_FULL;
        adjusted.put(CoreOptions.SCAN_MODE.key(), effective.toString());

        return table.copy(table.schema().copy(adjusted));
    }

    /**
     * Tries to answer a global {@code COUNT(*)} from split metadata.
     *
     * <p>The push-down is accepted only for non-streaming reads of a {@link DataTable}, with a
     * single empty grouping set and a single aggregate whose function class is the planner's
     * {@code Count1AggFunction}, and only if every planned split is a {@link DataSplit} that
     * reports its merged row count as available.
     *
     * @param groupingSets grouping sets of the aggregation
     * @param aggregateExpressions aggregate calls of the aggregation
     * @param producedDataType output type of the aggregation (unused)
     * @return {@code true} if the count was computed and stored, {@code false} otherwise
     */
    @Override
    public boolean applyAggregates(
            List<int[]> groupingSets,
            List<AggregateExpression> aggregateExpressions,
            DataType producedDataType) {
        if (this.isStreaming() || !(this.table instanceof DataTable)) {
            return false;
        }
        if (groupingSets.size() != 1 || groupingSets.get(0).length != 0) {
            return false;
        }
        if (aggregateExpressions.size() != 1) {
            return false;
        }
        // The function is matched by class name rather than by a reference to the class itself.
        String functionClass =
                aggregateExpressions.get(0).getFunctionDefinition().getClass().getName();
        if (!"org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction"
                .equals(functionClass)) {
            return false;
        }

        List<Split> plannedSplits =
                this.table
                        .newReadBuilder()
                        .dropStats()
                        .withFilter(this.predicate)
                        .newScan()
                        .plan()
                        .splits();

        long total = 0L;
        for (Split candidate : plannedSplits) {
            if (!(candidate instanceof DataSplit)) {
                return false;
            }
            DataSplit dataSplit = (DataSplit) candidate;
            if (!dataSplit.mergedRowCountAvailable()) {
                return false;
            }
            total += dataSplit.mergedRowCount();
        }
        this.countPushed = total;
        return true;
    }

    /** Returns the fixed summary string of this source. */
    @Override
    public String asSummaryString() {
        return "Paimon-DataSource";
    }

    /** Returns whether this source was created for streaming execution. */
    @Override
    public boolean isStreaming() {
        return this.streaming;
    }
}

