/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.SourceUtil;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.ConverterReaderFunction;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataConverter;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergSource<T>
implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
    private final TableLoader tableLoader;
    private final ScanContext scanContext;
    private final ReaderFunction<T> readerFunction;
    private final SplitAssignerFactory assignerFactory;
    private final SerializableComparator<IcebergSourceSplit> splitComparator;
    private final SerializableRecordEmitter<T> emitter;
    private final String tableName;
    private volatile List<IcebergSourceSplit> batchSplits;

    IcebergSource(TableLoader tableLoader, ScanContext scanContext, ReaderFunction<T> readerFunction, SplitAssignerFactory assignerFactory, SerializableComparator<IcebergSourceSplit> splitComparator, Table table, SerializableRecordEmitter<T> emitter) {
        Preconditions.checkNotNull((Object)tableLoader, (Object)"tableLoader is required.");
        Preconditions.checkNotNull(readerFunction, (Object)"readerFunction is required.");
        Preconditions.checkNotNull((Object)assignerFactory, (Object)"assignerFactory is required.");
        Preconditions.checkNotNull((Object)table, (Object)"table is required.");
        this.tableLoader = tableLoader;
        this.scanContext = scanContext;
        this.readerFunction = readerFunction;
        this.assignerFactory = assignerFactory;
        this.splitComparator = splitComparator;
        this.emitter = emitter;
        this.tableName = table.name();
    }

    String name() {
        return "IcebergSource-" + this.tableName;
    }

    private String planningThreadName() {
        return this.tableName + "-" + String.valueOf(UUID.randomUUID());
    }

    private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
        if (this.batchSplits != null) {
            return this.batchSplits;
        }
        ExecutorService workerPool = ThreadPools.newFixedThreadPool((String)threadName, (int)this.scanContext.planParallelism());
        try {
            List<IcebergSourceSplit> list;
            block12: {
                TableLoader loader = this.tableLoader.clone();
                try {
                    loader.open();
                    this.batchSplits = FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), this.scanContext, workerPool);
                    LOG.info("Discovered {} splits from table {} during job initialization", (Object)this.batchSplits.size(), (Object)this.tableName);
                    list = this.batchSplits;
                    if (loader == null) break block12;
                }
                catch (Throwable throwable) {
                    try {
                        if (loader != null) {
                            try {
                                loader.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        throw throwable;
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException("Failed to close table loader", e);
                    }
                }
                loader.close();
            }
            return list;
        }
        finally {
            workerPool.shutdown();
        }
    }

    public Boundedness getBoundedness() {
        return this.scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
    }

    public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
        IcebergSourceReaderMetrics metrics = new IcebergSourceReaderMetrics((MetricGroup)readerContext.metricGroup(), this.tableName);
        return new IcebergSourceReader<T>(this.emitter, metrics, this.readerFunction, this.splitComparator, readerContext);
    }

    public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumContext) {
        return this.createEnumerator(enumContext, null);
    }

    public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumContext, IcebergEnumeratorState enumState) {
        return this.createEnumerator(enumContext, enumState);
    }

    public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
        return new IcebergSourceSplitSerializer(this.scanContext.caseSensitive());
    }

    public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
        return new IcebergEnumeratorStateSerializer(this.scanContext.caseSensitive());
    }

    private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumContext, @Nullable IcebergEnumeratorState enumState) {
        SplitAssigner assigner;
        if (enumState == null) {
            assigner = this.assignerFactory.createAssigner();
        } else {
            LOG.info("Iceberg source restored {} splits from state for table {}", (Object)enumState.pendingSplits().size(), (Object)this.tableName);
            assigner = this.assignerFactory.createAssigner(enumState.pendingSplits());
        }
        if (this.scanContext.isStreaming()) {
            ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(this.tableLoader, this.scanContext, this.planningThreadName());
            return new ContinuousIcebergEnumerator(enumContext, assigner, this.scanContext, splitPlanner, enumState);
        }
        if (enumState == null) {
            List<IcebergSourceSplit> splits = this.planSplitsForBatch(this.planningThreadName());
            assigner.onDiscoveredSplits(splits);
            this.batchSplits = null;
        }
        return new StaticIcebergEnumerator(enumContext, assigner);
    }

    private boolean shouldInferParallelism() {
        return !this.scanContext.isStreaming();
    }

    private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
        int parallelism = SourceUtil.inferParallelism(flinkConf, this.scanContext.limit(), () -> {
            List<IcebergSourceSplit> splits = this.planSplitsForBatch(this.planningThreadName());
            return splits.size();
        });
        if (env.getMaxParallelism() > 0) {
            parallelism = Math.min(parallelism, env.getMaxParallelism());
        }
        return parallelism;
    }

    @Deprecated
    public static <T> Builder<T> builder() {
        return new Builder();
    }

    public static Builder<RowData> forRowData() {
        return new Builder<RowData>();
    }

    public static <T> Builder<T> forOutputType(RowDataConverter<T> converter) {
        return new Builder<T>().converter(converter);
    }

    public static class Builder<T> {
        private TableLoader tableLoader;
        private Table table;
        private SplitAssignerFactory splitAssignerFactory;
        private SerializableComparator<IcebergSourceSplit> splitComparator;
        private ReaderFunction<T> readerFunction;
        private RowDataConverter<T> converter;
        private ReadableConfig flinkConfig = new Configuration();
        private final ScanContext.Builder contextBuilder = ScanContext.builder();
        private TableSchema projectedTableSchema;
        private ResolvedSchema projectedFlinkSchema;
        private Boolean exposeLocality;
        private final Map<String, String> readOptions = Maps.newHashMap();

        Builder() {
        }

        public Builder<T> tableLoader(TableLoader loader) {
            this.tableLoader = loader;
            return this;
        }

        public Builder<T> table(Table newTable) {
            this.table = newTable;
            return this;
        }

        public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
            this.splitAssignerFactory = assignerFactory;
            return this;
        }

        public Builder<T> splitComparator(SerializableComparator<IcebergSourceSplit> newSplitComparator) {
            this.splitComparator = newSplitComparator;
            return this;
        }

        @Deprecated
        public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
            Preconditions.checkState((this.converter == null ? 1 : 0) != 0, (Object)"Cannot set reader function when builder was created via IcebergSource.forOutputType(Converter)");
            this.readerFunction = newReaderFunction;
            return this;
        }

        private Builder<T> converter(RowDataConverter<T> newConverter) {
            this.converter = newConverter;
            return this;
        }

        public Builder<T> flinkConfig(ReadableConfig config) {
            this.flinkConfig = config;
            return this;
        }

        public Builder<T> caseSensitive(boolean newCaseSensitive) {
            this.readOptions.put("case-sensitive", Boolean.toString(newCaseSensitive));
            return this;
        }

        public Builder<T> useSnapshotId(Long newSnapshotId) {
            if (newSnapshotId != null) {
                this.readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), Long.toString(newSnapshotId));
            }
            return this;
        }

        public Builder<T> streamingStartingStrategy(StreamingStartingStrategy newStartingStrategy) {
            this.readOptions.put("starting-strategy", newStartingStrategy.name());
            return this;
        }

        public Builder<T> startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
            if (newStartSnapshotTimestamp != null) {
                this.readOptions.put(FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.key(), Long.toString(newStartSnapshotTimestamp));
            }
            return this;
        }

        public Builder<T> startSnapshotId(Long newStartSnapshotId) {
            if (newStartSnapshotId != null) {
                this.readOptions.put(FlinkReadOptions.START_SNAPSHOT_ID.key(), Long.toString(newStartSnapshotId));
            }
            return this;
        }

        public Builder<T> tag(String tag) {
            this.readOptions.put(FlinkReadOptions.TAG.key(), tag);
            return this;
        }

        public Builder<T> branch(String branch) {
            this.readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
            return this;
        }

        public Builder<T> startTag(String startTag) {
            this.readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
            return this;
        }

        public Builder<T> endTag(String endTag) {
            this.readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
            return this;
        }

        public Builder<T> endSnapshotId(Long newEndSnapshotId) {
            if (newEndSnapshotId != null) {
                this.readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), Long.toString(newEndSnapshotId));
            }
            return this;
        }

        public Builder<T> asOfTimestamp(Long newAsOfTimestamp) {
            if (newAsOfTimestamp != null) {
                this.readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), Long.toString(newAsOfTimestamp));
            }
            return this;
        }

        public Builder<T> splitSize(Long newSplitSize) {
            if (newSplitSize != null) {
                this.readOptions.put("split-size", Long.toString(newSplitSize));
            }
            return this;
        }

        public Builder<T> splitLookback(Integer newSplitLookback) {
            if (newSplitLookback != null) {
                this.readOptions.put("split-lookback", Integer.toString(newSplitLookback));
            }
            return this;
        }

        public Builder<T> splitOpenFileCost(Long newSplitOpenFileCost) {
            if (newSplitOpenFileCost != null) {
                this.readOptions.put("split-file-open-cost", Long.toString(newSplitOpenFileCost));
            }
            return this;
        }

        public Builder<T> streaming(boolean streaming) {
            this.readOptions.put("streaming", Boolean.toString(streaming));
            return this;
        }

        public Builder<T> monitorInterval(Duration newMonitorInterval) {
            if (newMonitorInterval != null) {
                this.readOptions.put("monitor-interval", newMonitorInterval.toNanos() + " ns");
            }
            return this;
        }

        public Builder<T> nameMapping(String newNameMapping) {
            this.readOptions.put("schema.name-mapping.default", newNameMapping);
            return this;
        }

        public Builder<T> project(Schema newProjectedSchema) {
            this.contextBuilder.project(newProjectedSchema);
            return this;
        }

        @Deprecated
        public Builder<T> project(TableSchema newProjectedFlinkSchema) {
            this.projectedTableSchema = newProjectedFlinkSchema;
            return this;
        }

        public Builder<T> project(ResolvedSchema newProjectedFlinkSchema) {
            this.projectedFlinkSchema = newProjectedFlinkSchema;
            return this;
        }

        public Builder<T> filters(List<Expression> newFilters) {
            this.contextBuilder.filters(newFilters);
            return this;
        }

        public Builder<T> limit(Long newLimit) {
            if (newLimit != null) {
                this.readOptions.put("limit", Long.toString(newLimit));
            }
            return this;
        }

        public Builder<T> includeColumnStats(boolean newIncludeColumnStats) {
            this.readOptions.put("include-column-stats", Boolean.toString(newIncludeColumnStats));
            return this;
        }

        public Builder<T> planParallelism(int planParallelism) {
            this.readOptions.put(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.key(), Integer.toString(planParallelism));
            return this;
        }

        public Builder<T> exposeLocality(boolean newExposeLocality) {
            this.exposeLocality = newExposeLocality;
            return this;
        }

        public Builder<T> maxAllowedPlanningFailures(int maxAllowedPlanningFailures) {
            this.readOptions.put(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(), Integer.toString(maxAllowedPlanningFailures));
            return this;
        }

        public Builder<T> set(String property, String value) {
            this.readOptions.put(property, value);
            return this;
        }

        public Builder<T> setAll(Map<String, String> properties) {
            this.readOptions.putAll(properties);
            return this;
        }

        public Builder<T> watermarkColumn(String columnName) {
            Preconditions.checkArgument((this.splitAssignerFactory == null ? 1 : 0) != 0, (Object)"Watermark column and SplitAssigner should not be set in the same source");
            this.readOptions.put("watermark-column", columnName);
            return this;
        }

        public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
            this.readOptions.put("watermark-column-time-unit", timeUnit.name());
            return this;
        }

        @Deprecated
        public Builder<T> properties(Map<String, String> properties) {
            this.readOptions.putAll(properties);
            return this;
        }

        public IcebergSource<T> build() {
            if (this.table == null) {
                try (TableLoader loader = this.tableLoader;){
                    loader.open();
                    this.table = this.tableLoader.loadTable();
                }
                catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            this.contextBuilder.resolveConfig(this.table, this.readOptions, this.flinkConfig);
            this.contextBuilder.exposeLocality(SourceUtil.isLocalityEnabled(this.table, this.flinkConfig, this.exposeLocality));
            this.contextBuilder.planParallelism((Integer)this.flinkConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE));
            Schema icebergSchema = this.table.schema();
            if (this.projectedFlinkSchema != null) {
                this.contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, this.projectedFlinkSchema));
            } else if (this.projectedTableSchema != null) {
                this.contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, this.projectedTableSchema));
            }
            SerializableRecordEmitter emitter = SerializableRecordEmitter.defaultEmitter();
            FlinkReadConf flinkReadConf = new FlinkReadConf(this.table, this.readOptions, this.flinkConfig);
            String watermarkColumn = flinkReadConf.watermarkColumn();
            TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit();
            if (watermarkColumn != null) {
                this.contextBuilder.includeColumnStats(Sets.newHashSet((Object[])new String[]{watermarkColumn}));
                ColumnStatsWatermarkExtractor watermarkExtractor = new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
                emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
                this.splitAssignerFactory = new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
            }
            ScanContext context = this.contextBuilder.build();
            context.validate();
            if (this.readerFunction == null) {
                this.readerFunction = this.readerFunction(context);
            }
            if (this.splitAssignerFactory == null) {
                this.splitAssignerFactory = this.splitComparator == null ? new SimpleSplitAssignerFactory() : new OrderedSplitAssignerFactory(this.splitComparator);
            }
            return new IcebergSource<T>(this.tableLoader, context, this.readerFunction, this.splitAssignerFactory, this.splitComparator, this.table, emitter);
        }

        public DataStream<T> buildStream(StreamExecutionEnvironment env) {
            Preconditions.checkState((this.readerFunction == null ? 1 : 0) != 0, (Object)"Cannot set reader function when building a data stream from the source");
            IcebergSource<T> source = this.build();
            TypeInformation<T> outputTypeInfo = Builder.outputTypeInfo(this.converter, this.table.schema(), source.scanContext.project());
            DataStreamSource stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo);
            if (source.shouldInferParallelism()) {
                stream = stream.setParallelism(source.inferParallelism(this.flinkConfig, env));
            }
            return stream;
        }

        private static <T> TypeInformation<T> outputTypeInfo(RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
            if (converter != null) {
                return converter.getProducedType();
            }
            Schema readSchema = projected != null ? projected : tableSchema;
            return FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
        }

        private ReaderFunction<T> readerFunction(ScanContext context) {
            if (this.table instanceof BaseMetadataTable) {
                MetaDataReaderFunction rowDataReaderFunction = new MetaDataReaderFunction(this.flinkConfig, this.table.schema(), context.project(), this.table.io(), this.table.encryption());
                return rowDataReaderFunction;
            }
            if (this.converter == null) {
                return new RowDataReaderFunction(this.flinkConfig, this.table.schema(), context.project(), context.nameMapping(), context.caseSensitive(), this.table.io(), this.table.encryption(), context.filters(), context.limit());
            }
            return new ConverterReaderFunction<T>(this.converter, this.flinkConfig, this.table.schema(), context.project(), context.nameMapping(), context.caseSensitive(), this.table.io(), this.table.encryption(), context.filters(), context.limit());
        }
    }
}

