/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.io.File;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.reader.ArrayBatchRecords;
import org.apache.iceberg.flink.source.reader.ArrayPoolDataIteratorBatcher;
import org.apache.iceberg.flink.source.reader.DataIteratorBatcher;
import org.apache.iceberg.flink.source.reader.ReaderUtil;
import org.apache.iceberg.flink.source.reader.RecordAndPosition;
import org.apache.iceberg.flink.source.reader.RecordFactory;
import org.apache.iceberg.flink.source.reader.RowDataRecordFactory;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileAppenderFactory;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests {@link ArrayPoolDataIteratorBatcher} reading {@link RowData} from Parquet files.
 *
 * <p>The batcher under test is configured with a fetch batch record count of {@link #BATCH_SIZE}
 * and an element queue capacity of 1. Every test drains the batches of a single split and checks,
 * for each record, the file offset, the record offset and the record content.
 */
public class TestArrayPoolDataIteratorBatcherRowData {
    private static final FileFormat FILE_FORMAT = FileFormat.PARQUET;

    /** Split id handed to the batcher; every batch must report it as its only split. */
    private static final String SPLIT_ID = "someSplitId";

    /** Value used for the fetch batch record count; every batch array is asserted to have this length. */
    private static final int BATCH_SIZE = 2;

    private static final String RESUME_POSITION_DESCRIPTION =
            "The position points to where the reader should resume after this record is processed.";

    @TempDir
    protected Path temporaryFolder;

    private final Configuration config = new Configuration()
            .set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1)
            .set(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, BATCH_SIZE);
    private final GenericAppenderFactory appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
    private final DataIteratorBatcher<RowData> batcher =
            new ArrayPoolDataIteratorBatcher<>(config, new RowDataRecordFactory(TestFixtures.ROW_TYPE));

    /** A file holding a single record yields exactly one, partially filled, batch. */
    @Test
    public void testSingleFileLessThanOneFullBatch() throws Exception {
        List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L);
        CombinedScanTask combinedTask = new BaseCombinedScanTask(writeFileTask(records));
        DataIterator<RowData> dataIterator = ReaderUtil.createDataIterator(combinedTask);

        // try-with-resources so the underlying reader is released even when an assertion fails
        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<RowData>>> batches =
                batcher.batch(SPLIT_ID, dataIterator)) {
            ArrayBatchRecords<RowData> batch = nextBatch(batches, 1);
            assertNextRecord(batch, 0, 1L, records.get(0));
            assertDrainedThenRecycle(batch);

            Assertions.assertThat(batches).isExhausted();
        }
    }

    /** A file holding five records is split into batches of 2, 2 and 1 records. */
    @Test
    public void testSingleFileWithMultipleBatches() throws Exception {
        List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 5, 1L);
        CombinedScanTask combinedTask = new BaseCombinedScanTask(writeFileTask(records));
        DataIterator<RowData> dataIterator = ReaderUtil.createDataIterator(combinedTask);

        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<RowData>>> batches =
                batcher.batch(SPLIT_ID, dataIterator)) {
            ArrayBatchRecords<RowData> batch0 = nextBatch(batches, 2);
            assertNextRecord(batch0, 0, 1L, records.get(0));
            assertNextRecord(batch0, 0, 2L, records.get(1));
            assertDrainedThenRecycle(batch0);

            ArrayBatchRecords<RowData> batch1 = nextBatch(batches, 2);
            assertSameArrayContent(batch1, batch0);
            assertNextRecord(batch1, 0, 3L, records.get(2));
            assertNextRecord(batch1, 0, 4L, records.get(3));
            assertDrainedThenRecycle(batch1);

            ArrayBatchRecords<RowData> batch2 = nextBatch(batches, 1);
            assertSameArrayContent(batch2, batch0);
            assertNextRecord(batch2, 0, 5L, records.get(4));
            assertDrainedThenRecycle(batch2);

            Assertions.assertThat(batches).isExhausted();
        }
    }

    /**
     * Three files (1, 4 and 3 records) are combined and the iterator is positioned at file 1,
     * record offset 1 before batching. File 0 and the first record of file 1 must not be returned,
     * and no batch mixes records from two files.
     */
    @Test
    public void testMultipleFilesWithSeekPosition() throws Exception {
        List<Record> records0 = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 1L);
        FileScanTask fileTask0 = writeFileTask(records0);
        List<Record> records1 = RandomGenericData.generate(TestFixtures.SCHEMA, 4, 2L);
        FileScanTask fileTask1 = writeFileTask(records1);
        List<Record> records2 = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 3L);
        FileScanTask fileTask2 = writeFileTask(records2);

        CombinedScanTask combinedTask = new BaseCombinedScanTask(Arrays.asList(fileTask0, fileTask1, fileTask2));
        DataIterator<RowData> dataIterator = ReaderUtil.createDataIterator(combinedTask);
        dataIterator.seek(1, 1L);

        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<RowData>>> batches =
                batcher.batch(SPLIT_ID, dataIterator)) {
            // file 1, first batch: records at index 1 and 2
            ArrayBatchRecords<RowData> batch10 = nextBatch(batches, 2);
            assertNextRecord(
                    batch10,
                    1,
                    2L,
                    "seek should skip the first record in file1. starting from the second record",
                    records1.get(1));
            assertNextRecord(batch10, 1, 3L, records1.get(2));
            assertDrainedThenRecycle(batch10);

            // file 1, second batch: the remaining record at index 3
            ArrayBatchRecords<RowData> batch11 = nextBatch(batches, 1);
            assertSameArrayContent(batch11, batch10);
            assertNextRecord(batch11, 1, 4L, records1.get(3));
            assertDrainedThenRecycle(batch11);

            // file 2, first batch: records at index 0 and 1
            ArrayBatchRecords<RowData> batch20 = nextBatch(batches, 2);
            assertSameArrayContent(batch20, batch10);
            assertNextRecord(batch20, 2, 1L, records2.get(0));
            assertNextRecord(batch20, 2, 2L, records2.get(1));
            assertDrainedThenRecycle(batch20);

            // file 2, second batch: the remaining record at index 2
            ArrayBatchRecords<RowData> batch21 = nextBatch(batches, 1);
            assertSameArrayContent(batch21, batch10);
            assertNextRecord(batch21, 2, 3L, records2.get(2));
            assertDrainedThenRecycle(batch21);

            Assertions.assertThat(batches).isExhausted();
        }
    }

    /** Writes {@code records} to a fresh temp file under the JUnit temp dir and wraps it in a scan task. */
    private FileScanTask writeFileTask(List<Record> records) throws Exception {
        File dataFile = File.createTempFile("junit", null, temporaryFolder.toFile());
        return ReaderUtil.createFileTask(records, dataFile, FILE_FORMAT, appenderFactory);
    }

    /**
     * Pulls the next batch and verifies its header: no finished splits, {@link #SPLIT_ID} as the
     * next split, a backing array of {@link #BATCH_SIZE} slots and the expected number of records.
     */
    private static ArrayBatchRecords<RowData> nextBatch(
            CloseableIterator<RecordsWithSplitIds<RecordAndPosition<RowData>>> batches, int expectedRecordCount) {
        ArrayBatchRecords<RowData> batch = (ArrayBatchRecords<RowData>) batches.next();
        Assertions.assertThat(batch.finishedSplits()).isEmpty();
        Assertions.assertThat(batch.nextSplit()).isEqualTo(SPLIT_ID);
        Assertions.assertThat(batch.records()).hasSize(BATCH_SIZE);
        Assertions.assertThat(batch.numberOfRecords()).isEqualTo(expectedRecordCount);
        return batch;
    }

    /**
     * Verifies that {@code batch} exposes the same array elements as the already recycled
     * {@code firstBatch}.
     *
     * <p>NOTE(review): this presumably checks that the pooled array is reused after
     * {@code recycle()}; an identity check would state that more directly - confirm against the
     * pool implementation before tightening.
     */
    private static void assertSameArrayContent(ArrayBatchRecords<RowData> batch, ArrayBatchRecords<RowData> firstBatch) {
        Assertions.assertThat(batch.records()).containsExactlyInAnyOrder(firstBatch.records());
    }

    /** Same as the five-argument overload, using the default record offset description. */
    private static void assertNextRecord(
            ArrayBatchRecords<RowData> batch, int expectedFileOffset, long expectedRecordOffset, Record expected) {
        assertNextRecord(batch, expectedFileOffset, expectedRecordOffset, RESUME_POSITION_DESCRIPTION, expected);
    }

    /**
     * Reads the next record of the batch and verifies its file offset, its record offset (reported
     * with {@code offsetDescription} on failure) and its content.
     */
    private static void assertNextRecord(
            ArrayBatchRecords<RowData> batch,
            int expectedFileOffset,
            long expectedRecordOffset,
            String offsetDescription,
            Record expected) {
        RecordAndPosition<RowData> next = batch.nextRecordFromSplit();
        Assertions.assertThat(next.fileOffset()).isEqualTo(expectedFileOffset);
        Assertions.assertThat(next.recordOffset()).as(offsetDescription).isEqualTo(expectedRecordOffset);
        TestHelpers.assertRowData(TestFixtures.SCHEMA, expected, next.record());
    }

    /** Verifies the batch has no further record or split, then recycles it. */
    private static void assertDrainedThenRecycle(ArrayBatchRecords<RowData> batch) {
        Assertions.assertThat(batch.nextRecordFromSplit()).isNull();
        Assertions.assertThat(batch.nextSplit()).isNull();
        batch.recycle();
    }
}

