/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkInputFormat;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.SourceUtil;
import org.apache.iceberg.flink.source.StreamingMonitorFunction;
import org.apache.iceberg.flink.source.StreamingReaderOperator;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSource {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSource.class);

    private FlinkSource() {
    }

    public static Builder forRowData() {
        return new Builder();
    }

    public static boolean isBounded(Map<String, String> properties) {
        return !PropertyUtil.propertyAsBoolean(properties, (String)"streaming", (boolean)false);
    }

    public static class Builder {
        private StreamExecutionEnvironment env;
        private Table table;
        private TableLoader tableLoader;
        private TableSchema projectedSchema;
        private ReadableConfig readableConfig = new Configuration();
        private final ScanContext.Builder contextBuilder = ScanContext.builder();
        private Boolean exposeLocality;
        private final Map<String, String> readOptions = Maps.newHashMap();

        public Builder tableLoader(TableLoader newLoader) {
            this.tableLoader = newLoader;
            return this;
        }

        public Builder table(Table newTable) {
            this.table = newTable;
            return this;
        }

        public Builder env(StreamExecutionEnvironment newEnv) {
            this.env = newEnv;
            return this;
        }

        public Builder filters(List<Expression> filters) {
            this.contextBuilder.filters(filters);
            return this;
        }

        public Builder project(TableSchema schema) {
            this.projectedSchema = schema;
            return this;
        }

        public Builder limit(Long newLimit) {
            if (newLimit != null) {
                this.readOptions.put("limit", Long.toString(newLimit));
            }
            return this;
        }

        public Builder set(String property, String value) {
            this.readOptions.put(property, value);
            return this;
        }

        public Builder setAll(Map<String, String> properties) {
            this.readOptions.putAll(properties);
            return this;
        }

        @Deprecated
        public Builder properties(Map<String, String> properties) {
            this.readOptions.putAll(properties);
            return this;
        }

        public Builder caseSensitive(boolean caseSensitive) {
            this.readOptions.put("case-sensitive", Boolean.toString(caseSensitive));
            return this;
        }

        public Builder snapshotId(Long snapshotId) {
            this.readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), Long.toString(snapshotId));
            return this;
        }

        public Builder branch(String branch) {
            this.readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
            return this;
        }

        public Builder tag(String tag) {
            this.readOptions.put(FlinkReadOptions.TAG.key(), tag);
            return this;
        }

        public Builder startSnapshotId(Long startSnapshotId) {
            this.readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(startSnapshotId));
            return this;
        }

        public Builder endSnapshotId(Long endSnapshotId) {
            this.readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(endSnapshotId));
            return this;
        }

        public Builder startTag(String startTag) {
            this.readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
            return this;
        }

        public Builder endTag(String endTag) {
            this.readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
            return this;
        }

        public Builder asOfTimestamp(Long asOfTimestamp) {
            this.readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(asOfTimestamp));
            return this;
        }

        public Builder splitSize(Long splitSize) {
            this.readOptions.put("split-size", Long.toString(splitSize));
            return this;
        }

        public Builder splitLookback(Integer splitLookback) {
            this.readOptions.put("split-lookback", Integer.toString(splitLookback));
            return this;
        }

        public Builder splitOpenFileCost(Long splitOpenFileCost) {
            this.readOptions.put("split-file-open-cost", Long.toString(splitOpenFileCost));
            return this;
        }

        public Builder streaming(boolean streaming) {
            this.readOptions.put("streaming", Boolean.toString(streaming));
            return this;
        }

        public Builder exposeLocality(boolean newExposeLocality) {
            this.exposeLocality = newExposeLocality;
            return this;
        }

        public Builder nameMapping(String nameMapping) {
            this.readOptions.put("schema.name-mapping.default", nameMapping);
            return this;
        }

        public Builder monitorInterval(Duration interval) {
            this.readOptions.put("monitor-interval", interval.toNanos() + " ns");
            return this;
        }

        public Builder maxPlanningSnapshotCount(int newMaxPlanningSnapshotCount) {
            this.readOptions.put("max-planning-snapshot-count", Integer.toString(newMaxPlanningSnapshotCount));
            return this;
        }

        public Builder flinkConf(ReadableConfig config) {
            this.readableConfig = config;
            return this;
        }

        public FlinkInputFormat buildFormat() {
            EncryptionManager encryption;
            FileIO io;
            Schema icebergSchema;
            block17: {
                Preconditions.checkNotNull((Object)this.tableLoader, (Object)"TableLoader should not be null");
                if (this.table == null) {
                    this.tableLoader.open();
                    try (TableLoader loader = this.tableLoader;){
                        this.table = loader.loadTable();
                        icebergSchema = this.table.schema();
                        io = this.table.io();
                        encryption = this.table.encryption();
                        break block17;
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
                icebergSchema = this.table.schema();
                io = this.table.io();
                encryption = this.table.encryption();
            }
            if (this.projectedSchema == null) {
                this.contextBuilder.project(icebergSchema);
            } else {
                this.contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, this.projectedSchema));
            }
            this.contextBuilder.exposeLocality(SourceUtil.isLocalityEnabled(this.table, this.readableConfig, this.exposeLocality));
            this.contextBuilder.planParallelism((Integer)this.readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
            this.contextBuilder.resolveConfig(this.table, this.readOptions, this.readableConfig);
            return new FlinkInputFormat(this.tableLoader, icebergSchema, io, encryption, this.contextBuilder.build());
        }

        public DataStream<RowData> build() {
            Preconditions.checkNotNull((Object)this.env, (Object)"StreamExecutionEnvironment should not be null");
            FlinkInputFormat format = this.buildFormat();
            ScanContext context = this.contextBuilder.build();
            TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
            if (!context.isStreaming()) {
                int parallelism = SourceUtil.inferParallelism(this.readableConfig, context.limit(), () -> {
                    try {
                        return format.createInputSplits(0).length;
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException("Failed to create iceberg input splits for table: " + this.table, e);
                    }
                });
                if (this.env.getMaxParallelism() > 0) {
                    parallelism = Math.min(parallelism, this.env.getMaxParallelism());
                }
                return this.env.createInput((InputFormat)format, typeInfo).setParallelism(parallelism);
            }
            StreamingMonitorFunction function = new StreamingMonitorFunction(this.tableLoader, context);
            String monitorFunctionName = String.format("Iceberg table (%s) monitor", this.table);
            String readerOperatorName = String.format("Iceberg table (%s) reader", this.table);
            return this.env.addSource((SourceFunction)function, monitorFunctionName).transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
        }
    }
}

