/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.flink.source.reader.ArrayBatchRecords;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RecordAndPosition;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SplitReader} that reads assigned {@link IcebergSourceSplit}s one at a time.
 *
 * <p>Assigned splits are kept in a FIFO queue. Each {@link #fetch()} call either opens the next
 * queued split, returns the next batch from the currently open split, or reports the current
 * split as finished once its reader is exhausted.
 *
 * <p>The class holds plain mutable state and performs no synchronization of its own.
 *
 * @param <T> type of the records wrapped in {@link RecordAndPosition}
 */
class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class);

    private final IcebergSourceReaderMetrics metrics;
    private final ReaderFunction<T> openSplitFunction;
    /** Optional ordering applied to each batch of newly assigned splits; may be {@code null}. */
    private final SerializableComparator<IcebergSourceSplit> splitComparator;
    private final int indexOfSubtask;
    /** Splits that have been assigned but not opened yet. */
    private final Queue<IcebergSourceSplit> splits;

    /** Reader over the split currently being consumed; {@code null} when no split is open. */
    private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
    private IcebergSourceSplit currentSplit;
    private String currentSplitId;

    /**
     * Creates a reader with an empty split queue.
     *
     * @param metrics counters updated on fetch calls and on split assignment/completion
     * @param openSplitFunction function used to open a split into an iterator of record batches
     * @param splitComparator ordering applied to each newly added batch of splits, or
     *     {@code null} to keep the order in which they were assigned
     * @param context reader context; only the subtask index is read from it (used in logging)
     */
    IcebergSourceSplitReader(
            IcebergSourceReaderMetrics metrics,
            ReaderFunction<T> openSplitFunction,
            SerializableComparator<IcebergSourceSplit> splitComparator,
            SourceReaderContext context) {
        this.metrics = metrics;
        this.openSplitFunction = openSplitFunction;
        this.splitComparator = splitComparator;
        this.indexOfSubtask = context.getIndexOfSubtask();
        this.splits = Queues.newArrayDeque();
    }

    /**
     * Returns the next batch of records.
     *
     * <ul>
     *   <li>If no split is open and the queue is empty, an empty {@link RecordsBySplits} is
     *       returned.
     *   <li>If the open split still has data, its next batch is returned.
     *   <li>Otherwise the open split is closed and a "finished split" marker batch is returned.
     * </ul>
     *
     * @return the next batch, an empty batch, or a finished-split marker
     * @throws IOException if reading the next batch fails (unwrapped from
     *     {@link UncheckedIOException}) or closing an exhausted reader fails
     */
    @Override
    public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
        this.metrics.incrementSplitReaderFetchCalls(1L);

        if (this.currentReader == null) {
            IcebergSourceSplit nextSplit = this.splits.poll();
            if (nextSplit == null) {
                // Nothing queued: hand back an empty result rather than blocking.
                return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
            }
            this.currentSplit = nextSplit;
            this.currentSplitId = nextSplit.splitId();
            this.currentReader = this.openSplitFunction.apply(nextSplit);
        }

        if (!this.currentReader.hasNext()) {
            return this.finishSplit();
        }

        try {
            return this.currentReader.next();
        } catch (UncheckedIOException e) {
            // Iterator#next() cannot declare checked exceptions, so the IOException arrives
            // wrapped; surface the original checked exception to the caller.
            throw e.getCause();
        }
    }

    /**
     * Queues newly assigned splits and updates the assignment metrics.
     *
     * <p>When a comparator was supplied, the incoming splits are sorted among themselves before
     * being appended; splits already in the queue are not reordered.
     *
     * @param splitsChange the change to apply; only {@link SplitsAddition} is supported
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     */
    @Override
    public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format("Unsupported split change: %s", splitsChange.getClass()));
        }

        Collection<IcebergSourceSplit> added = splitsChange.splits();
        if (this.splitComparator != null) {
            // Sort a copy so the collection owned by the SplitsChange is left untouched.
            ArrayList<IcebergSourceSplit> newSplits = Lists.newArrayList(added);
            newSplits.sort(this.splitComparator);
            LOG.info("Add {} splits to reader: {}", newSplits.size(), newSplits);
            this.splits.addAll(newSplits);
        } else {
            LOG.info("Add {} splits to reader", added.size());
            this.splits.addAll(added);
        }

        this.metrics.incrementAssignedSplits(added.size());
        this.metrics.incrementAssignedBytes(this.calculateBytes(splitsChange));
    }

    /** No-op: this reader takes no action on wake-up. */
    @Override
    public void wakeUp() {
    }

    /**
     * Closes the reader of the currently open split, if any, and clears the current-split state.
     *
     * <p>The reader reference is dropped even when closing fails, so a repeated call does not
     * close the same iterator twice.
     *
     * @throws Exception if closing the underlying reader fails
     */
    @Override
    public void close() throws Exception {
        this.currentSplitId = null;
        this.currentSplit = null;
        if (this.currentReader != null) {
            try {
                this.currentReader.close();
            } finally {
                this.currentReader = null;
            }
        }
    }

    /**
     * No-op: pause/resume requests are accepted and ignored.
     *
     * @param splitsToPause splits requested to be paused (ignored)
     * @param splitsToResume splits requested to be resumed (ignored)
     */
    @Override
    public void pauseOrResumeSplits(
            Collection<IcebergSourceSplit> splitsToPause,
            Collection<IcebergSourceSplit> splitsToResume) {
    }

    /** Sums the lengths of all file scan tasks contained in the given split. */
    private long calculateBytes(IcebergSourceSplit split) {
        return split.task().files().stream().mapToLong(ContentScanTask::length).sum();
    }

    /** Sums the byte sizes of all splits carried by the given change. */
    private long calculateBytes(SplitsChange<IcebergSourceSplit> splitsChanges) {
        return splitsChanges.splits().stream().mapToLong(this::calculateBytes).sum();
    }

    /**
     * Closes the exhausted reader, records completion metrics and builds the marker batch that
     * reports the current split as finished.
     *
     * @return a finished-split batch carrying the id of the split that was just completed
     * @throws IOException if closing the reader fails
     */
    private ArrayBatchRecords<T> finishSplit() throws IOException {
        if (this.currentReader != null) {
            this.currentReader.close();
            this.currentReader = null;
        }

        ArrayBatchRecords<T> finishRecords = ArrayBatchRecords.finishedSplit(this.currentSplitId);
        LOG.info("Split reader {} finished split: {}", this.indexOfSubtask, this.currentSplitId);
        this.metrics.incrementFinishedSplits(1L);
        this.metrics.incrementFinishedBytes(this.calculateBytes(this.currentSplit));

        // Drop references to the finished split so it can be garbage collected.
        this.currentSplitId = null;
        this.currentSplit = null;
        return finishRecords;
    }
}

