/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitReader;
import org.apache.paimon.flink.source.FileStoreSourceSplitSerializerTest;
import org.apache.paimon.flink.source.RecordLimiter;
import org.apache.paimon.flink.source.TestChangelogDataReadWrite;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.RecordWriter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link FileStoreSourceSplitReader}.
 *
 * <p>Each test writes key/value data through {@link TestChangelogDataReadWrite} into the JUnit
 * temporary directory, hands one or more {@link FileStoreSourceSplit}s to a reader and then checks
 * the batches returned by successive {@code fetch()} calls.
 */
public class FileStoreSourceSplitReaderTest {
    @TempDir
    java.nio.file.Path tempDir;

    /**
     * Creates the table schema inside {@link #tempDir} before every test.
     *
     * <p>The schema has the fields {@code k} (BIGINT), {@code v} (BIGINT) and {@code default}
     * (INT). The two key lists passed to {@link Schema} are presumably the partition keys
     * ({@code default}) and the primary keys ({@code k}, {@code default}) -- confirm against the
     * {@code Schema} constructor.
     */
    @BeforeEach
    public void beforeEach() throws Exception {
        SchemaManager schemaManager =
                new SchemaManager(LocalFileIO.create(), new Path(this.tempDir.toUri()));
        RowType rowType =
                new RowType(
                        Arrays.asList(
                                new RowType.RowField("k", new BigIntType()),
                                new RowType.RowField("v", new BigIntType()),
                                new RowType.RowField("default", new IntType())));
        schemaManager.createTable(
                new Schema(
                        LogicalTypeConversion.toDataType(rowType).getFields(),
                        Collections.singletonList("default"),
                        Arrays.asList("k", "default"),
                        Collections.emptyMap(),
                        null));
    }

    /** Reads a single split from its beginning. */
    @Test
    public void testPrimaryKey() throws Exception {
        this.innerTestOnce(0);
    }

    /** Reads a single split whose first four records are to be skipped. */
    @Test
    public void testPrimaryKeySkip() throws Exception {
        this.innerTestOnce(4);
    }

    /**
     * Builds the reader under test.
     *
     * @param tableRead the read used by the reader to open splits
     * @param limit maximum number of records to emit, or {@code null} for no {@link RecordLimiter}
     */
    private FileStoreSourceSplitReader createReader(TableRead tableRead, @Nullable Long limit) {
        return new FileStoreSourceSplitReader(
                tableRead, limit == null ? null : new RecordLimiter(limit.longValue()), null);
    }

