/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkFilters;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;

@Internal
public class IcebergTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsFilterPushDown,
SupportsLimitPushDown,
SupportsSourceWatermark {
    private int[] projectedFields;
    private Long limit;
    private List<org.apache.iceberg.expressions.Expression> filters;
    private final TableLoader loader;
    private final ResolvedSchema schema;
    private final Map<String, String> properties;
    private final boolean isLimitPushDown;
    private final ReadableConfig readableConfig;

    private IcebergTableSource(IcebergTableSource toCopy) {
        this.loader = toCopy.loader;
        this.schema = toCopy.schema;
        this.properties = toCopy.properties;
        this.projectedFields = toCopy.projectedFields;
        this.isLimitPushDown = toCopy.isLimitPushDown;
        this.limit = toCopy.limit;
        this.filters = toCopy.filters;
        this.readableConfig = toCopy.readableConfig;
    }

    public IcebergTableSource(TableLoader loader, ResolvedSchema schema, Map<String, String> properties, ReadableConfig readableConfig) {
        this(loader, schema, properties, null, false, null, (List<org.apache.iceberg.expressions.Expression>)ImmutableList.of(), readableConfig);
    }

    private IcebergTableSource(TableLoader loader, ResolvedSchema schema, Map<String, String> properties, int[] projectedFields, boolean isLimitPushDown, Long limit, List<org.apache.iceberg.expressions.Expression> filters, ReadableConfig readableConfig) {
        this.loader = loader;
        this.schema = schema;
        this.properties = properties;
        this.projectedFields = projectedFields;
        this.isLimitPushDown = isLimitPushDown;
        this.limit = limit;
        this.filters = filters;
        this.readableConfig = readableConfig;
    }

    public void applyProjection(int[][] projectFields) {
        this.projectedFields = new int[projectFields.length];
        for (int i = 0; i < projectFields.length; ++i) {
            Preconditions.checkArgument((projectFields[i].length == 1 ? 1 : 0) != 0, (Object)"Don't support nested projection in iceberg source now.");
            this.projectedFields[i] = projectFields[i][0];
        }
    }

    private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv) {
        return FlinkSource.forRowData().env(execEnv).tableLoader(this.loader).setAll(this.properties).project(TableSchema.fromResolvedSchema((ResolvedSchema)this.getProjectedSchema())).limit(this.limit).filters(this.filters).flinkConf(this.readableConfig).build();
    }

    private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
        SplitAssignerType assignerType = (SplitAssignerType)((Object)this.readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE));
        return IcebergSource.forRowData().tableLoader(this.loader).assignerFactory(assignerType.factory()).setAll(this.properties).project(this.getProjectedSchema()).limit(this.limit).filters(this.filters).flinkConfig(this.readableConfig).buildStream(env);
    }

    private ResolvedSchema getProjectedSchema() {
        if (this.projectedFields == null) {
            return this.schema;
        }
        List fullColumns = this.schema.getColumns();
        return ResolvedSchema.of(Arrays.stream(this.projectedFields).mapToObj(fullColumns::get).collect(Collectors.toList()));
    }

    public void applyLimit(long newLimit) {
        this.limit = newLimit;
    }

    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> flinkFilters) {
        ArrayList acceptedFilters = Lists.newArrayList();
        ArrayList expressions = Lists.newArrayList();
        for (ResolvedExpression resolvedExpression : flinkFilters) {
            Optional<org.apache.iceberg.expressions.Expression> icebergExpression = FlinkFilters.convert((Expression)resolvedExpression);
            if (!icebergExpression.isPresent()) continue;
            expressions.add(icebergExpression.get());
            acceptedFilters.add(resolvedExpression);
        }
        this.filters = expressions;
        return SupportsFilterPushDown.Result.of((List)acceptedFilters, flinkFilters);
    }

    public void applySourceWatermark() {
        Preconditions.checkArgument((boolean)((Boolean)this.readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)), (Object)"Source watermarks are supported only in flip-27 iceberg source implementation");
        Preconditions.checkNotNull((Object)this.properties.get("watermark-column"), (Object)"watermark-column needs to be configured to use source watermark.");
    }

    public boolean supportsNestedProjection() {
        return false;
    }

    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
        return new DataStreamScanProvider(){

            public DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
                if (((Boolean)IcebergTableSource.this.readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)).booleanValue()) {
                    return IcebergTableSource.this.createFLIP27Stream(execEnv);
                }
                return IcebergTableSource.this.createDataStream(execEnv);
            }

            public boolean isBounded() {
                return FlinkSource.isBounded(IcebergTableSource.this.properties);
            }

            public Optional<Integer> getParallelism() {
                return Optional.ofNullable(PropertyUtil.propertyAsNullableInt(IcebergTableSource.this.properties, (String)FactoryUtil.SOURCE_PARALLELISM.key()));
            }
        };
    }

    public DynamicTableSource copy() {
        return new IcebergTableSource(this);
    }

    public String asSummaryString() {
        return "Iceberg table source";
    }
}

