/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderUtil;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.TestingMetricGroup;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestIcebergSourceReader {
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private final GenericAppenderFactory appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);

    @Test
    public void testReaderMetrics() throws Exception {
        TestingReaderOutput readerOutput = new TestingReaderOutput();
        TestingMetricGroup metricGroup = new TestingMetricGroup();
        TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), (SourceReaderMetricGroup)metricGroup);
        IcebergSourceReader reader = this.createReader((MetricGroup)metricGroup, (SourceReaderContext)readerContext, null);
        reader.start();
        this.testOneSplitFetcher(reader, (TestingReaderOutput<RowData>)readerOutput, metricGroup, 1);
        this.testOneSplitFetcher(reader, (TestingReaderOutput<RowData>)readerOutput, metricGroup, 2);
    }

    @Test
    public void testReaderOrder() throws Exception {
        List<List<Record>> recordBatchList1 = ReaderUtil.createRecordBatchList(0L, TestFixtures.SCHEMA, 1, 1);
        CombinedScanTask task1 = ReaderUtil.createCombinedScanTask(recordBatchList1, TEMPORARY_FOLDER, FileFormat.PARQUET, this.appenderFactory);
        List<List<Record>> recordBatchList2 = ReaderUtil.createRecordBatchList(1L, TestFixtures.SCHEMA, 1, 1);
        CombinedScanTask task2 = ReaderUtil.createCombinedScanTask(recordBatchList2, TEMPORARY_FOLDER, FileFormat.PARQUET, this.appenderFactory);
        List<RowData> rowDataList1 = this.read(Arrays.asList(IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)task1), IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)task2)), 2L);
        List<RowData> rowDataList2 = this.read(Arrays.asList(IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)task2), IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)task1)), 2L);
        Assert.assertEquals((Object)rowDataList1.get(0), (Object)rowDataList2.get(0));
        Assert.assertEquals((Object)rowDataList1.get(1), (Object)rowDataList2.get(1));
    }

    private List<RowData> read(List<IcebergSourceSplit> splits, long expected) throws Exception {
        TestingMetricGroup metricGroup = new TestingMetricGroup();
        TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), (SourceReaderMetricGroup)metricGroup);
        IcebergSourceReader reader = this.createReader((MetricGroup)metricGroup, (SourceReaderContext)readerContext, new IdBasedComparator());
        reader.start();
        reader.addSplits(splits);
        TestingReaderOutput readerOutput = new TestingReaderOutput();
        while ((long)readerOutput.getEmittedRecords().size() < expected) {
            reader.pollNext((ReaderOutput)readerOutput);
        }
        reader.pollNext((ReaderOutput)readerOutput);
        Assert.assertEquals((long)expected, (long)readerOutput.getEmittedRecords().size());
        return readerOutput.getEmittedRecords();
    }

    private void testOneSplitFetcher(IcebergSourceReader reader, TestingReaderOutput<RowData> readerOutput, TestingMetricGroup metricGroup, int expectedCount) throws Exception {
        long seed = expectedCount;
        List<List<Record>> recordBatchList = ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1);
        CombinedScanTask task = ReaderUtil.createCombinedScanTask(recordBatchList, TEMPORARY_FOLDER, FileFormat.PARQUET, this.appenderFactory);
        IcebergSourceSplit split = IcebergSourceSplit.fromCombinedScanTask((CombinedScanTask)task);
        reader.addSplits(Collections.singletonList(split));
        while (readerOutput.getEmittedRecords().size() < expectedCount) {
            reader.pollNext(readerOutput);
        }
        Assert.assertEquals((long)expectedCount, (long)readerOutput.getEmittedRecords().size());
        TestHelpers.assertRowData(TestFixtures.SCHEMA, (StructLike)recordBatchList.get(0).get(0), (RowData)readerOutput.getEmittedRecords().get(expectedCount - 1));
        Assert.assertEquals((long)expectedCount, (long)metricGroup.counters().get("assignedSplits").getCount());
        reader.pollNext(readerOutput);
    }

    private IcebergSourceReader createReader(MetricGroup metricGroup, SourceReaderContext readerContext, SerializableComparator<IcebergSourceSplit> splitComparator) {
        IcebergSourceReaderMetrics readerMetrics = new IcebergSourceReaderMetrics(metricGroup, "db.tbl");
        RowDataReaderFunction readerFunction = new RowDataReaderFunction((ReadableConfig)new Configuration(), TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, (FileIO)new HadoopFileIO(new org.apache.hadoop.conf.Configuration()), (EncryptionManager)new PlaintextEncryptionManager(), Collections.emptyList());
        return new IcebergSourceReader(SerializableRecordEmitter.defaultEmitter(), readerMetrics, (ReaderFunction)readerFunction, splitComparator, readerContext);
    }

    private static class IdBasedComparator
    implements SerializableComparator<IcebergSourceSplit> {
        private IdBasedComparator() {
        }

        public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) {
            return o1.splitId().compareTo(o2.splitId());
        }
    }
}