    /**
     * Writes {@link #kvs()} into one split created with {@code skip} records to skip and verifies
     * that the first fetch returns the remaining records as INSERT rows and that the second fetch
     * reports the split as finished.
     */
    private void innerTestOnce(int skip) throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), null);
        // Close the reader even when an assertion fails so no file handles are leaked.
        try {
            List<Tuple2<Long, Long>> input = this.kvs();
            List<DataFileMeta> files =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input);
            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id1", MergeTreeCompactManagerTest.row(1), 0, files, skip));

            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
            List<Tuple2<org.apache.flink.types.RowKind, Long>> expected = expectedInserts(input);
            List<Tuple2<org.apache.flink.types.RowKind, Long>> result =
                    this.readRecords(records, "id1", skip);
            Assertions.assertThat(result).isEqualTo(expected.subList(skip, expected.size()));

            records = reader.fetch();
            this.assertRecords(records, "id1", "id1", 0L, null);
        } finally {
            reader.close();
        }
    }

    /** A DELETE written after the inserts must be emitted as a DELETE row by the reader. */
    @Test
    public void testPrimaryKeyWithDelete() throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), null);
        try {
            List<Tuple2<Long, Long>> input = this.kvs();
            RecordWriter<KeyValue> writer =
                    rw.createMergeTreeWriter(MergeTreeCompactManagerTest.row(1), 0);
            List<DataFileMeta> files;
            try {
                for (Tuple2<Long, Long> kv : input) {
                    writer.write(
                            new KeyValue()
                                    .replace(
                                            GenericRow.of(kv.f0),
                                            RowKind.INSERT,
                                            GenericRow.of(kv.f1)));
                }
                writer.write(
                        new KeyValue()
                                .replace(
                                        GenericRow.of(222L),
                                        RowKind.DELETE,
                                        GenericRow.of(333L)));
                files = writer.prepareCommit(true).newFilesIncrement().newFiles();
            } finally {
                writer.close();
            }

            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id1", MergeTreeCompactManagerTest.row(1), 0, files, true));

            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
            List<Tuple2<org.apache.flink.types.RowKind, Long>> expected = expectedInserts(input);
            expected.add(new Tuple2<>(org.apache.flink.types.RowKind.DELETE, 333L));
            List<Tuple2<org.apache.flink.types.RowKind, Long>> result =
                    this.readRecords(records, "id1", 0L);
            Assertions.assertThat(result).isEqualTo(expected);

            records = reader.fetch();
            this.assertRecords(records, "id1", "id1", 0L, null);
        } finally {
            reader.close();
        }
    }

    /** A split holding two written file groups is returned as two consecutive batches. */
    @Test
    public void testMultipleBatchInSplit() throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), null);
        try {
            List<Tuple2<Long, Long>> input1 = this.kvs();
            List<DataFileMeta> files =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input1);
            List<Tuple2<Long, Long>> input2 = this.kvs(6L);
            List<DataFileMeta> files2 =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input2);
            files.addAll(files2);
            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id1", MergeTreeCompactManagerTest.row(1), 0, files));

            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
            this.assertRecords(records, null, "id1", 0L, values(input1));

            // The skip count keeps increasing across batches of the same split.
            records = reader.fetch();
            this.assertRecords(records, null, "id1", 6L, values(input2));

            records = reader.fetch();
            this.assertRecords(records, "id1", "id1", 0L, null);
        } finally {
            reader.close();
        }
    }

    /** A split created with three records to skip only yields the records after them. */
    @Test
    public void testRestore() throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), null);
        try {
            List<Tuple2<Long, Long>> input = this.kvs();
            List<DataFileMeta> files =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input);
            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id1", MergeTreeCompactManagerTest.row(1), 0, files, 3L));

            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
            this.assertRecords(
                    records, null, "id1", 3L, values(input.subList(3, input.size())));

            records = reader.fetch();
            this.assertRecords(records, "id1", "id1", 0L, null);
        } finally {
            reader.close();
        }
    }

    /** Skipping seven records crosses the first batch and continues inside the second one. */
    @Test
    public void testRestoreMultipleBatchInSplit() throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), null);
        try {
            List<Tuple2<Long, Long>> input1 = this.kvs();
            List<DataFileMeta> files =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input1);
            List<Tuple2<Long, Long>> input2 = this.kvs(6L);
            List<DataFileMeta> files2 =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input2);
            files.addAll(files2);
            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id1", MergeTreeCompactManagerTest.row(1), 0, files, 7L));

            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
            List<Long> expected =
                    Stream.concat(input1.stream(), input2.stream())
                            .skip(7L)
                            .map(t -> t.f1)
                            .collect(Collectors.toList());
            this.assertRecords(records, null, "id1", 7L, expected);

            records = reader.fetch();
            this.assertRecords(records, "id1", "id1", 0L, null);
        } finally {
            reader.close();
        }
    }

    /** Two assigned splits are read one after the other, each followed by a finished marker. */
    @Test
    public void testMultipleSplits() throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), null);
        try {
            List<Tuple2<Long, Long>> input1 = this.kvs();
            List<DataFileMeta> files1 =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input1);
            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id1", MergeTreeCompactManagerTest.row(1), 0, files1));

            List<Tuple2<Long, Long>> input2 = this.kvs();
            List<DataFileMeta> files2 =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(2), 1, input2);
            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id2", MergeTreeCompactManagerTest.row(2), 1, files2));

            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
            this.assertRecords(records, null, "id1", 0L, values(input1));

            records = reader.fetch();
            this.assertRecords(records, "id1", "id1", 0L, null);

            records = reader.fetch();
            this.assertRecords(records, null, "id2", 0L, values(input2));

            records = reader.fetch();
            this.assertRecords(records, "id2", "id2", 0L, null);
        } finally {
            reader.close();
        }
    }

    /** Fetching without any assigned split fails with a "no split remaining" message. */
    @Test
    public void testNoSplit() throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), null);
        try {
            Assertions.assertThatThrownBy(reader::fetch)
                    .hasMessageContaining("no split remaining");
        } finally {
            reader.close();
        }
    }

    /**
     * With a limit of two, only the first two records are emitted; the current split and a
     * subsequently assigned split are then both reported as finished.
     */
    @Test
    public void testLimit() throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), 2L);
        try {
            List<Tuple2<Long, Long>> input = this.kvs();
            List<DataFileMeta> files =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input);
            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id1", MergeTreeCompactManagerTest.row(1), 0, files, 0L));

            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
            List<Tuple2<org.apache.flink.types.RowKind, Long>> expected = expectedInserts(input);
            List<Tuple2<org.apache.flink.types.RowKind, Long>> result =
                    this.readRecords(records, "id1", 0L);
            Assertions.assertThat(result).isEqualTo(expected.subList(0, 2));

            records = reader.fetch();
            this.assertRecords(records, "id1", null, 0L, Collections.emptyList());

            // A split assigned after the limit was reached is finished right away.
            this.assignSplit(
                    reader,
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id2",
                            MergeTreeCompactManagerTest.row(1),
                            0,
                            Collections.singletonList(
                                    FileStoreSourceSplitSerializerTest.newFile(0)),
                            0L));
            records = reader.fetch();
            this.assertRecords(records, "id2", null, 0L, null);
        } finally {
            reader.close();
        }
    }

    /**
     * While a split is paused, fetches return empty results (even after another split has been
     * assigned); after resuming, the paused split continues where it stopped and the later split
     * is read afterwards.
     */
    @Test
    public void testPauseOrResumeSplits() throws Exception {
        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(this.tempDir.toString());
        FileStoreSourceSplitReader reader = this.createReader(rw.createReadWithKey(), null);
        try {
            List<Tuple2<Long, Long>> input1 = this.kvs();
            List<DataFileMeta> files =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input1);
            List<Tuple2<Long, Long>> input2 = this.kvs(6L);
            List<DataFileMeta> files2 =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input2);
            files.addAll(files2);
            FileStoreSourceSplit split1 =
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id1", MergeTreeCompactManagerTest.row(1), 0, files);
            this.assignSplit(reader, split1);

            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
            this.assertRecords(records, null, "id1", 0L, values(input1));

            // Pause split1: nothing is returned while it is paused.
            reader.pauseOrResumeSplits(Collections.singletonList(split1), Collections.emptyList());
            records = reader.fetch();
            this.assertRecords(records, null, null, 0L, Collections.emptyList());

            // Assigning another split does not make the reader produce records either.
            List<Tuple2<Long, Long>> input3 = this.kvs(12L);
            List<DataFileMeta> files3 =
                    rw.writeFiles(MergeTreeCompactManagerTest.row(1), 0, input3);
            FileStoreSourceSplit split2 =
                    FileStoreSourceSplitSerializerTest.newSourceSplit(
                            "id2", MergeTreeCompactManagerTest.row(1), 0, files3);
            this.assignSplit(reader, split2);
            records = reader.fetch();
            this.assertRecords(records, null, null, 0L, Collections.emptyList());

            // Resume split1: its second batch is read, then split2 follows.
            reader.pauseOrResumeSplits(Collections.emptyList(), Collections.singletonList(split1));
            records = reader.fetch();
            this.assertRecords(records, null, "id1", 6L, values(input2));

            records = reader.fetch();
            this.assertRecords(records, "id1", "id1", 0L, null);

            records = reader.fetch();
            this.assertRecords(records, null, "id2", 0L, values(input3));

            records = reader.fetch();
            this.assertRecords(records, "id2", "id2", 0L, null);
        } finally {
            reader.close();
        }
    }

    /**
     * Verifies one fetch result.
     *
     * <p>If {@code finishedSplit} is non-null, only the set of finished splits is checked and all
     * other arguments are ignored. Otherwise the records are drained via {@link #readRecords} and
     * their values compared with {@code expected}.
     *
     * @param records the fetch result to inspect
     * @param finishedSplit id of the single split expected to be finished, or {@code null}
     * @param nextSplit expected value of {@code records.nextSplit()}
     * @param startRecordSkipCount skip count preceding the first record of this batch
     * @param expected expected record values in order
     */
    private void assertRecords(
            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records,
            String finishedSplit,
            String nextSplit,
            long startRecordSkipCount,
            List<Long> expected) {
        if (finishedSplit != null) {
            Assertions.assertThat(records.finishedSplits())
                    .isEqualTo(Collections.singleton(finishedSplit));
            return;
        }
        List<Tuple2<org.apache.flink.types.RowKind, Long>> result =
                this.readRecords(records, nextSplit, startRecordSkipCount);
        Assertions.assertThat(result.stream().map(t -> t.f1).collect(Collectors.toList()))
                .isEqualTo(expected);
    }

    /**
     * Drains {@code records} and returns every row as a (row kind, value of column 0) pair.
     *
     * <p>Also asserts that no split is finished, that the next split id equals {@code nextSplit}
     * and that each record's skip count is one larger than that of its predecessor, starting at
     * {@code startRecordSkipCount + 1}. The batch is recycled afterwards.
     */
    private List<Tuple2<org.apache.flink.types.RowKind, Long>> readRecords(
            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records,
            String nextSplit,
            long startRecordSkipCount) {
        Assertions.assertThat(records.finishedSplits()).isEmpty();
        Assertions.assertThat(records.nextSplit()).isEqualTo(nextSplit);

        List<Tuple2<org.apache.flink.types.RowKind, Long>> result = new ArrayList<>();
        BulkFormat.RecordIterator<RowData> iterator;
        while ((iterator = records.nextRecordFromSplit()) != null) {
            RecordAndPosition<RowData> record;
            while ((record = iterator.next()) != null) {
                RowData row = record.getRecord();
                result.add(new Tuple2<>(row.getRowKind(), row.getLong(0)));
                Assertions.assertThat(record.getRecordSkipCount())
                        .isEqualTo(++startRecordSkipCount);
            }
        }
        records.recycle();
        return result;
    }

    /** Returns the six test key/value pairs with keys {@code 1..6}. */
    private List<Tuple2<Long, Long>> kvs() {
        return this.kvs(0L);
    }

    /**
     * Returns six key/value pairs with keys {@code keyBase + 1 .. keyBase + 6} and the fixed
     * values {@code 1, 2, 2, -1, 1, -2}.
     */
    private List<Tuple2<Long, Long>> kvs(long keyBase) {
        List<Tuple2<Long, Long>> kvs = new ArrayList<>();
        kvs.add(new Tuple2<>(keyBase + 1L, 1L));
        kvs.add(new Tuple2<>(keyBase + 2L, 2L));
        kvs.add(new Tuple2<>(keyBase + 3L, 2L));
        kvs.add(new Tuple2<>(keyBase + 4L, -1L));
        kvs.add(new Tuple2<>(keyBase + 5L, 1L));
        kvs.add(new Tuple2<>(keyBase + 6L, -2L));
        return kvs;
    }

    /** Hands {@code split} to {@code reader} as a single-element splits addition. */
    private void assignSplit(FileStoreSourceSplitReader reader, FileStoreSourceSplit split) {
        SplitsAddition<FileStoreSourceSplit> splitsChange =
                new SplitsAddition<>(Collections.singletonList(split));
        reader.handleSplitsChanges(splitsChange);
    }

    /** Projects the value component ({@code f1}) of every pair, preserving order. */
    private static List<Long> values(List<Tuple2<Long, Long>> kvs) {
        return kvs.stream().map(t -> t.f1).collect(Collectors.toList());
    }

    /**
     * Maps every input pair to the INSERT row expected from the reader.
     *
     * <p>The result is an {@link ArrayList} so that callers may append further expected rows.
     */
    private static List<Tuple2<org.apache.flink.types.RowKind, Long>> expectedInserts(
            List<Tuple2<Long, Long>> input) {
        return input.stream()
                .map(t -> new Tuple2<>(org.apache.flink.types.RowKind.INSERT, t.f1))
                .collect(Collectors.toCollection(ArrayList::new));
    }
}

