/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.flink.source.reader.ArrayBatchRecords;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderMetricsContext;
import org.apache.iceberg.flink.source.reader.RecordAndPosition;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.metrics.MetricsContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SplitReader} that reads {@link IcebergSourceSplit}s one at a time.
 *
 * <p>Assigned splits are held in a FIFO queue. Each {@link #fetch()} call either opens the next
 * queued split, returns the next batch from the currently open split, or reports the current split
 * as finished once its reader is exhausted. Counters for assigned/finished splits and bytes and for
 * the number of {@code fetch()} calls are updated along the way.
 *
 * @param <T> type of the records wrapped in {@link RecordAndPosition}
 */
class IcebergSourceSplitReader<T>
        implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class);

    /** Opens a split and yields its record batches. */
    private final ReaderFunction<T> openSplitFunction;
    /** Subtask index taken from the reader context; only used in log messages here. */
    private final int indexOfSubtask;
    /** Splits that were assigned but not opened yet, in assignment order. */
    private final Queue<IcebergSourceSplit> splits;

    private final MetricsContext.Counter<Long> assignedSplits;
    private final MetricsContext.Counter<Long> assignedBytes;
    private final MetricsContext.Counter<Long> finishedSplits;
    private final MetricsContext.Counter<Long> finishedBytes;
    private final MetricsContext.Counter<Long> splitReaderFetchCalls;

    /** Reader over the split currently being consumed; {@code null} when no split is open. */
    private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
    /** Split most recently opened by {@link #fetch()}. */
    private IcebergSourceSplit currentSplit;
    /** Id of the split being consumed; reset to {@code null} when it finishes or on close. */
    private String currentSplitId;

    /**
     * Creates a reader with an empty split queue and registers its counters.
     *
     * @param openSplitFunction function used to open each split
     * @param context reader context; only the subtask index is read from it
     * @param metrics metrics context the counters are created from
     */
    IcebergSourceSplitReader(
            ReaderFunction<T> openSplitFunction,
            SourceReaderContext context,
            ReaderMetricsContext metrics) {
        this.openSplitFunction = openSplitFunction;
        this.indexOfSubtask = context.getIndexOfSubtask();
        this.splits = new ArrayDeque<>();
        // NOTE(review): the two byte counters are also registered with Unit.COUNT — confirm
        // whether a byte unit was intended before changing it.
        this.assignedSplits = metrics.counter("assignedSplits", Long.class, MetricsContext.Unit.COUNT);
        this.assignedBytes = metrics.counter("assignedBytes", Long.class, MetricsContext.Unit.COUNT);
        this.finishedSplits = metrics.counter("finishedSplits", Long.class, MetricsContext.Unit.COUNT);
        this.finishedBytes = metrics.counter("finishedBytes", Long.class, MetricsContext.Unit.COUNT);
        this.splitReaderFetchCalls =
                metrics.counter("splitReaderFetchCalls", Long.class, MetricsContext.Unit.COUNT);
    }

    /**
     * Returns the next batch of records.
     *
     * <p>If no split is open, the next queued split is opened first; if the queue is empty an empty
     * {@link RecordsBySplits} (no records, no finished splits) is returned. When the open split has
     * no more batches, the split is closed and a "finished split" marker is returned instead.
     *
     * @return the next batch, an empty result, or a finished-split marker
     * @throws IOException if the reader's {@code next()} fails with an {@link UncheckedIOException}
     *     (its cause is rethrown), or if closing an exhausted reader fails
     */
    @Override
    public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
        this.splitReaderFetchCalls.increment();

        if (this.currentReader == null) {
            IcebergSourceSplit nextSplit = this.splits.poll();
            if (nextSplit == null) {
                // Nothing assigned: hand back an empty result rather than blocking.
                return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
            }
            this.currentSplit = nextSplit;
            this.currentSplitId = nextSplit.splitId();
            this.currentReader = this.openSplitFunction.apply(nextSplit);
        }

        if (!this.currentReader.hasNext()) {
            return this.finishSplit();
        }

        // Iterator#next() cannot declare checked exceptions, so an IOException arrives wrapped
        // in UncheckedIOException; unwrap it to honour this method's throws clause.
        try {
            return this.currentReader.next();
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }

    /**
     * Queues newly assigned splits and updates the assigned-splits/bytes counters.
     *
     * @param splitsChange the change to apply; must be a {@link SplitsAddition}
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     */
    @Override
    public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(
                    String.format("Unsupported split change: %s", splitsChange.getClass()));
        }
        LOG.info("Add {} splits to reader", splitsChange.splits().size());
        this.splits.addAll(splitsChange.splits());
        this.assignedSplits.increment((long) splitsChange.splits().size());
        this.assignedBytes.increment(this.calculateBytes(splitsChange));
    }

    /** No-op in this implementation. */
    @Override
    public void wakeUp() {
    }

    /**
     * Clears the current split id and closes the open reader, if any.
     *
     * @throws Exception if closing the reader fails
     */
    @Override
    public void close() throws Exception {
        this.currentSplitId = null;
        if (this.currentReader != null) {
            this.currentReader.close();
        }
    }

    /** Sums the {@code length()} of every file task in the given split. */
    private long calculateBytes(IcebergSourceSplit split) {
        return split.task().files().stream().mapToLong(ContentScanTask::length).sum();
    }

    /** Sums the bytes of every split carried by the given change. */
    private long calculateBytes(SplitsChange<IcebergSourceSplit> splitsChanges) {
        return splitsChanges.splits().stream().mapToLong(this::calculateBytes).sum();
    }

    /**
     * Closes the exhausted reader, records the finished split in the counters and returns the
     * marker batch that reports the current split id as finished.
     *
     * @return a batch that only signals completion of the current split
     * @throws IOException if closing the reader fails
     */
    private ArrayBatchRecords<T> finishSplit() throws IOException {
        if (this.currentReader != null) {
            this.currentReader.close();
            this.currentReader = null;
        }
        // Build the marker before the id is cleared below.
        ArrayBatchRecords<T> finishRecords = ArrayBatchRecords.finishedSplit(this.currentSplitId);
        LOG.info("Split reader {} finished split: {}", this.indexOfSubtask, this.currentSplitId);
        this.finishedSplits.increment(1L);
        this.finishedBytes.increment(this.calculateBytes(this.currentSplit));
        this.currentSplitId = null;
        return finishRecords;
    }
}

