/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderUtil;
import org.apache.iceberg.flink.source.reader.RecordAndPosition;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Base class for tests of {@link ReaderFunction} implementations.
 *
 * <p>The suite is parameterized over {@link FileFormat} (Avro, ORC and Parquet). Every test writes
 * a list of record batches into a {@link CombinedScanTask}, wraps the task in an
 * {@link IcebergSourceSplit} (optionally with a starting file offset and record offset), reads
 * the split back through {@link #readerFunction()} and verifies both the returned records and the
 * position reported with each record.
 *
 * @param <T> the record type produced by the reader function under test
 */
@RunWith(Parameterized.class)
public abstract class ReaderFunctionTestBase<T> {
    /** Class-wide temporary folder handed to {@link ReaderUtil} when the scan tasks are created. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    private final FileFormat fileFormat;
    private final GenericAppenderFactory appenderFactory;

    /** Returns the file formats the suite is run against, one parameter set per format. */
    @Parameterized.Parameters(name = "fileFormat={0}")
    public static Object[][] parameters() {
        return new Object[][] {{FileFormat.AVRO}, {FileFormat.ORC}, {FileFormat.PARQUET}};
    }

    /** Returns the reader function under test. */
    protected abstract ReaderFunction<T> readerFunction();

    /**
     * Asserts that the records produced by the reader match the expected generic records.
     *
     * @param expected the records that were written
     * @param actual the records returned by the reader function, in read order
     * @param schema the schema of the records ({@code TestFixtures.SCHEMA} in this class)
     */
    protected abstract void assertRecords(List<Record> expected, List<T> actual, Schema schema);

    /**
     * Creates the test for one file format.
     *
     * @param fileFormat the file format used when creating the scan tasks
     */
    public ReaderFunctionTestBase(FileFormat fileFormat) {
        this.fileFormat = fileFormat;
        this.appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
    }

    /**
     * Drains one batch and verifies its records and the position attached to each of them.
     *
     * @param expectedRecords the records the batch must contain
     * @param expectedFileOffset the file offset every record in the batch must report
     * @param startRecordOffset the running record index expected for the first record of the batch
     * @param batch the batch to verify; it is advanced to its next split and fully consumed
     */
    private void assertRecordsAndPosition(
            List<Record> expectedRecords,
            int expectedFileOffset,
            long startRecordOffset,
            RecordsWithSplitIds<RecordAndPosition<T>> batch) {
        batch.nextSplit();
        List<T> actualRecords = Lists.newArrayList();
        long recordOffset = startRecordOffset;
        RecordAndPosition<T> recordAndPosition;
        while ((recordAndPosition = batch.nextRecordFromSplit()) != null) {
            actualRecords.add(recordAndPosition.record());
            Assert.assertEquals("expected file offset", expectedFileOffset, recordAndPosition.fileOffset());
            // The reported record offset is expected to be one larger than the running index.
            Assert.assertEquals("expected record offset", recordOffset, recordAndPosition.recordOffset() - 1L);
            ++recordOffset;
        }
        Assert.assertEquals("expected record count", expectedRecords.size(), actualRecords.size());
        assertRecords(expectedRecords, actualRecords, TestFixtures.SCHEMA);
    }

    /** Creates a combined scan task holding the given record batches in the parameterized format. */
    private CombinedScanTask createScanTask(List<List<Record>> recordBatchList) throws IOException {
        return ReaderUtil.createCombinedScanTask(recordBatchList, TEMPORARY_FOLDER, fileFormat, appenderFactory);
    }

    /**
     * Fetches the next batch from the reader, verifies it, and recycles it.
     *
     * @param reader the iterator returned by the reader function
     * @param expectedRecords the records the next batch must contain
     * @param expectedFileOffset the file offset every record in the batch must report
     * @param startRecordOffset the running record index expected for the first record of the batch
     */
    private void assertNextBatch(
            CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader,
            List<Record> expectedRecords,
            int expectedFileOffset,
            long startRecordOffset) {
        RecordsWithSplitIds<RecordAndPosition<T>> batch = reader.next();
        assertRecordsAndPosition(expectedRecords, expectedFileOffset, startRecordOffset, batch);
        batch.recycle();
    }

    /** A split without any starting position yields all three batches in full. */
    @Test
    public void testNoCheckpointedPosition() throws IOException {
        // Presumably three batches of two records each; the assertions below rely on that shape.
        List<List<Record>> recordBatchList = ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(createScanTask(recordBatchList));
        // try-with-resources so the reader is closed even when an assertion fails.
        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader = readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(0), 0, 0L);
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Starting at file offset 0, record offset 0 yields all three batches in full. */
    @Test
    public void testCheckpointedPositionBeforeFirstFile() throws IOException {
        List<List<Record>> recordBatchList = ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(createScanTask(recordBatchList), 0, 0L);
        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader = readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(0), 0, 0L);
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Starting at file offset 0, record offset 1 skips the first record of the first batch. */
    @Test
    public void testCheckpointedPositionMiddleFirstFile() throws IOException {
        List<List<Record>> recordBatchList = ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(createScanTask(recordBatchList), 0, 1L);
        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader = readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(0).subList(1, 2), 0, 1L);
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Starting at file offset 0, record offset 2 skips the first batch entirely. */
    @Test
    public void testCheckpointedPositionAfterFirstFile() throws IOException {
        List<List<Record>> recordBatchList = ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(createScanTask(recordBatchList), 0, 2L);
        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader = readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Starting at file offset 1, record offset 0 yields the second and third batches in full. */
    @Test
    public void testCheckpointedPositionBeforeSecondFile() throws IOException {
        List<List<Record>> recordBatchList = ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(createScanTask(recordBatchList), 1, 0L);
        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader = readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(1), 1, 0L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }

    /** Starting at file offset 1, record offset 1 skips the first record of the second batch. */
    @Test
    public void testCheckpointedPositionMidSecondFile() throws IOException {
        List<List<Record>> recordBatchList = ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask(createScanTask(recordBatchList), 1, 1L);
        try (CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> reader = readerFunction().apply(split)) {
            assertNextBatch(reader, recordBatchList.get(1).subList(1, 2), 1, 1L);
            assertNextBatch(reader, recordBatchList.get(2), 2, 0L);
        }
    }
}

