/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class DeletionVectorITCase
extends CatalogITCaseBase {
    @TempDir
    Path tempExternalPath;

    private static Stream<Arguments> parameters1() {
        return Stream.of(Arguments.of((Object[])new Object[]{"none", true}), Arguments.of((Object[])new Object[]{"none", false}), Arguments.of((Object[])new Object[]{"lookup", true}), Arguments.of((Object[])new Object[]{"lookup", false}));
    }

    private static Stream<Arguments> parameters2() {
        return Stream.of(Arguments.of((Object[])new Object[]{"input", true}), Arguments.of((Object[])new Object[]{"input", false}));
    }

    @ParameterizedTest
    @MethodSource(value={"parameters2"})
    public void testStreamingReadDVTableWhenChangelogProducerIsInput(String changelogProducer, boolean dvBitmap64) throws Exception {
        this.sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', 'deletion-vectors.bitmap64' = '%s')", changelogProducer, dvBitmap64), new Object[0]);
        this.sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')", new Object[0]);
        try (BlockingIterator<Row, Row> iter = this.streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */", new Object[0]);){
            AssertionsForInterfaceTypes.assertThat((List)iter.collect(6)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "111111111"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, "3_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, "4"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2_2"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, "4_1"})});
        }
        iter = this.streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */", new Object[0]);
        var4_4 = null;
        try {
            AssertionsForInterfaceTypes.assertThat((List)iter.collect(6)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "111111111"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, "3_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, "4"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2_2"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, "4_1"})});
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (iter != null) {
                if (var4_4 != null) {
                    try {
                        iter.close();
                    }
                    catch (Throwable throwable) {
                        var4_4.addSuppressed(throwable);
                    }
                } else {
                    iter.close();
                }
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"parameters1"})
    public void testStreamingReadDVTable(String changelogProducer, boolean dvBitmap64) throws Exception {
        this.sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', 'deletion-vectors.bitmap64' = '%s')", changelogProducer, dvBitmap64), new Object[0]);
        this.sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')", new Object[0]);
        try (BlockingIterator<Row, Row> iter = this.streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */", new Object[0]);){
            if (changelogProducer.equals("none")) {
                AssertionsForInterfaceTypes.assertThat((List)iter.collect(8)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "111111111"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, "3_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, "4"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, "2_1"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, "2_2"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{4, "4"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{4, "4_1"})});
            } else {
                AssertionsForInterfaceTypes.assertThat((List)iter.collect(12)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "111111111"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, "3"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, "4"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, "2"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, "2_1"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{3, "3"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{3, "3_1"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, "2_1"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, "2_2"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{4, "4"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{4, "4_1"})});
            }
        }
        iter = this.streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */", new Object[0]);
        var4_4 = null;
        try {
            AssertionsForInterfaceTypes.assertThat((List)iter.collect(8)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "111111111"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, "3_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, "4"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, "2_1"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, "2_2"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{4, "4"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{4, "4_1"})});
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (iter != null) {
                if (var4_4 != null) {
                    try {
                        iter.close();
                    }
                    catch (Throwable throwable) {
                        var4_4.addSuppressed(throwable);
                    }
                } else {
                    iter.close();
                }
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"parameters1"})
    public void testBatchReadDVTable(String changelogProducer, boolean dvBitmap64) {
        this.sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', 'deletion-vectors.bitmap64' = '%s')", changelogProducer, dvBitmap64), new Object[0]);
        this.sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "111111111"}), Row.of((Object[])new Object[]{2, "2_2"}), Row.of((Object[])new Object[]{3, "3_1"}), Row.of((Object[])new Object[]{4, "4_1"})});
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "111111111"}), Row.of((Object[])new Object[]{2, "2"}), Row.of((Object[])new Object[]{3, "3"}), Row.of((Object[])new Object[]{4, "4"})});
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='4') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "111111111"}), Row.of((Object[])new Object[]{2, "2_1"}), Row.of((Object[])new Object[]{3, "3_1"}), Row.of((Object[])new Object[]{4, "4"})});
    }

    @ParameterizedTest
    @MethodSource(value={"parameters1"})
    public void testDVTableWithAggregationMergeEngine(String changelogProducer, boolean dvBitmap64) throws Exception {
        this.sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', 'deletion-vectors.bitmap64' = '%s', 'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')", changelogProducer, dvBitmap64), new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, 1), (3, 1)", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, 1), (4, 1)", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 111111111}), Row.of((Object[])new Object[]{2, 4}), Row.of((Object[])new Object[]{3, 4}), Row.of((Object[])new Object[]{4, 5})});
        if (changelogProducer.equals("lookup")) {
            try (BlockingIterator<Row, Row> iter = this.streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */", new Object[0]);){
                AssertionsForInterfaceTypes.assertThat((List)iter.collect(8)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 111111111}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, 3}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, 4}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, 4}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, 3}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, 4}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{4, 4}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{4, 5})});
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"parameters1"})
    public void testDVTableWithPartialUpdateMergeEngine(String changelogProducer, boolean dvBitmap64) throws Exception {
        this.sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1 STRING, v2 STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', 'deletion-vectors.bitmap64' = '%s', 'merge-engine'='partial-update')", changelogProducer, dvBitmap64), new Object[0]);
        this.sql("INSERT INTO T VALUES (1, '111111111', '1'), (2, '2', CAST(NULL AS STRING)), (3, '3', '3'), (4, CAST(NULL AS STRING), '4')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, CAST(NULL AS STRING), '2'), (3, '3_1', '3_1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_1', CAST(NULL AS STRING)), (4, '4', CAST(NULL AS STRING))", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "111111111", "1"}), Row.of((Object[])new Object[]{2, "2_1", "2"}), Row.of((Object[])new Object[]{3, "3_1", "3_1"}), Row.of((Object[])new Object[]{4, "4", "4"})});
        if (changelogProducer.equals("lookup")) {
            try (BlockingIterator<Row, Row> iter = this.streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */", new Object[0]);){
                AssertionsForInterfaceTypes.assertThat((List)iter.collect(8)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "111111111", "1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "2", "2"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{3, "3_1", "3_1"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{4, null, "4"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{2, "2", "2"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{2, "2_1", "2"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{4, null, "4"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{4, "4", "4"})});
            }
        }
    }

    @ParameterizedTest
    @MethodSource(value={"parameters1"})
    public void testBatchReadDVTableWithSequenceField(String changelogProducer, boolean dvBitmap64) {
        this.sql(String.format("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, sequence INT, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'sequence.field' = 'sequence', 'changelog-producer' = '%s', 'deletion-vectors.bitmap64' = '%s')", changelogProducer, dvBitmap64), new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 1, '1'), (2, 1, '2')", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 2, '1_1'), (2, 2, '2_1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 3, '1_2'), (2, 1, '2_2')", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 3, "1_2"}), Row.of((Object[])new Object[]{2, 2, "2_1"})});
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testReadTagWithDv(boolean dvBitmap64) {
        this.sql("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'deletion-vectors.bitmap64' = '" + dvBitmap64 + "', 'snapshot.num-retained.min' = '1', 'snapshot.num-retained.max' = '1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, '1'), (2, '2')", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'my_tag')", new Object[0]);
        this.sql("INSERT INTO T VALUES (3, '3'), (4, '4')", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='my_tag') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "1"}), Row.of((Object[])new Object[]{2, "2"})});
    }

    @Test
    public void testChangeToDv64() throws Exception {
        this.sql("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = 'lookup', 'deletion-vectors.bitmap64' = 'false', 'bucket' = '1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (5, '5'), (6, '6'), (7, '8')", new Object[0]);
        this.sql("ALTER TABLE T SET('deletion-vectors.bitmap64' = 'true')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, '2_2'),(6, '6_1'), (7, '7_1')", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "1"}), Row.of((Object[])new Object[]{2, "2_2"}), Row.of((Object[])new Object[]{3, "3_1"}), Row.of((Object[])new Object[]{4, "4"}), Row.of((Object[])new Object[]{5, "5"}), Row.of((Object[])new Object[]{6, "6_1"}), Row.of((Object[])new Object[]{7, "7_1"})});
        AssertionsForInterfaceTypes.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='4') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "1"}), Row.of((Object[])new Object[]{2, "2_1"}), Row.of((Object[])new Object[]{3, "3_1"}), Row.of((Object[])new Object[]{4, "4"})});
    }

    @Test
    public void testRemoveDvsAfterFullCompaction() throws Exception {
        this.sql("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = 'lookup', 'bucket' = '1')", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4')", new Object[0]);
        this.sql("DELETE FROM T WHERE id=1", new Object[0]);
        AssertionsForInterfaceTypes.assertThat((int)this.sql("SELECT * FROM T", new Object[0]).size()).isEqualTo(3);
        this.tEnv.getConfig().set("table.dml-sync", "true");
        this.sql("CALL sys.compact(`table` => 'default.T')", new Object[0]);
        this.sql("ALTER TABLE T SET('deletion-vectors.enabled' = 'false')", new Object[0]);
        AssertionsForInterfaceTypes.assertThat((int)this.sql("SELECT * FROM T", new Object[0]).size()).isEqualTo(3);
        this.sql("CREATE TABLE TT (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = 'lookup', 'target-file-size' = '1000 B', 'bucket' = '1')", new Object[0]);
        this.sql("INSERT INTO TT VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4')", new Object[0]);
        this.sql("INSERT INTO TT VALUES (5, '5'), (6, '6'), (7, '7')", new Object[0]);
        this.sql("CALL sys.compact(`table` => 'default.TT')", new Object[0]);
        this.sql("DELETE FROM TT WHERE id = 1", new Object[0]);
        this.sql("DELETE FROM TT WHERE id = 7", new Object[0]);
        AssertionsForInterfaceTypes.assertThat((int)this.sql("SELECT * FROM TT", new Object[0]).size()).isEqualTo(5);
        this.sql("CALL sys.compact(`table` => 'default.TT')", new Object[0]);
        this.sql("ALTER TABLE TT SET('deletion-vectors.enabled' = 'false')", new Object[0]);
        AssertionsForInterfaceTypes.assertThat((int)this.sql("SELECT * FROM TT", new Object[0]).size()).isEqualTo(5);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testStreamingReadFullWithoutCompact(boolean isPk) throws Exception {
        if (isPk) {
            this.sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = 'none', 'write-only' = 'true')", new Object[0]);
        } else {
            this.sql("CREATE TABLE T (a INT, b INT) WITH ('deletion-vectors.enabled' = 'true', 'write-only' = 'true')", new Object[0]);
        }
        this.sql("INSERT INTO T VALUES (1, 1)", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, 2)", new Object[0]);
        this.sql("INSERT INTO T VALUES (3, 3)", new Object[0]);
        try (BlockingIterator<Row, Row> iter = this.streamSqlBlockIter("SELECT * FROM T /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '2') */", new Object[0]);){
            AssertionsForInterfaceTypes.assertThat((List)iter.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1}), Row.of((Object[])new Object[]{2, 2}), Row.of((Object[])new Object[]{3, 3})});
        }
    }

    @Test
    public void testIndexFileInDataFileDir() throws IOException {
        this.sql("CREATE TABLE IT (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('deletion-vectors.enabled' = 'true', 'index-file-in-data-file-dir' = 'true')", new Object[0]);
        this.sql("INSERT INTO IT VALUES (1, 1)", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.sql("SELECT * FROM IT", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1})});
        org.apache.paimon.fs.Path path = this.getTableDirectory("IT");
        LocalFileIO fileIO = LocalFileIO.create();
        String result = Arrays.asList(fileIO.listFiles(path, true)).toString();
        AssertionsForInterfaceTypes.assertThat((String)result).contains(new CharSequence[]{"default.db/IT/bucket-0/index-"});
        AssertionsForInterfaceTypes.assertThat((String)result).doesNotContain(new CharSequence[]{"default.db/IT/index/index-"});
    }

    @Test
    public void testIndexFileInIndexDir() throws IOException {
        this.sql("CREATE TABLE IT (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('deletion-vectors.enabled' = 'true')", new Object[0]);
        this.sql("INSERT INTO IT (a, b) VALUES (1, 1)", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.sql("SELECT * FROM IT", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1})});
        org.apache.paimon.fs.Path path = this.getTableDirectory("IT");
        LocalFileIO fileIO = LocalFileIO.create();
        String result = Arrays.asList(fileIO.listFiles(path, true)).toString();
        AssertionsForInterfaceTypes.assertThat((String)result).doesNotContain(new CharSequence[]{"default.db/IT/bucket-0/index-"});
        AssertionsForInterfaceTypes.assertThat((String)result).contains(new CharSequence[]{"default.db/IT/index/index-"});
    }

    @Test
    public void testIndexFileInDataFileDirWithExternalPath() throws IOException {
        String externalPaths = "traceable://" + this.tempExternalPath.toString();
        this.sql("CREATE TABLE IT (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('deletion-vectors.enabled' = 'true', 'index-file-in-data-file-dir' = 'true', 'data-file.external-paths.strategy' = 'round-robin', " + String.format("'data-file.external-paths' = '%s')", externalPaths), new Object[0]);
        this.sql("INSERT INTO IT (a, b) VALUES (1, 1)", new Object[0]);
        AssertionsForInterfaceTypes.assertThat(this.sql("SELECT * FROM IT", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1})});
        LocalFileIO fileIO = LocalFileIO.create();
        org.apache.paimon.fs.Path path = this.getTableDirectory("IT");
        String inTablePath = Arrays.asList(fileIO.listFiles(path, true)).toString();
        AssertionsForInterfaceTypes.assertThat((String)inTablePath).doesNotContain(new CharSequence[]{"bucket-0/index-"});
        AssertionsForInterfaceTypes.assertThat((String)inTablePath).doesNotContain(new CharSequence[]{"index/index-"});
        org.apache.paimon.fs.Path externalPath = new org.apache.paimon.fs.Path(externalPaths);
        String inExternalPath = Arrays.asList(fileIO.listFiles(externalPath, true)).toString();
        AssertionsForInterfaceTypes.assertThat((String)inExternalPath).contains(new CharSequence[]{"bucket-0/index-"});
        AssertionsForInterfaceTypes.assertThat((String)inExternalPath).doesNotContain(new CharSequence[]{"index/index-"});
    }

    @Test
    public void testLookupMergeBufferSize() {
        this.sql("CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ('deletion-vectors.enabled' = 'true', 'lookup.merge-records-threshold' = '2')", new Object[0]);
        for (int i = 0; i < 5; ++i) {
            this.sql(String.format("INSERT INTO T /*+ OPTIONS('write-only' = '%s') */ VALUES (1, '%s')", i != 4, i), new Object[0]);
        }
        AssertionsForInterfaceTypes.assertThat(this.sql("SELECT * FROM T", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, String.valueOf(4)})});
    }
}

