/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class BranchSqlITCase
extends CatalogITCaseBase {
    @Test
    public void testDefaultValue() throws Exception {
        this.sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T', 'b', '5')", new Object[0]);
        this.sql("INSERT INTO T (a) VALUES (1), (2)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 5]", "+I[2, 5]"});
    }

    @Test
    public void testUnsupportedDefaultValue() {
        this.sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        Assertions.assertThatThrownBy(() -> this.sql("CALL sys.alter_column_default_value('default.T', 'b', 'ddd')", new Object[0])).hasMessageContaining("Unsupported default value `ddd` for type INT");
    }

    @Test
    public void testArrayDefaultValue() throws Exception {
        this.sql("CREATE TABLE T_ARRAY (id INT, tags ARRAY<STRING>, numbers ARRAY<INT>)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_ARRAY', 'tags', '[tag1, tag2]')", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_ARRAY', 'numbers', '[1, 2, 3]')", new Object[0]);
        this.sql("INSERT INTO T_ARRAY (id) VALUES (1), (2)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_ARRAY")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, [tag1, tag2], [1, 2, 3]]", "+I[2, [tag1, tag2], [1, 2, 3]]"});
        this.sql("CREATE TABLE T_EMPTY_ARRAY (id INT, empty_tags ARRAY<STRING>)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_EMPTY_ARRAY', 'empty_tags', '[]')", new Object[0]);
        this.sql("INSERT INTO T_EMPTY_ARRAY (id) VALUES (1)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_EMPTY_ARRAY")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, []]"});
    }

    @Test
    public void testMapDefaultValue() throws Exception {
        this.sql("CREATE TABLE T_MAP (id INT, properties MAP<STRING, STRING>)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_MAP', 'properties', '{key1 -> value1, key2 -> value2}')", new Object[0]);
        this.sql("INSERT INTO T_MAP (id) VALUES (1), (2)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_MAP")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, {key1=value1, key2=value2}]", "+I[2, {key1=value1, key2=value2}]"});
        this.sql("CREATE TABLE T_EMPTY_MAP (id INT, empty_map MAP<STRING, INT>)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_EMPTY_MAP', 'empty_map', '{}')", new Object[0]);
        this.sql("INSERT INTO T_EMPTY_MAP (id) VALUES (1)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_EMPTY_MAP")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, {}]"});
    }

    @Test
    public void testRowDefaultValue() throws Exception {
        this.sql("CREATE TABLE T_STRUCT (id INT, nested ROW<x INT, y STRING>)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_STRUCT', 'nested', '{42, default_value}')", new Object[0]);
        this.sql("INSERT INTO T_STRUCT (id) VALUES (1), (2)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_STRUCT")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, +I[42, default_value]]", "+I[2, +I[42, default_value]]"});
        this.sql("CREATE TABLE T_COMPLEX_STRUCT (id INT, config ROW<enabled BOOLEAN, timeout INT, name STRING>)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_COMPLEX_STRUCT', 'config', '{true, 30, config_name}')", new Object[0]);
        this.sql("INSERT INTO T_COMPLEX_STRUCT (id) VALUES (1)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_COMPLEX_STRUCT")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, +I[true, 30, config_name]]"});
    }

    @Test
    public void testMixedComplexDefaultValue() throws Exception {
        this.sql("CREATE TABLE T_MIXED (id INT, name STRING, tags ARRAY<STRING>, metadata MAP<STRING, STRING>, config ROW<enabled BOOLEAN, timeout INT>)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_MIXED', 'name', 'default_name')", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_MIXED', 'tags', '[default_tag]')", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_MIXED', 'metadata', '{created_by -> system}')", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_MIXED', 'config', '{true, 30}')", new Object[0]);
        this.sql("INSERT INTO T_MIXED (id) VALUES (1)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_MIXED")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, default_name, [default_tag], {created_by=system}, +I[true, 30]]"});
        this.sql("INSERT INTO T_MIXED (id, name) VALUES (2, 'custom_name')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_MIXED WHERE id = 2")).containsExactlyInAnyOrder((Object[])new String[]{"+I[2, custom_name, [default_tag], {created_by=system}, +I[true, 30]]"});
    }

    @Test
    public void testNestedComplexDefaultValue() throws Exception {
        this.sql("CREATE TABLE T_NESTED (id INT, nested_array ARRAY<ROW<name STRING, val INT>>, nested_struct ROW<info ROW<x INT, y STRING>, enabled BOOLEAN>)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_NESTED', 'nested_array', '[{item1, 10}, {item2, 20}]')", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T_NESTED', 'nested_struct', '{{42, nested_value}, true}')", new Object[0]);
        this.sql("INSERT INTO T_NESTED (id) VALUES (1)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_NESTED")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, [+I[item1, 10], +I[item2, 20]], +I[+I[42, nested_value], true]]"});
        this.sql("INSERT INTO T_NESTED (id, nested_array) VALUES (2, ARRAY[ROW('custom', 99)])", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T_NESTED WHERE id = 2")).containsExactlyInAnyOrder((Object[])new String[]{"+I[2, [+I[custom, 99]], +I[+I[42, nested_value], true]]"});
    }

    @Test
    public void testDefaultValueForPkTableDynamicBucket() throws Exception {
        this.sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT)", new Object[0]);
        this.sql("CALL sys.alter_column_default_value('default.T', 'b', '5')", new Object[0]);
        this.sql("INSERT INTO T (a) VALUES (1), (2)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 5]", "+I[2, 5]"});
    }

    @Test
    public void testAlterBranchTable() throws Exception {
        this.sql("CREATE TABLE T ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH ( 'bucket' = '2' )", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 10, 'apple'), (1, 20, 'banana'), (2, 10, 'cat'), (2, 20, 'dog')", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'tag1', 1)", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'test', 'tag1')", new Object[0]);
        FileStoreTable branchTable = this.paimonTable("T$branch_test");
        Assertions.assertThat((int)branchTable.schema().fields().size()).isEqualTo(3);
        this.sql("INSERT INTO T VALUES (1, 10, 'APPLE'), (2, 20, 'DOG'), (2, 30, 'horse')", new Object[0]);
        this.sql("ALTER TABLE `T$branch_test` ADD (v2 INT)", new Object[0]);
        branchTable = this.paimonTable("T$branch_test");
        Assertions.assertThat((int)branchTable.schema().fields().size()).isEqualTo(4);
        this.sql("INSERT INTO `T$branch_test` VALUES (1, 10, 'cherry', 100), (2, 20, 'bird', 200), (2, 40, 'wolf', 400)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, APPLE]", "+I[1, 20, banana]", "+I[2, 30, horse]", "+I[2, 10, cat]", "+I[2, 20, DOG]"});
        Assertions.assertThat(this.collectResult("SELECT * FROM T$branch_test")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, cherry, 100]", "+I[1, 20, banana, null]", "+I[2, 10, cat, null]", "+I[2, 20, bird, 200]", "+I[2, 40, wolf, 400]"});
    }

    @Test
    public void testCreateBranchFromTag() throws Exception {
        this.sql("CREATE TABLE T ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH ( 'bucket' = '2' )", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 10, 'apple'), (1, 20, 'banana')", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, 10, 'cat'), (2, 20, 'dog')", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'tag1', 1)", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'tag2', 2)", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'test', 'tag1')", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'test2', 'tag2')", new Object[0]);
        FileStoreTable branchTable = this.paimonTable("T$branch_test");
        Assertions.assertThat((boolean)branchTable.tagManager().tagExists("tag1")).isEqualTo(true);
        Assertions.assertThat(this.collectResult("SELECT * FROM T$branch_test")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, apple]", "+I[1, 20, banana]"});
        FileStoreTable branchTable2 = this.paimonTable("T$branch_test2");
        Assertions.assertThat((boolean)branchTable2.tagManager().tagExists("tag2")).isEqualTo(true);
        Assertions.assertThat(this.collectResult("SELECT * FROM T$branch_test2")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, apple]", "+I[1, 20, banana]", "+I[2, 10, cat]", "+I[2, 20, dog]"});
    }

    @Test
    public void testCreateBranchFromAnotherBranch() throws Exception {
        this.sql("CREATE TABLE T ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH ( 'bucket' = '2' )", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 10, 'apple'), (1, 20, 'banana')", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'tag1', 1)", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'branch_A', 'tag1')", new Object[0]);
        FileStoreTable branchTable = this.paimonTable("T$branch_branch_A");
        Assertions.assertThat((boolean)branchTable.tagManager().tagExists("tag1")).isEqualTo(true);
        Assertions.assertThat(this.collectResult("SELECT * FROM T$branch_branch_A")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, apple]", "+I[1, 20, banana]"});
        this.sql("CALL sys.create_branch('default.T$branch_branch_A', 'branch_B', 'tag1')", new Object[0]);
        FileStoreTable branchTableB = this.paimonTable("T$branch_branch_B");
        Assertions.assertThat((boolean)branchTableB.tagManager().tagExists("tag1")).isEqualTo(true);
        Assertions.assertThat(this.collectResult("SELECT * FROM T$branch_branch_B")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, apple]", "+I[1, 20, banana]"});
    }

    @Test
    public void testCreateEmptyBranch() throws Exception {
        this.sql("CREATE TABLE T ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH ( 'bucket' = '2' )", new Object[0]);
        this.sql("INSERT INTO T VALUES(1, 10, 'apple')", new Object[0]);
        this.sql("INSERT INTO T VALUES(1, 20, 'dog')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, apple]", "+I[1, 20, dog]"});
        this.sql("CALL sys.create_branch('default.T', 'empty_branch')", new Object[0]);
        this.sql("INSERT INTO `T$branch_empty_branch` VALUES (3, 30, 'banana')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T$branch_empty_branch")).containsExactlyInAnyOrder((Object[])new String[]{"+I[3, 30, banana]"});
    }

    @Test
    public void testDeleteBranchTable() throws Exception {
        this.sql("CREATE TABLE T ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH ( 'bucket' = '2' )", new Object[0]);
        this.sql("INSERT INTO T VALUES(1, 10, 'apple')", new Object[0]);
        this.sql("INSERT INTO T VALUES(1, 20, 'dog')", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'tag1', 1)", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'tag2', 2)", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'test', 'tag1')", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'test2', 'tag2')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT branch_name FROM `T$branches`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[test]", "+I[test2]"});
        this.sql("CALL sys.delete_branch('default.T', 'test')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT branch_name FROM `T$branches`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[test2]"});
    }

    @Test
    public void testBranchFastForward() throws Exception {
        this.sql("CREATE TABLE T ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH ( 'bucket' = '2' )", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        SnapshotManager snapshotManager = table.snapshotManager();
        this.sql("INSERT INTO T VALUES (1, 10, 'hunter')", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 20, 'hunter')", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 30, 'hunter')", new Object[0]);
        this.checkSnapshots(snapshotManager, 1, 3);
        Assertions.assertThat(this.collectResult("SELECT * FROM T")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, hunter]"});
        this.sql("CALL sys.create_tag('default.T', 'tag1', 1)", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'test', 'tag1')", new Object[0]);
        this.sql("INSERT INTO `T$branch_test` VALUES (2, 10, 'hunterX')", new Object[0]);
        this.checkSnapshots(this.paimonTable("T$branch_test").snapshotManager(), 1, 2);
        Assertions.assertThat(this.collectResult("SELECT * FROM T$branch_test")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, hunter]", "+I[2, 10, hunterX]"});
        this.sql("CALL sys.fast_forward('default.T', 'test')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM T")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, hunter]", "+I[2, 10, hunterX]"});
        this.checkSnapshots(snapshotManager, 1, 2);
        this.sql("alter table T set ('branch'='test')", new Object[0]);
        Assertions.assertThatThrownBy(() -> this.sql("CALL sys.fast_forward('default.T', 'test')", new Object[0])).hasMessageContaining("Fast-forward from the current branch 'test' is not allowed.");
    }

    @Test
    public void testFallbackBranchBatchRead() throws Exception {
        this.sql("CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'pk')", new Object[0]);
        this.sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )", new Object[0]);
        this.sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )", new Object[0]);
        this.sql("INSERT INTO `t$branch_pk` VALUES (1, 20, 'cat'), (1, 30, 'dog')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[apple, 10]", "+I[banana, 20]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM `t$branch_pk`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[cat, 20]", "+I[dog, 30]"});
        this.sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[apple, 10]", "+I[banana, 20]", "+I[tiger, 10]", "+I[wolf, 20]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM `t$branch_pk`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[cat, 20]", "+I[dog, 30]", "+I[tiger, 10]", "+I[wolf, 20]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t WHERE pt = 1")).containsExactlyInAnyOrder((Object[])new String[]{"+I[apple, 10]", "+I[banana, 20]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 1")).containsExactlyInAnyOrder((Object[])new String[]{"+I[cat, 20]", "+I[dog, 30]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t WHERE pt = 2")).containsExactlyInAnyOrder((Object[])new String[]{"+I[tiger, 10]", "+I[wolf, 20]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 2")).containsExactlyInAnyOrder((Object[])new String[]{"+I[tiger, 10]", "+I[wolf, 20]"});
        this.sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'lion')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[apple, 10]", "+I[banana, 20]", "+I[lion, 10]", "+I[wolf, 20]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM `t$branch_pk`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"});
        this.sql("INSERT OVERWRITE t PARTITION (pt = 1) VALUES (10, 'pear'), (20, 'mango')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[pear, 10]", "+I[mango, 20]", "+I[lion, 10]", "+I[wolf, 20]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM `t$branch_pk`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"});
        this.sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[pear, 10]", "+I[mango, 20]"});
        Assertions.assertThat(this.collectResult("SELECT v, k FROM `t$branch_pk`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]"});
    }

    @Test
    public void testFallbackBranchStreamRead() throws Exception {
        this.sql("CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'pk')", new Object[0]);
        this.sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )", new Object[0]);
        this.sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )", new Object[0]);
        this.sql("INSERT INTO `t$branch_pk` VALUES (2, 20, 'cat'), (2, 30, 'dog')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[apple, 10]", "+I[banana, 20]", "+I[cat, 20]", "+I[dog, 30]"});
        try (BlockingIterator<Row, Row> iter = this.streamSqlBlockIter("SELECT * FROM t", new Object[0]);){
            AssertionsForInterfaceTypes.assertThat((List)iter.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 10, "apple"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 20, "banana"})});
        }
    }

    @Test
    public void testCrossPartitionFallbackBranchBatchRead() throws Exception {
        this.sql("CREATE TABLE t ( pk INT PRIMARY KEY NOT ENFORCED, name STRING, dt STRING ) PARTITIONED BY (dt) WITH ( 'bucket' = '-1' )", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'stream')", new Object[0]);
        this.sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'stream' )", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson', '20250227'), (2, 'Sam', '20250228')", new Object[0]);
        this.sql("INSERT INTO `t$branch_stream` VALUES (1, 'John Stream', '20250228'), (3, 'Rick Stream', '20250301')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT pk, name, dt FROM t order by dt")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, Jackson, 20250227]", "+I[2, Sam, 20250228]", "+I[3, Rick Stream, 20250301]"});
        Assertions.assertThat(this.collectResult("SELECT pk, name, dt FROM `t$branch_stream` order by dt")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, John Stream, 20250228]", "+I[3, Rick Stream, 20250301]"});
    }

    @Test
    public void testDifferentRowTypes() throws Exception {
        this.sql("CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'pk')", new Object[0]);
        this.sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )", new Object[0]);
        this.sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 10, 'apple')", new Object[0]);
        this.sql("INSERT INTO `t$branch_pk` VALUES (1, 10, 'cat', 100)", new Object[0]);
        this.sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )", new Object[0]);
        Assertions.assertThatThrownBy(() -> this.sql("SELECT * FROM t", new Object[0])).hasMessageContaining("Branch main and pk does not have the same row type");
        this.sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT v, k FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[apple, 10]"});
        Assertions.assertThat(this.collectResult("SELECT v, v2, k FROM `t$branch_pk`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[cat, 100, 10]"});
    }

    @Test
    public void testBranchOptionsTable() throws Exception {
        this.sql("CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH ( 'bucket' = '2' )", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'test')", new Object[0]);
        this.sql("ALTER TABLE t SET ('snapshot.time-retained' = '5 h')", new Object[0]);
        this.sql("ALTER TABLE t$branch_test SET ('snapshot.time-retained' = '1 h')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM t$options")).containsExactlyInAnyOrder((Object[])new String[]{"+I[bucket, 2]", "+I[snapshot.time-retained, 5 h]", "+I[scan.infer-parallelism, false]"});
        Assertions.assertThat(this.collectResult("SELECT * FROM t$branch_test$options")).containsExactlyInAnyOrder((Object[])new String[]{"+I[bucket, 2]", "+I[snapshot.time-retained, 1 h]", "+I[scan.infer-parallelism, false]"});
        Assertions.assertThat(this.collectResult("SELECT * FROM t$options /*+ OPTIONS('branch'='test') */")).containsExactlyInAnyOrder((Object[])new String[]{"+I[bucket, 2]", "+I[snapshot.time-retained, 1 h]", "+I[scan.infer-parallelism, false]"});
    }

    @Test
    public void testBranchSchemasTable() throws Exception {
        this.sql("CREATE TABLE t (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 2)", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'b1')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT schema_id FROM t$branch_b1$schemas order by schema_id")).containsExactlyInAnyOrder((Object[])new String[]{"+I[0]"});
        this.sql("ALTER TABLE t$branch_b1 SET ('snapshot.time-retained' = '5 h')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT schema_id FROM t$branch_b1$schemas order by schema_id")).containsExactlyInAnyOrder((Object[])new String[]{"+I[0]", "+I[1]"});
        Assertions.assertThat(this.collectResult("SELECT schema_id FROM t$schemas /*+ OPTIONS('branch'='b1') */ order by schema_id")).containsExactlyInAnyOrder((Object[])new String[]{"+I[0]", "+I[1]"});
    }

    @Test
    public void testBranchAuditLogTable() throws Exception {
        this.sql("CREATE TABLE t (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 2)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM t$audit_log")).containsExactlyInAnyOrder((Object[])new String[]{"+I[+I, 1, 2]"});
        this.sql("CALL sys.create_branch('default.t', 'b1')", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (3, 4)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM t$branch_b1$audit_log")).containsExactlyInAnyOrder((Object[])new String[]{"+I[+I, 3, 4]"});
        Assertions.assertThat(this.collectResult("SELECT * FROM t$audit_log /*+ OPTIONS('branch'='b1') */")).containsExactlyInAnyOrder((Object[])new String[]{"+I[+I, 3, 4]"});
    }

    @Test
    public void testBranchReadOptimizedTable() throws Exception {
        this.sql("CREATE TABLE t (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 2)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM t$ro")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 2]"});
        this.sql("CALL sys.create_branch('default.t', 'b1')", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (3, 4)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM t$branch_b1$ro")).containsExactlyInAnyOrder((Object[])new String[]{"+I[3, 4]"});
        Assertions.assertThat(this.collectResult("SELECT * FROM t$ro /*+ OPTIONS('branch'='b1') */")).containsExactlyInAnyOrder((Object[])new String[]{"+I[3, 4]"});
    }

    @Test
    public void testBranchFilesTable() throws Exception {
        this.sql("CREATE TABLE t (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 2)", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'b1')", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (3, 4)", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (5, 6)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT min_value_stats FROM t$files")).containsExactlyInAnyOrder((Object[])new String[]{"+I[{a=1, b=2}]"});
        Assertions.assertThat(this.collectResult("SELECT min_value_stats FROM t$branch_b1$files")).containsExactlyInAnyOrder((Object[])new String[]{"+I[{a=3, b=4}]", "+I[{a=5, b=6}]"});
        Assertions.assertThat(this.collectResult("SELECT min_value_stats FROM t$files /*+ OPTIONS('branch'='b1') */")).containsExactlyInAnyOrder((Object[])new String[]{"+I[{a=3, b=4}]", "+I[{a=5, b=6}]"});
    }

    @Test
    public void testBranchTagsTable() throws Exception {
        this.sql("CREATE TABLE t (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 2)", new Object[0]);
        this.paimonTable("t").createTag("tag1", 1L);
        this.sql("CALL sys.create_branch('default.t', 'b1','tag1')", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (3, 4)", new Object[0]);
        this.paimonTable("t$branch_b1").createTag("tag2", 2L);
        Assertions.assertThat(this.collectResult("SELECT tag_name,snapshot_id,record_count FROM t$tags")).containsExactlyInAnyOrder((Object[])new String[]{"+I[tag1, 1, 1]"});
        Assertions.assertThat(this.collectResult("SELECT tag_name,snapshot_id,record_count FROM t$branch_b1$tags")).containsExactlyInAnyOrder((Object[])new String[]{"+I[tag1, 1, 1]", "+I[tag2, 2, 2]"});
        Assertions.assertThat(this.collectResult("SELECT tag_name,snapshot_id,record_count FROM t$tags /*+ OPTIONS('branch'='b1') */")).containsExactlyInAnyOrder((Object[])new String[]{"+I[tag1, 1, 1]", "+I[tag2, 2, 2]"});
    }

    @Test
    @Timeout(value=60L)
    public void testBranchConsumersTable() throws Exception {
        List<String> branchResult;
        this.sql("CREATE TABLE t (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 2), (3,4)", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'b1')", new Object[0]);
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM t$branch_b1 /*+ OPTIONS('consumer-id'='id1','consumer.expiration-time'='3h') */", new Object[0]));
        this.sql("INSERT INTO t$branch_b1 VALUES (5, 6), (7, 8)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{5, 6}), Row.of((Object[])new Object[]{7, 8})});
        while ((branchResult = this.collectResult("SELECT * FROM t$branch_b1$consumers")).isEmpty()) {
            Thread.sleep(1000L);
        }
        iterator.close();
        Assertions.assertThat(this.collectResult("SELECT * FROM t$consumers")).isEmpty();
        Assertions.assertThat(branchResult).containsExactlyInAnyOrder((Object[])new String[]{"+I[id1, 2]"});
        Assertions.assertThat(this.collectResult("SELECT * FROM t$consumers /*+ OPTIONS('branch'='b1') */")).containsExactlyInAnyOrder((Object[])new String[]{"+I[id1, 2]"});
    }

    @Test
    public void testBranchManifestsTable() {
        this.sql("CREATE TABLE t (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 2)", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'b1')", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (3, 4)", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (5, 6)", new Object[0]);
        List<Row> res = this.sql("SELECT schema_id, file_name, file_size FROM t$manifests", new Object[0]);
        Assertions.assertThat(res).hasSize(1);
        res = this.sql("SELECT schema_id, file_name, file_size FROM t$branch_b1$manifests", new Object[0]);
        Assertions.assertThat(res).hasSize(2);
        res.forEach(row -> {
            Assertions.assertThat((long)((Long)row.getField(0))).isEqualTo(0L);
            Assertions.assertThat((boolean)StringUtils.startsWith((CharSequence)((String)row.getField(1)), (CharSequence)"manifest")).isTrue();
            Assertions.assertThat((long)((Long)row.getField(2))).isGreaterThan(0L);
        });
        List<Row> dynamicOptionRes = this.sql("SELECT schema_id, file_name, file_size FROM t$manifests /*+ OPTIONS('branch'='b1') */", new Object[0]);
        Assertions.assertThat(dynamicOptionRes).containsExactlyInAnyOrderElementsOf(res);
    }

    @Test
    public void testBranchSnapshotsTable() throws Exception {
        this.sql("CREATE TABLE t (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 2)", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'b1')", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (3, 4)", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (5, 6)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT snapshot_id, schema_id, commit_kind FROM t$snapshots")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 0, APPEND]"});
        Assertions.assertThat(this.collectResult("SELECT snapshot_id, schema_id, commit_kind FROM t$branch_b1$snapshots")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 0, APPEND]", "+I[2, 0, APPEND]"});
        Assertions.assertThat(this.collectResult("SELECT snapshot_id, schema_id, commit_kind FROM t$snapshots /*+ OPTIONS('branch'='b1') */")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 0, APPEND]", "+I[2, 0, APPEND]"});
    }

    @Test
    public void testBranchPartitionsTable() throws Exception {
        this.sql("CREATE TABLE t (a INT, b INT,c STRING) PARTITIONED BY (a)", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM t$partitions", new Object[0])).isEmpty();
        this.sql("INSERT INTO t VALUES (1, 2, 'x')", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 4, 'S2'), (2, 2, 'S1'), (2, 2, 'S1')", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 4, 'S3'), (2, 2, 'S4')", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'b1')", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (1, 4, 'S2'), (2, 2, 'S1'), (2, 2, 'S5')", new Object[0]);
        this.sql("INSERT INTO t$branch_b1 VALUES (1, 4, 'S3'), (2, 2, 'S4')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT `partition`, record_count, file_count FROM t$partitions")).containsExactlyInAnyOrder((Object[])new String[]{"+I[{1}, 3, 3]", "+I[{2}, 3, 2]"});
        Assertions.assertThat(this.collectResult("SELECT `partition`, record_count, file_count FROM t$branch_b1$partitions")).containsExactlyInAnyOrder((Object[])new String[]{"+I[{1}, 2, 2]", "+I[{2}, 3, 2]"});
        Assertions.assertThat(this.collectResult("SELECT `partition`, record_count, file_count FROM t$partitions /*+ OPTIONS('branch'='b1') */")).containsExactlyInAnyOrder((Object[])new String[]{"+I[{1}, 2, 2]", "+I[{2}, 3, 2]"});
    }

    @Test
    public void testCannotSetEmptyFallbackBranch() {
        String errMsg = "Cannot set 'scan.fallback-branch' = 'test' because the branch 'test' isn't existed.";
        Assertions.assertThatThrownBy(() -> this.sql("CREATE TABLE t1 (a INT, b INT) WITH ('scan.fallback-branch' = 'test')", new Object[0])).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, (String)errMsg)});
        Assertions.assertThatThrownBy(() -> {
            this.sql("CREATE TABLE t2 (a INT, b INT)", new Object[0]);
            this.sql("ALTER TABLE t2 SET ('scan.fallback-branch' = 'test')", new Object[0]);
        }).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, (String)errMsg)});
    }

    @Test
    public void testReadBranchTableWithMultiSchemaIds() throws Exception {
        this.sql("CREATE TABLE T ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) PARTITIONED BY (pt) WITH ( 'bucket' = '2' )", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 10, 'apple'), (1, 20, 'banana')", new Object[0]);
        this.sql("ALTER TABLE `T` ADD (v2 INT)", new Object[0]);
        this.sql("INSERT INTO T VALUES (2, 10, 'cat', 2), (2, 20, 'dog', 2)", new Object[0]);
        this.sql("ALTER TABLE `T` ADD (v3 INT)", new Object[0]);
        this.sql("CALL sys.create_tag('default.T', 'tag1', 2)", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'test', 'tag1')", new Object[0]);
        FileStoreTable table = this.paimonTable("T");
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "test");
        List schemaIds = schemaManager.listAllIds();
        Assertions.assertThat((int)schemaIds.size()).isEqualTo(2);
        Assertions.assertThat(this.collectResult("SELECT * FROM T$branch_test")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 10, apple, null]", "+I[1, 20, banana, null]", "+I[2, 10, cat, 2]", "+I[2, 20, dog, 2]"});
    }

    @Test
    public void testMainAndFallbackNoPrimaryKeys() throws Exception {
        this.sql("CREATE TABLE t ( pt INT, v INT ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 110)", new Object[0]);
        this.sql("CALL sys.create_branch('default.t', 'test')", new Object[0]);
        this.sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'test' )", new Object[0]);
        this.sql("INSERT INTO `t$branch_test` VALUES (2, 210)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 110]", "+I[2, 210]"});
        this.sql("ALTER TABLE t ADD v2 INT", new Object[0]);
        this.sql("ALTER TABLE `t$branch_test` ADD v2 INT", new Object[0]);
        this.sql("INSERT INTO t VALUES (1, 120, 1200)", new Object[0]);
        this.sql("INSERT INTO `t$branch_test` VALUES (2, 220, 2200)", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT * FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"+I[1, 110, null]", "+I[2, 210, null]", "+I[1, 120, 1200]", "+I[2, 220, 2200]"});
    }

    @Test
    public void testBranchesTableFilter() throws Exception {
        this.sql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        this.sql("INSERT INTO T VALUES (1, 1)", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'b1')", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'b2')", new Object[0]);
        this.sql("CALL sys.create_branch('default.T', 'b3')", new Object[0]);
        Assertions.assertThat(this.collectResult("SELECT branch_name FROM `T$branches`")).containsExactlyInAnyOrder((Object[])new String[]{"+I[b1]", "+I[b2]", "+I[b3]"});
        Assertions.assertThat(this.collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name = 'b2'")).containsExactlyInAnyOrder((Object[])new String[]{"+I[b2]"});
        Assertions.assertThat(this.collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name IN ('b1', 'b3')")).containsExactlyInAnyOrder((Object[])new String[]{"+I[b1]", "+I[b3]"});
        Assertions.assertThat(this.collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name IN ('b1', 'non_existent')")).containsExactlyInAnyOrder((Object[])new String[]{"+I[b1]"});
        Assertions.assertThat(this.collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name = 'non_existent'")).isEmpty();
    }

    private List<String> collectResult(String sql) throws Exception {
        ArrayList<String> result = new ArrayList<String>();
        try (CloseableIterator it = this.tEnv.executeSql(sql).collect();){
            while (it.hasNext()) {
                result.add(((Row)it.next()).toString());
            }
        }
        return result;
    }

    private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
        Assertions.assertThat((long)sm.snapshotCount()).isEqualTo((long)(latest - earliest + 1));
        Assertions.assertThat((Long)sm.earliestSnapshotId()).isEqualTo((long)earliest);
        Assertions.assertThat((Long)sm.latestSnapshotId()).isEqualTo((long)latest);
    }
}

