/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSplitPlanner;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.StreamingStartingStrategy;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class IcebergSource<T>
implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
    private final TableLoader tableLoader;
    private final ScanContext scanContext;
    private final ReaderFunction<T> readerFunction;
    private final SplitAssignerFactory assignerFactory;
    private transient Table table;

    IcebergSource(TableLoader tableLoader, ScanContext scanContext, ReaderFunction<T> readerFunction, SplitAssignerFactory assignerFactory, Table table) {
        this.tableLoader = tableLoader;
        this.scanContext = scanContext;
        this.readerFunction = readerFunction;
        this.assignerFactory = assignerFactory;
        this.table = table;
    }

    String name() {
        return "IcebergSource-" + this.lazyTable().name();
    }

    private String planningThreadName() {
        return this.lazyTable().name() + "-" + UUID.randomUUID();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
        ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, this.scanContext.planParallelism());
        try {
            List<IcebergSourceSplit> splits = FlinkSplitPlanner.planIcebergSourceSplits(this.lazyTable(), this.scanContext, workerPool);
            LOG.info("Discovered {} splits from table {} during job initialization", (Object)splits.size(), (Object)this.lazyTable().name());
            List<IcebergSourceSplit> list = splits;
            return list;
        }
        finally {
            workerPool.shutdown();
        }
    }

    private Table lazyTable() {
        if (this.table == null) {
            this.tableLoader.open();
            try (TableLoader loader = this.tableLoader;){
                this.table = loader.loadTable();
            }
            catch (IOException e) {
                throw new UncheckedIOException("Failed to close table loader", e);
            }
        }
        return this.table;
    }

    public Boundedness getBoundedness() {
        return this.scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
    }

    public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
        IcebergSourceReaderMetrics metrics = new IcebergSourceReaderMetrics((MetricGroup)readerContext.metricGroup(), this.lazyTable().name());
        return new IcebergSourceReader<T>(metrics, this.readerFunction, readerContext);
    }

    public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumContext) {
        return this.createEnumerator(enumContext, null);
    }

    public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumContext, IcebergEnumeratorState enumState) {
        return this.createEnumerator(enumContext, enumState);
    }

    public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
        return IcebergSourceSplitSerializer.INSTANCE;
    }

    public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
        return IcebergEnumeratorStateSerializer.INSTANCE;
    }

    private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(SplitEnumeratorContext<IcebergSourceSplit> enumContext, @Nullable IcebergEnumeratorState enumState) {
        SplitAssigner assigner;
        if (enumState == null) {
            assigner = this.assignerFactory.createAssigner();
        } else {
            LOG.info("Iceberg source restored {} splits from state for table {}", (Object)enumState.pendingSplits().size(), (Object)this.lazyTable().name());
            assigner = this.assignerFactory.createAssigner(enumState.pendingSplits());
        }
        if (this.scanContext.isStreaming()) {
            ContinuousSplitPlannerImpl splitPlanner = new ContinuousSplitPlannerImpl(this.lazyTable(), this.scanContext, this.planningThreadName());
            return new ContinuousIcebergEnumerator(enumContext, assigner, this.scanContext, splitPlanner, enumState);
        }
        List<IcebergSourceSplit> splits = this.planSplitsForBatch(this.planningThreadName());
        assigner.onDiscoveredSplits(splits);
        return new StaticIcebergEnumerator(enumContext, assigner, this.lazyTable(), this.scanContext, enumState);
    }

    public static <T> Builder<T> builder() {
        return new Builder();
    }

    public static Builder<RowData> forRowData() {
        return new Builder<RowData>();
    }

    public static class Builder<T> {
        private TableLoader tableLoader;
        private SplitAssignerFactory splitAssignerFactory;
        private ReaderFunction<T> readerFunction;
        private ReadableConfig flinkConfig = new Configuration();
        private final ScanContext.Builder contextBuilder = ScanContext.builder();
        private TableSchema projectedFlinkSchema;
        private Boolean exposeLocality;

        Builder() {
        }

        public Builder<T> tableLoader(TableLoader loader) {
            this.tableLoader = loader;
            return this;
        }

        public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
            this.splitAssignerFactory = assignerFactory;
            return this;
        }

        public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
            this.readerFunction = newReaderFunction;
            return this;
        }

        public Builder<T> flinkConfig(ReadableConfig config) {
            this.flinkConfig = config;
            return this;
        }

        public Builder<T> caseSensitive(boolean newCaseSensitive) {
            this.contextBuilder.caseSensitive(newCaseSensitive);
            return this;
        }

        public Builder<T> useSnapshotId(Long newSnapshotId) {
            this.contextBuilder.useSnapshotId(newSnapshotId);
            return this;
        }

        public Builder<T> streamingStartingStrategy(StreamingStartingStrategy newStartingStrategy) {
            this.contextBuilder.startingStrategy(newStartingStrategy);
            return this;
        }

        public Builder<T> startSnapshotTimestamp(Long newStartSnapshotTimestamp) {
            this.contextBuilder.startSnapshotTimestamp(newStartSnapshotTimestamp);
            return this;
        }

        public Builder<T> startSnapshotId(Long newStartSnapshotId) {
            this.contextBuilder.startSnapshotId(newStartSnapshotId);
            return this;
        }

        public Builder<T> endSnapshotId(Long newEndSnapshotId) {
            this.contextBuilder.endSnapshotId(newEndSnapshotId);
            return this;
        }

        public Builder<T> asOfTimestamp(Long newAsOfTimestamp) {
            this.contextBuilder.asOfTimestamp(newAsOfTimestamp);
            return this;
        }

        public Builder<T> splitSize(Long newSplitSize) {
            this.contextBuilder.splitSize(newSplitSize);
            return this;
        }

        public Builder<T> splitLookback(Integer newSplitLookback) {
            this.contextBuilder.splitLookback(newSplitLookback);
            return this;
        }

        public Builder<T> splitOpenFileCost(Long newSplitOpenFileCost) {
            this.contextBuilder.splitOpenFileCost(newSplitOpenFileCost);
            return this;
        }

        public Builder<T> streaming(boolean streaming) {
            this.contextBuilder.streaming(streaming);
            return this;
        }

        public Builder<T> monitorInterval(Duration newMonitorInterval) {
            this.contextBuilder.monitorInterval(newMonitorInterval);
            return this;
        }

        public Builder<T> nameMapping(String newNameMapping) {
            this.contextBuilder.nameMapping(newNameMapping);
            return this;
        }

        public Builder<T> project(Schema newProjectedSchema) {
            this.contextBuilder.project(newProjectedSchema);
            return this;
        }

        public Builder<T> project(TableSchema newProjectedFlinkSchema) {
            this.projectedFlinkSchema = newProjectedFlinkSchema;
            return this;
        }

        public Builder<T> filters(List<Expression> newFilters) {
            this.contextBuilder.filters(newFilters);
            return this;
        }

        public Builder<T> limit(long newLimit) {
            this.contextBuilder.limit(newLimit);
            return this;
        }

        public Builder<T> includeColumnStats(boolean newIncludeColumnStats) {
            this.contextBuilder.includeColumnStats(newIncludeColumnStats);
            return this;
        }

        public Builder<T> planParallelism(int planParallelism) {
            this.contextBuilder.planParallelism(planParallelism);
            return this;
        }

        public Builder<T> exposeLocality(boolean newExposeLocality) {
            this.exposeLocality = newExposeLocality;
            return this;
        }

        public Builder<T> properties(Map<String, String> properties) {
            this.contextBuilder.fromProperties(properties);
            return this;
        }

        public IcebergSource<T> build() {
            Table table;
            try (TableLoader loader = this.tableLoader;){
                loader.open();
                table = this.tableLoader.loadTable();
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            Schema icebergSchema = table.schema();
            if (this.projectedFlinkSchema != null) {
                this.contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, this.projectedFlinkSchema));
            }
            ScanContext context = this.contextBuilder.build();
            if (this.readerFunction == null) {
                RowDataReaderFunction rowDataReaderFunction;
                this.readerFunction = rowDataReaderFunction = new RowDataReaderFunction(this.flinkConfig, table.schema(), context.project(), context.nameMapping(), context.caseSensitive(), table.io(), table.encryption());
            }
            this.checkRequired();
            return new IcebergSource<T>(this.tableLoader, context, this.readerFunction, this.splitAssignerFactory, table);
        }

        private void checkRequired() {
            Preconditions.checkNotNull((Object)this.tableLoader, (String)"tableLoader is required.");
            Preconditions.checkNotNull((Object)this.splitAssignerFactory, (String)"assignerFactory is required.");
            Preconditions.checkNotNull(this.readerFunction, (String)"readerFunction is required.");
        }
    }
}

