/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.FlinkInputSplit;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingMonitorFunction;
import org.apache.iceberg.flink.source.StreamingReaderOperator;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static entry point for assembling a Flink read over an Iceberg {@link Table}.
 *
 * <p>Use {@link #forRowData()} to obtain a {@link Builder}, configure it, and then call either
 * {@link Builder#buildFormat()} to get a {@link FlinkInputFormat} or {@link Builder#build()} to
 * get a {@link DataStream} of {@link RowData}.
 */
public class FlinkSource {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSource.class);

    /** Not instantiable; this class only exposes static helpers. */
    private FlinkSource() {
    }

    /**
     * Creates a new, empty {@link Builder} producing {@link RowData} records.
     *
     * @return a fresh builder instance
     */
    public static Builder forRowData() {
        return new Builder();
    }

    /**
     * Tells whether a scan configured from the given properties is bounded.
     *
     * @param properties properties handed to {@code ScanContext.Builder#fromProperties}
     * @return {@code true} when the resulting {@link ScanContext} does not report streaming mode
     */
    public static boolean isBounded(Map<String, String> properties) {
        ScanContext context = ScanContext.builder().fromProperties(properties).build();
        return !context.isStreaming();
    }

    /**
     * Fluent builder that collects the table, the execution environment and the scan options,
     * and turns them into a {@link FlinkInputFormat} or a {@link DataStream}.
     *
     * <p>Most setters simply forward their argument to the underlying {@code ScanContext.Builder}.
     */
    public static class Builder {
        /** File system schemes for which split locality information may be exposed. */
        private static final Set<String> FILE_SYSTEM_SUPPORT_LOCALITY = ImmutableSet.of("hdfs");

        private StreamExecutionEnvironment env;
        private Table table;
        private TableLoader tableLoader;
        private TableSchema projectedSchema;
        private ReadableConfig readableConfig = new Configuration();
        private final ScanContext.Builder contextBuilder = ScanContext.builder();
        // Tri-state: null means "not set explicitly, consult the Flink configuration".
        private Boolean exposeLocality;

        /**
         * Sets the loader used to load the table; required by {@link #buildFormat()}.
         *
         * @param newLoader the table loader
         * @return this builder
         */
        public Builder tableLoader(TableLoader newLoader) {
            tableLoader = newLoader;
            return this;
        }

        /**
         * Sets an already loaded table. When set, {@link #buildFormat()} does not load the table
         * through the table loader.
         *
         * @param newTable the table to read
         * @return this builder
         */
        public Builder table(Table newTable) {
            table = newTable;
            return this;
        }

        /**
         * Sets the execution environment; required by {@link #build()}.
         *
         * @param newEnv the stream execution environment
         * @return this builder
         */
        public Builder env(StreamExecutionEnvironment newEnv) {
            env = newEnv;
            return this;
        }

        /**
         * Forwards the filter expressions to the scan context.
         *
         * @param filters the filter expressions
         * @return this builder
         */
        public Builder filters(List<Expression> filters) {
            contextBuilder.filters(filters);
            return this;
        }

        /**
         * Sets the Flink schema to project. When left unset, {@link #buildFormat()} projects the
         * full table schema.
         *
         * @param schema the projected Flink schema
         * @return this builder
         */
        public Builder project(TableSchema schema) {
            projectedSchema = schema;
            return this;
        }

        /**
         * Forwards the limit to the scan context. A positive limit also caps the parallelism
         * computed by {@link #inferParallelism(FlinkInputFormat, ScanContext)}.
         *
         * @param newLimit the limit value
         * @return this builder
         */
        public Builder limit(long newLimit) {
            contextBuilder.limit(newLimit);
            return this;
        }

        /**
         * Configures the scan context from a property map.
         *
         * @param properties properties handed to {@code ScanContext.Builder#fromProperties}
         * @return this builder
         */
        public Builder properties(Map<String, String> properties) {
            contextBuilder.fromProperties(properties);
            return this;
        }

        /**
         * Forwards the case-sensitivity flag to the scan context.
         *
         * @param caseSensitive the flag value
         * @return this builder
         */
        public Builder caseSensitive(boolean caseSensitive) {
            contextBuilder.caseSensitive(caseSensitive);
            return this;
        }

        /**
         * Forwards the snapshot id to {@code ScanContext.Builder#useSnapshotId}.
         *
         * @param snapshotId the snapshot id
         * @return this builder
         */
        public Builder snapshotId(Long snapshotId) {
            contextBuilder.useSnapshotId(snapshotId);
            return this;
        }

        /**
         * Forwards the start snapshot id to the scan context.
         *
         * @param startSnapshotId the start snapshot id
         * @return this builder
         */
        public Builder startSnapshotId(Long startSnapshotId) {
            contextBuilder.startSnapshotId(startSnapshotId);
            return this;
        }

        /**
         * Forwards the end snapshot id to the scan context.
         *
         * @param endSnapshotId the end snapshot id
         * @return this builder
         */
        public Builder endSnapshotId(Long endSnapshotId) {
            contextBuilder.endSnapshotId(endSnapshotId);
            return this;
        }

        /**
         * Forwards the as-of timestamp to the scan context.
         *
         * @param asOfTimestamp the timestamp value
         * @return this builder
         */
        public Builder asOfTimestamp(Long asOfTimestamp) {
            contextBuilder.asOfTimestamp(asOfTimestamp);
            return this;
        }

        /**
         * Forwards the split size to the scan context.
         *
         * @param splitSize the split size
         * @return this builder
         */
        public Builder splitSize(Long splitSize) {
            contextBuilder.splitSize(splitSize);
            return this;
        }

        /**
         * Forwards the split lookback to the scan context.
         *
         * @param splitLookback the split lookback
         * @return this builder
         */
        public Builder splitLookback(Integer splitLookback) {
            contextBuilder.splitLookback(splitLookback);
            return this;
        }

        /**
         * Forwards the split open-file cost to the scan context.
         *
         * @param splitOpenFileCost the open-file cost
         * @return this builder
         */
        public Builder splitOpenFileCost(Long splitOpenFileCost) {
            contextBuilder.splitOpenFileCost(splitOpenFileCost);
            return this;
        }

        /**
         * Forwards the streaming flag to the scan context. {@link #build()} picks the bounded or
         * the monitor/reader pipeline based on what the built context reports.
         *
         * @param streaming the streaming flag
         * @return this builder
         */
        public Builder streaming(boolean streaming) {
            contextBuilder.streaming(streaming);
            return this;
        }

        /**
         * Explicitly enables or disables exposing split locality. When this is never called, the
         * value of {@code FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO} from
         * the Flink configuration is consulted instead.
         *
         * @param newExposeLocality the explicit setting
         * @return this builder
         */
        public Builder exposeLocality(boolean newExposeLocality) {
            exposeLocality = newExposeLocality;
            return this;
        }

        /**
         * Forwards the name mapping to the scan context.
         *
         * @param nameMapping the name mapping string
         * @return this builder
         */
        public Builder nameMapping(String nameMapping) {
            contextBuilder.nameMapping(nameMapping);
            return this;
        }

        /**
         * Forwards the monitor interval to the scan context.
         *
         * @param interval the monitor interval
         * @return this builder
         */
        public Builder monitorInterval(Duration interval) {
            contextBuilder.monitorInterval(interval);
            return this;
        }

        /**
         * Replaces the Flink configuration read by parallelism inference and the locality check.
         * Defaults to an empty {@link Configuration}.
         *
         * @param config the configuration to read options from
         * @return this builder
         */
        public Builder flinkConf(ReadableConfig config) {
            readableConfig = config;
            return this;
        }

        /**
         * Builds the {@link FlinkInputFormat} for the configured table.
         *
         * <p>If no table was supplied, the table loader is opened, the table is loaded from it and
         * the loader is closed again. The projection and the locality flag are then applied to the
         * scan context before it is built.
         *
         * @return the input format for the configured scan
         * @throws NullPointerException if no table loader was set
         * @throws UncheckedIOException if an {@link IOException} is raised while the table is
         *     loaded through, or the loader is closed by, the try-with-resources statement
         */
        public FlinkInputFormat buildFormat() {
            Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");

            Schema icebergSchema;
            FileIO io;
            EncryptionManager encryption;
            if (table == null) {
                // Load the required pieces through the loader; try-with-resources closes it afterwards.
                tableLoader.open();
                try (TableLoader loader = tableLoader) {
                    table = loader.loadTable();
                    icebergSchema = table.schema();
                    io = table.io();
                    encryption = table.encryption();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            } else {
                icebergSchema = table.schema();
                io = table.io();
                encryption = table.encryption();
            }

            Schema projection = projectedSchema == null
                    ? icebergSchema
                    : FlinkSchemaUtil.convert(icebergSchema, projectedSchema);
            contextBuilder.project(projection);
            contextBuilder.exposeLocality(localityEnabled());

            return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
        }

        /**
         * Builds the {@link DataStream} that reads the configured table.
         *
         * <p>For a non-streaming context the input format is registered through
         * {@code createInput} with the parallelism returned by
         * {@link #inferParallelism(FlinkInputFormat, ScanContext)}, capped by the environment's
         * max parallelism when that is positive. For a streaming context a
         * {@link StreamingMonitorFunction} source is chained with the operator produced by
         * {@code StreamingReaderOperator.factory}.
         *
         * @return the stream of rows read from the table
         * @throws NullPointerException if no execution environment or no table loader was set
         */
        public DataStream<RowData> build() {
            Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
            FlinkInputFormat format = buildFormat();

            ScanContext context = contextBuilder.build();
            TypeInformation<RowData> typeInfo =
                    FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));

            if (context.isStreaming()) {
                StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
                String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
                String readerOperatorName = String.format("Iceberg table (%s) reader", table);
                return env.addSource(function, monitorFunctionName)
                        .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
            }

            int parallelism = inferParallelism(format, context);
            int maxParallelism = env.getMaxParallelism();
            if (maxParallelism > 0) {
                parallelism = Math.min(parallelism, maxParallelism);
            }
            return env.createInput(format, typeInfo).setParallelism(parallelism);
        }

        /**
         * Computes the parallelism for the bounded source.
         *
         * <p>Starts from {@code ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM}.
         * When inference is enabled in the configuration, it is replaced by the number of input
         * splits capped at the configured maximum. A positive limit on the context caps the
         * value further, and the result is never below 1.
         *
         * @param format the input format used to enumerate splits
         * @param context the scan context providing the limit
         * @return the parallelism to use, at least 1
         * @throws IllegalStateException if the configured inference maximum is below 1
         * @throws UncheckedIOException if creating the input splits fails with an
         *     {@link IOException}
         */
        int inferParallelism(FlinkInputFormat format, ScanContext context) {
            int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);

            if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
                int maxInferParallelism =
                        readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
                Preconditions.checkState(
                        maxInferParallelism >= 1,
                        FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");

                int splitNum;
                try {
                    FlinkInputSplit[] splits = format.createInputSplits(0);
                    splitNum = splits.length;
                } catch (IOException e) {
                    throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
                }
                parallelism = Math.min(splitNum, maxInferParallelism);
            }

            long rowLimit = context.limit();
            if (rowLimit > 0L) {
                // Clamp before narrowing so that a limit above Integer.MAX_VALUE does not wrap around.
                int limit = rowLimit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) rowLimit;
                parallelism = Math.min(parallelism, limit);
            }

            // Parallelism must be positive.
            return Math.max(1, parallelism);
        }

        /**
         * Decides whether split locality information should be exposed.
         *
         * <p>An explicit {@link #exposeLocality(boolean)} value wins over the Flink configuration
         * option. If the effective setting is {@code false}, locality is disabled. Otherwise
         * locality is enabled only when the table's {@link FileIO} is a {@link HadoopFileIO} and
         * the scheme of the file system holding the table location is listed in
         * {@link #FILE_SYSTEM_SUPPORT_LOCALITY}.
         *
         * @return {@code true} when locality information should be exposed
         */
        private boolean localityEnabled() {
            Boolean localityEnabled = exposeLocality;
            if (localityEnabled == null) {
                localityEnabled = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);
            }

            // A null setting (nothing configured anywhere) falls through to the file system check.
            if (Boolean.FALSE.equals(localityEnabled)) {
                return false;
            }

            FileIO fileIO = table.io();
            if (fileIO instanceof HadoopFileIO) {
                HadoopFileIO hadoopFileIO = (HadoopFileIO) fileIO;
                try {
                    String scheme = new Path(table.location()).getFileSystem(hadoopFileIO.getConf()).getScheme();
                    return FILE_SYSTEM_SUPPORT_LOCALITY.contains(scheme);
                } catch (IOException e) {
                    // Best effort: failing to resolve the file system only disables locality.
                    LOG.warn("Failed to determine whether the locality information can be exposed for table: {}", table, e);
                }
            }

            return false;
        }
    }
}

