/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.ArrayResultIterator;
import org.apache.flink.connector.file.src.util.SingletonResultIterator;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
import org.apache.paimon.flink.source.FlinkRecordsWithSplitIds;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class FlinkRecordsWithSplitIdsTest {
    private RowData[] rows;
    private ArrayResultIterator<RowData> iter;
    private TestingReaderOutput<RowData> output;
    private FileStoreSourceSplitState state;

    @BeforeEach
    public void beforeEach() {
        this.iter = new ArrayResultIterator();
        this.rows = new RowData[]{GenericRowData.of((Object[])new Object[]{1, 1}), GenericRowData.of((Object[])new Object[]{2, 2})};
        this.iter.set((Object[])this.rows, this.rows.length, -1L, 0L);
        this.output = new TestingReaderOutput();
        this.state = new FileStoreSourceSplitState(new FileStoreSourceSplit("", null));
    }

    @Test
    public void testEmitRecord() {
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.forRecords((String)"", this.iter);
        records.nextSplit();
        BulkFormat.RecordIterator iterator = (BulkFormat.RecordIterator)records.nextRecordFromSplit();
        Assertions.assertThat((Object)iterator).isNotNull();
        FlinkRecordsWithSplitIds.emitRecord((BulkFormat.RecordIterator)iterator, this.output, (FileStoreSourceSplitState)this.state);
        Assertions.assertThat((List)this.output.getEmittedRecords()).containsExactly((Object[])this.rows);
        Assertions.assertThat((long)this.state.recordsToSkip()).isEqualTo(2L);
        Assertions.assertThat((Object)records.nextRecordFromSplit()).isNull();
    }

    @Test
    void testEmptySplits() {
        String split = "empty";
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.finishedSplit((String)"empty");
        Assertions.assertThat((Collection)records.finishedSplits()).isEqualTo(Collections.singleton("empty"));
    }

    @Test
    void testMoveToFirstSplit() {
        String splitId = "splitId";
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.forRecords((String)"splitId", (BulkFormat.RecordIterator)new SingletonResultIterator());
        String firstSplitId = records.nextSplit();
        Assertions.assertThat((String)"splitId").isEqualTo(firstSplitId);
    }

    @Test
    void testMoveToSecondSplit() {
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.forRecords((String)"splitId", (BulkFormat.RecordIterator)new SingletonResultIterator());
        records.nextSplit();
        String secondSplitId = records.nextSplit();
        Assertions.assertThat((String)secondSplitId).isNull();
    }

    @Test
    void testRecordsFromFirstSplit() {
        SingletonResultIterator iter = new SingletonResultIterator();
        iter.set((Object)GenericRowData.of((Object[])new Object[]{"test"}), 18L, 99L);
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.forRecords((String)"splitId", (BulkFormat.RecordIterator)iter);
        records.nextSplit();
        BulkFormat.RecordIterator recAndPos = records.nextRecordFromSplit();
        Assertions.assertThat((Object)recAndPos).isSameAs((Object)iter);
        Assertions.assertThat((Object)records.nextRecordFromSplit()).isNull();
    }

    @Test
    void testRecordsInitiallyIllegal() {
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.forRecords((String)"splitId", (BulkFormat.RecordIterator)new SingletonResultIterator());
        Assertions.assertThatThrownBy(() -> ((FlinkRecordsWithSplitIds)records).nextRecordFromSplit()).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testRecordsOnSecondSplitIllegal() {
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.forRecords((String)"splitId", (BulkFormat.RecordIterator)new SingletonResultIterator());
        records.nextSplit();
        records.nextSplit();
        Assertions.assertThatThrownBy(() -> ((FlinkRecordsWithSplitIds)records).nextRecordFromSplit()).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testRecycleExhaustedBatch() {
        AtomicBoolean recycled = new AtomicBoolean(false);
        SingletonResultIterator iter = new SingletonResultIterator(() -> recycled.set(true));
        iter.set((Object)GenericRowData.of((Object[])new Object[0]), 1L, 2L);
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.forRecords((String)"test split", (BulkFormat.RecordIterator)iter);
        records.nextSplit();
        records.nextRecordFromSplit();
        Assertions.assertThat((Object)records.nextRecordFromSplit()).isNull();
        Assertions.assertThat((String)records.nextSplit()).isNull();
        records.recycle();
        Assertions.assertThat((boolean)recycled.get()).isTrue();
    }

    @Test
    void testRecycleNonExhaustedBatch() {
        AtomicBoolean recycled = new AtomicBoolean(false);
        SingletonResultIterator iter = new SingletonResultIterator(() -> recycled.set(true));
        iter.set((Object)GenericRowData.of((Object[])new Object[0]), 1L, 2L);
        FlinkRecordsWithSplitIds records = FlinkRecordsWithSplitIds.forRecords((String)"test split", (BulkFormat.RecordIterator)iter);
        records.nextSplit();
        records.recycle();
        Assertions.assertThat((boolean)recycled.get()).isTrue();
    }
}

