/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.file.src.util.Pool;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.reader.ArrayBatchRecords;
import org.apache.iceberg.flink.source.reader.DataIteratorBatcher;
import org.apache.iceberg.flink.source.reader.RecordAndPosition;
import org.apache.iceberg.flink.source.reader.RecordFactory;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * {@link DataIteratorBatcher} that copies records from a {@link DataIterator} into reusable
 * arrays taken from a {@link Pool} and hands every filled array out as one batch.
 *
 * <p>The pool is created on the first call to {@link #batch(String, DataIterator)} and holds
 * {@code handoverQueueSize} arrays, each created by the {@link RecordFactory} with room for
 * {@code batchSize} records. The same pool is shared by every iterator this batcher returns.
 *
 * @param <T> type of the records being batched
 */
class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
    /** Maximum number of records placed into a single batch. */
    private final int batchSize;
    /** Number of batch arrays kept in the pool. */
    private final int handoverQueueSize;
    /** Creates the batch arrays and copies records into them. */
    private final RecordFactory<T> recordFactory;
    // Created lazily in batch(); NOTE(review): presumably transient because the batcher itself is
    // serialized while the pool is not - confirm against DataIteratorBatcher.
    private transient Pool<T[]> pool;

    /**
     * Reads the batch size and the pool size from the given configuration.
     *
     * @param config source of {@code SOURCE_READER_FETCH_BATCH_RECORD_COUNT} (records per batch)
     *     and {@code ELEMENT_QUEUE_CAPACITY} (number of pooled batch arrays)
     * @param recordFactory factory used to allocate batch arrays and to clone records into them
     */
    ArrayPoolDataIteratorBatcher(ReadableConfig config, RecordFactory<T> recordFactory) {
        this.batchSize = config.get(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT);
        this.handoverQueueSize = config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
        this.recordFactory = recordFactory;
    }

    /**
     * Wraps the given data iterator into an iterator of record batches for one split.
     *
     * @param splitId id of the split the records belong to; attached to every batch
     * @param inputIterator record source; must not be {@code null}
     * @return an iterator producing batches backed by pooled arrays; closing it closes
     *     {@code inputIterator}
     * @throws IllegalArgumentException if {@code inputIterator} is {@code null}
     */
    @Override
    public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> batch(String splitId, DataIterator<T> inputIterator) {
        Preconditions.checkArgument(inputIterator != null, "Input data iterator can't be null");
        // Allocate the arrays only once, on first use.
        if (this.pool == null) {
            this.pool = this.createPoolOfBatches(this.handoverQueueSize);
        }
        return new ArrayPoolBatchIterator(splitId, inputIterator, this.pool);
    }

    /**
     * Builds a pool pre-filled with {@code numBatches} arrays of {@code batchSize} slots each.
     *
     * @param numBatches number of arrays to allocate and add to the pool
     * @return the populated pool
     */
    private Pool<T[]> createPoolOfBatches(int numBatches) {
        Pool<T[]> poolOfBatches = new Pool<>(numBatches);
        for (int batchId = 0; batchId < numBatches; ++batchId) {
            T[] batch = this.recordFactory.createBatch(this.batchSize);
            poolOfBatches.add(batch);
        }
        return poolOfBatches;
    }

    /**
     * Iterator that drains a {@link DataIterator} in chunks of at most {@code batchSize} records,
     * never mixing records from two different files in one batch.
     */
    private class ArrayPoolBatchIterator
            implements CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> {
        private final String splitId;
        private final DataIterator<T> inputIterator;
        private final Pool<T[]> pool;

        ArrayPoolBatchIterator(String splitId, DataIterator<T> inputIterator, Pool<T[]> pool) {
            this.splitId = splitId;
            this.inputIterator = inputIterator;
            this.pool = pool;
        }

        /** Returns {@code true} while the underlying data iterator has more records. */
        @Override
        public boolean hasNext() {
            return this.inputIterator.hasNext();
        }

        /**
         * Fills one pooled array with up to {@code batchSize} records and returns it as a batch.
         *
         * @return the batch, tagged with the split id, the iterator's file offset and the record
         *     offset of the first record in the batch
         * @throws NoSuchElementException if the underlying iterator is exhausted
         */
        @Override
        public RecordsWithSplitIds<RecordAndPosition<T>> next() {
            if (!this.inputIterator.hasNext()) {
                throw new NoSuchElementException();
            }
            T[] batch = this.getCachedEntry();
            int recordCount = 0;
            while (this.inputIterator.hasNext() && recordCount < ArrayPoolDataIteratorBatcher.this.batchSize) {
                T nextRecord = this.inputIterator.next();
                // Copy the record into the batch slot before advancing the input iterator again.
                ArrayPoolDataIteratorBatcher.this.recordFactory.clone(nextRecord, batch, recordCount);
                // Count the record BEFORE the file-boundary check: the last record of a file has
                // already been stored in the batch and must be included in the emitted count.
                ++recordCount;
                if (!this.inputIterator.currentFileHasNext()) {
                    // The batch carries a single file offset, so stop at the end of the file.
                    break;
                }
            }
            // recordOffset() is read after consuming recordCount records; subtracting the count
            // yields the offset at which this batch started.
            return ArrayBatchRecords.forRecords(
                    this.splitId,
                    this.pool.recycler(),
                    batch,
                    recordCount,
                    this.inputIterator.fileOffset(),
                    this.inputIterator.recordOffset() - recordCount);
        }

        /**
         * Closes the underlying data iterator.
         *
         * @throws IOException if closing the data iterator fails
         */
        @Override
        public void close() throws IOException {
            this.inputIterator.close();
        }

        /**
         * Takes one batch array from the pool.
         *
         * @return an array obtained from the pool
         * @throws RuntimeException if the thread is interrupted while polling the pool; the
         *     interrupt flag is restored before throwing
         */
        private T[] getCachedEntry() {
            try {
                return this.pool.pollEntry();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for array pool entry", e);
            }
        }
    }
}

