/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class CatalogTableITCase
extends CatalogITCaseBase {
    /**
     * Overrides the base-class hook and always answers {@code true}.
     *
     * <p>NOTE(review): judging by the name, this presumably switches on scan parallelism inference
     * for the queries run by this suite - confirm against {@code CatalogITCaseBase}.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean inferScanParallelism() {
        final boolean enabled = true;
        return enabled;
    }

    /**
     * Selecting from the {@code $snapshots} system table of a table that was never created must
     * fail with an "object not found" message.
     */
    @Test
    public void testNotExistMetadataTable() {
        final String query = "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots";

        Assertions.assertThatThrownBy(() -> sql(query))
                .hasMessageContaining("Object 'T$snapshots' not found");
    }

    /**
     * Verifies the {@code T$snapshots} system table after three single-row inserts, both unfiltered
     * and under a range of predicates on {@code snapshot_id} and {@code schema_id}.
     *
     * <p>The range, IN and OR cases compare the complete result set (order-insensitive) instead of
     * merely checking containment, so a predicate that lets extra snapshots through is detected.
     *
     * @throws Exception declared for signature compatibility; nothing in the body throws a checked
     *     exception directly
     */
    @Test
    public void testSnapshotsTable() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)");
        sql("INSERT INTO T VALUES (1, 2)");
        sql("INSERT INTO T VALUES (3, 4)");
        sql("INSERT INTO T VALUES (5, 6)");

        final String select = "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots";
        final Row first = Row.of(1L, 0L, "APPEND");
        final Row second = Row.of(2L, 0L, "APPEND");
        final Row third = Row.of(3L, 0L, "APPEND");

        // Unfiltered and simple predicates: order is part of the expectation.
        Assertions.assertThat(sql(select)).containsExactly(first, second, third);
        Assertions.assertThat(sql(select + " WHERE schema_id = 0"))
                .containsExactly(first, second, third);
        Assertions.assertThat(sql(select + " WHERE snapshot_id = 2")).containsExactly(second);
        Assertions.assertThat(sql(select + " WHERE snapshot_id > 1"))
                .containsExactly(second, third);
        Assertions.assertThat(sql(select + " WHERE snapshot_id < 2")).containsExactly(first);

        // Inclusive ranges, IN and OR: order is not asserted, but no extra row may appear.
        Assertions.assertThat(sql(select + " WHERE snapshot_id >= 1"))
                .containsExactlyInAnyOrder(first, second, third);
        Assertions.assertThat(sql(select + " WHERE snapshot_id <= 2"))
                .containsExactlyInAnyOrder(first, second);
        Assertions.assertThat(sql(select + " WHERE snapshot_id in (1, 2)"))
                .containsExactlyInAnyOrder(first, second);
        Assertions.assertThat(sql(select + " WHERE snapshot_id in (1, 2) or schema_id=0"))
                .containsExactlyInAnyOrder(first, second, third);
    }

    /**
     * Checks the record-count columns of {@code T$snapshots} after two single-row inserts: the
     * total grows by one per snapshot, the delta is one each time and the changelog count is zero.
     *
     * @throws Exception declared for signature compatibility
     */
    @Test
    public void testSnapshotsTableWithRecordCount() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)");
        sql("INSERT INTO T VALUES (1, 2)");
        sql("INSERT INTO T VALUES (3, 4)");

        List<Row> counts =
                sql("SELECT snapshot_id, total_record_count, delta_record_count, changelog_record_count FROM T$snapshots");

        final Row afterFirstInsert = Row.of(1L, 1L, 1L, 0L);
        final Row afterSecondInsert = Row.of(2L, 2L, 1L, 0L);
        Assertions.assertThat(counts)
                .containsExactlyInAnyOrder(afterFirstInsert, afterSecondInsert);
    }

    /**
     * An option set through {@code ALTER TABLE ... SET} must be the only row reported by the
     * {@code T$options} system table.
     *
     * @throws Exception declared for signature compatibility
     */
    @Test
    public void testOptionsTable() throws Exception {
        sql("CREATE TABLE T (a INT, b INT)");
        sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')");

        final Row retained = Row.of("snapshot.time-retained", "5 h");
        Assertions.assertThat(sql("SELECT * FROM T$options")).containsExactly(retained);
    }

    /**
     * Verifies that {@code sys.all_table_options} reports the options given at creation time plus
     * the one added by {@code ALTER TABLE}, as (database, table, key, value) rows.
     *
     * <p>Rows whose key column equals {@code "path"} are dropped before the comparison. The result
     * is held in a properly typed {@code List<Row>} and the key check is written constant-first so
     * that a row without a key cannot raise a {@code NullPointerException} inside the filter.
     */
    @Test
    public void testAllTableOptions() {
        sql("CREATE TABLE T (a INT, b INT) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')");
        sql("ALTER TABLE T SET ('c.cc.ccc' = 'val3')");

        List<Row> result =
                sql("SELECT * FROM sys.all_table_options").stream()
                        .filter(row -> !"path".equals(row.getField(2)))
                        .collect(Collectors.toList());

        Assertions.assertThat(result)
                .containsExactly(
                        Row.of("default", "T", "a.aa.aaa", "val1"),
                        Row.of("default", "T", "b.bb.bbb", "val2"),
                        Row.of("default", "T", "c.cc.ccc", "val3"));
    }

    /**
     * {@code sys.catalog_options} must contain exactly one row: the {@code warehouse} key mapped
     * to the inherited {@code path} field.
     */
    @Test
    public void testCatalogOptionsTable() {
        final Row warehouse = Row.of("warehouse", this.path);

        List<Row> options = sql("SELECT * FROM sys.catalog_options");

        Assertions.assertThat(options).containsExactly(warehouse);
    }

    /**
     * Dropping the {@code sys} database must be rejected.
     *
     * <p>Uses {@code assertThatThrownBy} so that a statement which unexpectedly succeeds fails the
     * test immediately with an explicit "no exception raised" error, matching how the other
     * negative tests in this class are written.
     */
    @Test
    public void testDropSystemDatabase() {
        Assertions.assertThatThrownBy(() -> sql("DROP DATABASE sys"))
                .hasRootCauseMessage("Can't do operation on system database.");
    }

    /**
     * Creating a database named {@code sys} must be rejected.
     *
     * <p>Uses {@code assertThatThrownBy} so that a statement which unexpectedly succeeds fails the
     * test immediately with an explicit "no exception raised" error, matching how the other
     * negative tests in this class are written.
     */
    @Test
    public void testCreateSystemDatabase() {
        Assertions.assertThatThrownBy(() -> sql("CREATE DATABASE sys"))
                .hasRootCauseMessage("Can't do operation on system database.");
    }

    /**
     * Altering a table inside the {@code sys} database must fail, and the root cause must say
     * that only data tables can be altered.
     *
     * <p>Uses {@code assertThatThrownBy} so that a statement which unexpectedly succeeds fails the
     * test immediately with an explicit "no exception raised" error, matching how the other
     * negative tests in this class are written.
     */
    @Test
    public void testChangeTableInSystemDatabase() {
        sql("USE sys");

        Assertions.assertThatThrownBy(
                        () -> sql("ALTER TABLE all_table_options SET ('bucket-num' = '5')"))
                .rootCause()
                .hasMessageContaining("Only support alter data table, but is: ");
    }

    /**
     * After switching to the {@code sys} database, {@code SHOW TABLES} must list exactly the two
     * tables {@code all_table_options} and {@code catalog_options}, in any order.
     */
    @Test
    public void testSystemDatabase() {
        sql("USE sys");

        List<Row> tables = sql("SHOW TABLES");

        Assertions.assertThat(tables)
                .containsExactlyInAnyOrder(
                        Row.of("all_table_options"), Row.of("catalog_options"));
    }

    /**
     * Table names containing {@code $} cannot be created: a single {@code $} is reported as an
     * attempt to create a system table, and two separators are reported as malformed.
     */
    @Test
    public void testCreateSystemTable() {
        final String singleSeparatorDdl = "CREATE TABLE T$snapshots (a INT, b INT)";
        final String doubleSeparatorDdl = "CREATE TABLE T$aa$bb (a INT, b INT)";

        Assertions.assertThatThrownBy(() -> sql(singleSeparatorDdl))
                .hasRootCauseMessage(
                        "Cannot 'createTable' for system table 'Identifier{database='default', object='T$snapshots'}', please use data table.");

        Assertions.assertThatThrownBy(() -> sql(doubleSeparatorDdl))
                .hasRootCauseMessage(
                        "System table can only contain one '$' separator, but this is: T$aa$bb");
    }

    /**
     * Checks every row of {@code T$manifests} after one insert: schema id is 0, the file name
     * starts with {@code manifest} and the file size is positive.
     *
     * <p>The result is first required to be non-empty; without that guard the per-row checks would
     * pass vacuously if the system table returned nothing.
     */
    @Test
    public void testManifestsTable() {
        sql("CREATE TABLE T (a INT, b INT)");
        sql("INSERT INTO T VALUES (1, 2)");

        List<Row> manifests = sql("SELECT schema_id, file_name, file_size FROM T$manifests");

        Assertions.assertThat(manifests).isNotEmpty();
        for (Row manifest : manifests) {
            Assertions.assertThat((Long) manifest.getField(0)).isEqualTo(0L);
            // Asserting on the string itself reports the offending file name on failure.
            Assertions.assertThat((String) manifest.getField(1)).startsWith("manifest");
            Assertions.assertThat((Long) manifest.getField(2)).isGreaterThan(0L);
        }
    }

    /**
     * After two single-row inserts, {@code T$manifests} must report two rows, each with one added
     * file and no deleted files.
     */
    @Test
    public void testManifestsTableWithFileCount() {
        sql("CREATE TABLE T (a INT, b INT)");
        sql("INSERT INTO T VALUES (1, 2)");
        sql("INSERT INTO T VALUES (3, 4)");

        List<Row> fileCounts = sql("SELECT num_added_files, num_deleted_files FROM T$manifests");

        Assertions.assertThat(fileCounts)
                .containsExactlyInAnyOrder(Row.of(1L, 0L), Row.of(1L, 0L));
    }

    /**
     * Exercises the {@code T$schemas} system table of a primary-key table whose options are
     * altered four times, which the expectations below treat as schema ids 0 through 4.
     *
     * <p>Checks, in order: the DDL reported by {@code SHOW CREATE TABLE T$schemas} (compared with
     * spaces and newlines stripped), the full listing ordered by {@code schema_id}, several
     * predicates on {@code schema_id} and {@code fields}, and finally that asking for a schema id
     * above the current maximum is rejected.
     */
    @Test
    public void testSchemasTable() {
        sql("CREATE TABLE T(a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')");
        sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')");
        sql("ALTER TABLE T SET ('snapshot.num-retained.max' = '20')");
        sql("ALTER TABLE T SET ('snapshot.num-retained.min' = '18')");
        sql("ALTER TABLE T SET ('manifest.format' = 'avro')");

        // Whitespace is not significant for the DDL comparison, so both sides are stripped.
        String actualDdl =
                sql("SHOW CREATE TABLE T$schemas").toString().replace(" ", "").replace("\n", "");
        String expectedDdl =
                ("[+I[CREATE TABLE `PAIMON`.`default`.`T$schemas` (\n"
                                + "  `schema_id` BIGINT NOT NULL,\n"
                                + "  `fields` VARCHAR(2147483647) NOT NULL,\n"
                                + "  `partition_keys` VARCHAR(2147483647) NOT NULL,\n"
                                + "  `primary_keys` VARCHAR(2147483647) NOT NULL,\n"
                                + "  `options` VARCHAR(2147483647) NOT NULL,\n"
                                + "  `comment` VARCHAR(2147483647),\n"
                                + "  `update_time` TIMESTAMP(3) NOT NULL\n"
                                + ") ]]")
                        .replace(" ", "")
                        .replace("\n", "");
        Assertions.assertThat(actualDdl).isEqualTo(expectedDdl);

        final String select =
                "SELECT schema_id, fields, partition_keys, primary_keys, options, `comment` FROM T$schemas";

        // Every schema version shares the same column list; only the options differ.
        final String fieldsJson =
                "[{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}]";
        final String beforeOptions = ", " + fieldsJson + ", [], [\"a\"], ";
        final String afterOptions = ", ]";

        final String schema0 =
                "+I[0"
                        + beforeOptions
                        + "{\"a.aa.aaa\":\"val1\",\"b.bb.bbb\":\"val2\"}"
                        + afterOptions;
        final String schema1 =
                "+I[1"
                        + beforeOptions
                        + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}"
                        + afterOptions;
        final String schema2 =
                "+I[2"
                        + beforeOptions
                        + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\"}"
                        + afterOptions;
        final String schema3 =
                "+I[3"
                        + beforeOptions
                        + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"snapshot.num-retained.min\":\"18\"}"
                        + afterOptions;
        final String schema4 =
                "+I[4"
                        + beforeOptions
                        + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"manifest.format\":\"avro\",\"snapshot.num-retained.min\":\"18\"}"
                        + afterOptions;
        final String allSchemas =
                "[" + String.join(", ", schema0, schema1, schema2, schema3, schema4) + "]";

        List<Row> rows = sql(select + " order by schema_id");
        Assertions.assertThat(rows.toString()).isEqualTo(allSchemas);

        rows = sql(select + " where schema_id = 0");
        Assertions.assertThat(rows.toString()).isEqualTo("[" + schema0 + "]");

        rows = sql(select + " where schema_id>0 and schema_id<3 order by schema_id");
        Assertions.assertThat(rows.toString()).isEqualTo("[" + schema1 + ", " + schema2 + "]");

        rows = sql(select + " where schema_id in (1, 3) order by schema_id");
        Assertions.assertThat(rows.toString()).isEqualTo("[" + schema1 + ", " + schema3 + "]");

        // The OR on the shared column list matches every version.
        rows =
                sql(
                        select
                                + " where schema_id in (1, 3) or fields='"
                                + fieldsJson
                                + "' order by schema_id");
        Assertions.assertThat(rows.toString()).isEqualTo(allSchemas);

        // Schema ids above the maximum are rejected instead of yielding an empty result.
        Assertions.assertThatThrownBy(() -> sql(select + " where schema_id = 5"))
                .hasCauseInstanceOf(RuntimeException.class)
                .hasRootCauseMessage("schema id: 5 should not greater than max schema id: 4");
        Assertions.assertThatThrownBy(() -> sql(select + " where schema_id>=6"))
                .hasCauseInstanceOf(RuntimeException.class)
                .hasRootCauseMessage("schema id: 6 should not greater than max schema id: 4");
    }

    /**
     * Joins {@code T$snapshots} with {@code T$schemas} on {@code schema_id}. Two inserts precede
     * an {@code ALTER TABLE} and two follow it, so snapshots 1-2 are expected under schema 0 and
     * snapshots 3-4 under schema 1, all with the same two-column field list.
     */
    @Test
    public void testSnapshotsSchemasTable() {
        sql("CREATE TABLE T (a INT, b INT)");
        sql("INSERT INTO T VALUES (1, 2)");
        sql("INSERT INTO T VALUES (3, 4)");
        sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')");
        sql("INSERT INTO T VALUES (5, 6)");
        sql("INSERT INTO T VALUES (7, 8)");

        List<Row> joined =
                sql("SELECT s.snapshot_id, s.schema_id, t.fields FROM T$snapshots s JOIN T$schemas t ON s.schema_id=t.schema_id");
        List<String> rendered =
                joined.stream().map(row -> row.toString()).collect(Collectors.toList());

        final String fieldsJson =
                "[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]";
        Assertions.assertThat(rendered)
                .containsExactlyInAnyOrder(
                        "+I[1, 0, " + fieldsJson + "]",
                        "+I[2, 0, " + fieldsJson + "]",
                        "+I[3, 1, " + fieldsJson + "]",
                        "+I[4, 1, " + fieldsJson + "]");
    }

    /**
     * {@code CREATE TABLE ... LIKE ... (EXCLUDING OPTIONS)} must yield a table whose single schema
     * entry has the copied column, no partition keys, no primary keys and no options.
     */
    @Test
    public void testCreateTableLike() {
        sql("CREATE TABLE T (a INT)");
        sql("CREATE TABLE T1 LIKE T (EXCLUDING OPTIONS)");

        String schemas =
                sql("SELECT schema_id, fields, partition_keys, primary_keys, options, `comment` FROM T1$schemas s")
                        .toString();

        Assertions.assertThat(schemas)
                .isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
    }

    /**
     * Covers {@code CREATE TABLE ... AS SELECT} in several flavours, each against its own source
     * table: plain copy, partition keys given via the {@code partition} option, an overridden
     * {@code file.format}, primary keys given via the {@code primary-key} option, and both key
     * options together. It ends with the rejected cases: option keys naming unknown columns, and
     * keys declared both in the DDL and in the options.
     */
    @Test
    public void testCreateTableAs() {
        final String schemaColumns =
                "schema_id, fields, partition_keys, primary_keys, options, `comment`";
        // Column lists shared by the multi-line DDL statements below.
        final String plainColumns =
                "    user_id BIGINT,\n"
                        + "    item_id BIGINT,\n"
                        + "    behavior STRING,\n"
                        + "    dt STRING,\n"
                        + "    hh STRING\n";
        final String keyedColumns =
                "    user_id BIGINT,\n"
                        + "    item_id BIGINT,\n"
                        + "    behavior STRING,\n"
                        + "    dt STRING,\n"
                        + "    hh STRING,\n"
                        + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n";

        // Plain CTAS: schema and rows are carried over.
        sql("CREATE TABLE t (a INT)");
        sql("INSERT INTO t VALUES(1),(2)");
        sql("CREATE TABLE t1 AS SELECT * FROM t");
        Assertions.assertThat(sql("SELECT " + schemaColumns + " FROM t1$schemas s").toString())
                .isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
        Assertions.assertThat(sql("SELECT * FROM t1"))
                .containsExactlyInAnyOrder(Row.of(1), Row.of(2));

        // Partition keys supplied through the 'partition' option.
        sql("CREATE TABLE t_p (\n" + plainColumns + ") PARTITIONED BY (dt, hh)");
        sql("INSERT INTO t_p SELECT 1,2,'a','2023-02-19','12'");
        sql("CREATE TABLE t1_p WITH ('partition' = 'dt' ) AS SELECT * FROM t_p");
        Assertions.assertThat(sql("SELECT " + schemaColumns + " FROM t1_p$schemas s").toString())
                .isEqualTo(
                        "[+I[0, [{\"id\":0,\"name\":\"user_id\",\"type\":\"BIGINT\"},{\"id\":1,\"name\":\"item_id\",\"type\":\"BIGINT\"},{\"id\":2,\"name\":\"behavior\",\"type\":\"STRING\"},{\"id\":3,\"name\":\"dt\",\"type\":\"STRING\"},{\"id\":4,\"name\":\"hh\",\"type\":\"STRING\"}], [\"dt\"], [], {}, ]]");
        Assertions.assertThat(sql("SELECT * FROM t1_p").toString())
                .isEqualTo("[+I[1, 2, a, 2023-02-19, 12]]");

        // Options given on the target table are what the target reports.
        sql("CREATE TABLE t_option (a INT) WITH ('file.format' = 'orc')");
        sql("INSERT INTO t_option VALUES(1),(2)");
        sql("CREATE TABLE t1_option WITH ('file.format' = 'parquet') AS SELECT * FROM t_option");
        Assertions.assertThat(sql("SELECT * FROM t1_option$options"))
                .containsExactly(Row.of("file.format", "parquet"));
        Assertions.assertThat(sql("SELECT * FROM t1_option"))
                .containsExactlyInAnyOrder(Row.of(1), Row.of(2));

        // Primary key supplied through the 'primary-key' option.
        sql("CREATE TABLE t_pk (\n" + keyedColumns + ")");
        sql("INSERT INTO t_pk VALUES(1,2,'aaa','2020-01-02','09')");
        sql("CREATE TABLE t_pk_as WITH ('primary-key' = 'dt') AS SELECT * FROM t_pk");
        Assertions.assertThat(sql("SHOW CREATE TABLE t_pk_as").toString())
                .contains("PRIMARY KEY (`dt`)");
        Assertions.assertThat(sql("SELECT * FROM t_pk_as").toString())
                .isEqualTo("[+I[1, 2, aaa, 2020-01-02, 09]]");

        // Primary key and partition supplied together.
        sql("CREATE TABLE t_all (\n" + keyedColumns + ") PARTITIONED BY (dt, hh)");
        sql("INSERT INTO t_all VALUES(1,2,'login','2020-01-02','09')");
        sql("CREATE TABLE t_all_as WITH ('primary-key' = 'dt,hh' , 'partition' = 'dt' ) AS SELECT * FROM t_all");
        String allDdl = sql("SHOW CREATE TABLE t_all_as").toString();
        Assertions.assertThat(allDdl).contains("PRIMARY KEY (`dt`, `hh`)");
        Assertions.assertThat(allDdl).contains("PARTITIONED BY (`dt`)");
        Assertions.assertThat(sql("SELECT * FROM t_all_as").toString())
                .isEqualTo("[+I[1, 2, login, 2020-01-02, 09]]");

        // Rejected: primary key names an unknown column, or is declared twice.
        sql("CREATE TABLE t_pk_not_exist (\n" + keyedColumns + ")");
        Assertions.assertThatThrownBy(
                        () ->
                                sql("CREATE TABLE t_pk_not_exist_as WITH ('primary-key' = 'aaa') AS SELECT * FROM t_pk_not_exist"))
                .hasRootCauseMessage(
                        "Table column [user_id, item_id, behavior, dt, hh] should include all primary key constraint [aaa]");
        Assertions.assertThatThrownBy(
                        () ->
                                sql("CREATE TABLE t_pk_ddl_option (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED) WITH ('primary-key' = 'dt')"))
                .hasRootCauseMessage(
                        "Cannot define primary key on DDL and table options at the same time.");

        // Rejected: partition names an unknown column, or is declared twice.
        sql("CREATE TABLE t_partition_not_exist (\n" + plainColumns + ") PARTITIONED BY (dt, hh) ");
        Assertions.assertThatThrownBy(
                        () ->
                                sql("CREATE TABLE t_partition_not_exist_as WITH ('partition' = 'aaa') AS SELECT * FROM t_partition_not_exist"))
                .hasRootCauseMessage(
                        "Table column [user_id, item_id, behavior, dt, hh] should include all partition fields [aaa]");
        Assertions.assertThatThrownBy(
                        () ->
                                sql("CREATE TABLE t_partition_ddl_option (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING) PARTITIONED BY (dt, hh)  WITH ('partition' = 'dt')"))
                .hasRootCauseMessage(
                        "Cannot define partition on DDL and table options at the same time.");
    }

    /**
     * Setting {@code changelog-producer} on a table without primary keys must be refused, both in
     * the {@code CREATE TABLE} statement and in a later {@code ALTER TABLE}.
     */
    @Test
    public void testConflictOption() {
        final String refusal = "Can not set changelog-producer on table without primary keys";

        Assertions.assertThatThrownBy(
                        () -> sql("CREATE TABLE T (a INT) WITH ('changelog-producer' = 'input')"))
                .rootCause()
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining(refusal);

        sql("CREATE TABLE T (a INT)");

        Assertions.assertThatThrownBy(
                        () -> sql("ALTER TABLE T SET ('changelog-producer'='input')"))
                .rootCause()
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining(refusal);
    }

    /**
     * {@code SHOW PARTITIONS} must fail on an unpartitioned table and, on a table partitioned by
     * {@code (dt, hh)}, list all partitions or only those matching a partial or full partition
     * spec.
     */
    @Test
    public void testShowPartitions() {
        final String keyedColumns =
                "    user_id BIGINT,\n"
                        + "    item_id BIGINT,\n"
                        + "    behavior STRING,\n"
                        + "    dt STRING,\n"
                        + "    hh STRING,\n"
                        + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n";

        sql("CREATE TABLE NoPartitionTable (\n" + keyedColumns + ")");
        Assertions.assertThatThrownBy(() -> sql("SHOW PARTITIONS NoPartitionTable"))
                .rootCause()
                .isInstanceOf(TableNotPartitionedException.class)
                .hasMessage("Table default.NoPartitionTable in catalog PAIMON is not partitioned.");

        sql("CREATE TABLE PartitionTable (\n" + keyedColumns + ") PARTITIONED BY (dt, hh)");
        sql("INSERT INTO PartitionTable select 1,1,'a','2020-01-01','10'");
        sql("INSERT INTO PartitionTable select 2,2,'b','2020-01-02','11'");
        sql("INSERT INTO PartitionTable select 3,3,'c','2020-01-03','11'");

        final Row jan01 = Row.of("dt=2020-01-01/hh=10");
        final Row jan02 = Row.of("dt=2020-01-02/hh=11");
        final Row jan03 = Row.of("dt=2020-01-03/hh=11");

        Assertions.assertThat(sql("SHOW PARTITIONS PartitionTable"))
                .containsExactlyInAnyOrder(jan01, jan02, jan03);
        Assertions.assertThat(sql("SHOW PARTITIONS PartitionTable partition (hh='11')"))
                .containsExactlyInAnyOrder(jan02, jan03);
        Assertions.assertThat(
                        sql("SHOW PARTITIONS PartitionTable partition (dt='2020-01-02', hh='11')"))
                .containsExactlyInAnyOrder(jan02);
    }

    /**
     * Tracks the per-partition properties of a partitioned primary-key table across an insert, an
     * update touching partition {@code p1} and a delete touching partition {@code p2}.
     *
     * <p>After the initial insert every partition must report {@code numRows} "2",
     * {@code numFiles} "1" and non-blank {@code lastUpdateTime} / {@code totalSize}. After each
     * later write only the touched partition is expected to change.
     *
     * <p>NOTE(review): the trailing {@code 3L, 2L} arguments are presumably the expected row and
     * file counts after the write - confirm against {@code assertPartitionUpdateTo}.
     *
     * @throws Exception if the catalog lookups fail
     */
    @Test
    void testPKTableGetPartition() throws Exception {
        sql("CREATE TABLE IF NOT EXISTS PK_T (id INT PRIMARY KEY NOT ENFORCED, par STRING, word STRING) PARTITIONED BY(par)");
        FlinkCatalog flinkCatalog = flinkCatalog();
        ObjectPath tablePath = new ObjectPath(flinkCatalog.getDefaultDatabase(), "PK_T");
        sql("INSERT INTO PK_T VALUES (1, 'p1', 'a'),(2, 'p1', 'b'),(3, 'p2', 'a'),(4, 'p2', 'b')");

        // Typed list: the catalog API returns partition specs, so no raw List is needed.
        List<CatalogPartitionSpec> partitions = flinkCatalog.listPartitions(tablePath);

        Map<String, Map<String, String>> afterInsert =
                getPartitionProperties(flinkCatalog, tablePath, partitions);
        Assertions.assertThat(afterInsert)
                .allSatisfy(
                        (par, properties) -> {
                            Assertions.assertThat(properties.get("numRows")).isEqualTo("2");
                            Assertions.assertThat(properties.get("lastUpdateTime")).isNotBlank();
                            Assertions.assertThat(properties.get("numFiles")).isEqualTo("1");
                            Assertions.assertThat(properties.get("totalSize")).isNotBlank();
                        });

        sql("UPDATE PK_T SET word = 'c' WHERE id = 2");
        Map<String, Map<String, String>> afterUpdate =
                getPartitionProperties(flinkCatalog, tablePath, partitions);
        assertPartitionNotUpdate("p2", afterInsert, afterUpdate);
        assertPartitionUpdateTo("p1", afterInsert, afterUpdate, 3L, 2L);

        sql("DELETE FROM PK_T WHERE id = 3");
        Map<String, Map<String, String>> afterDelete =
                getPartitionProperties(flinkCatalog, tablePath, partitions);
        assertPartitionNotUpdate("p1", afterUpdate, afterDelete);
        assertPartitionUpdateTo("p2", afterUpdate, afterDelete, 3L, 2L);
    }

    /**
     * Tracks the per-partition properties of a partitioned table without primary keys across an
     * initial insert and a second insert that only touches partition {@code p1}.
     *
     * <p>After the first insert every partition must report {@code numRows} "1" and a non-blank
     * {@code lastUpdateTime}; after the second, {@code p2} must be unchanged while {@code p1} is
     * updated.
     *
     * <p>NOTE(review): the trailing {@code 3L, 2L} arguments are presumably the expected row and
     * file counts after the write - confirm against {@code assertPartitionUpdateTo}.
     *
     * @throws Exception if the catalog lookups fail
     */
    @Test
    void testNonPKTableGetPartition() throws Exception {
        FlinkCatalog flinkCatalog = flinkCatalog();
        ObjectPath tablePath = new ObjectPath(flinkCatalog.getDefaultDatabase(), "NON_PK_T");
        sql("CREATE TABLE IF NOT EXISTS NON_PK_T (par STRING, a INT, b INT) PARTITIONED BY(par)");
        sql("INSERT INTO NON_PK_T VALUES ('p1', 2, 3),('p2', 4, 5)");

        // Typed list: the catalog API returns partition specs, so no raw List is needed.
        List<CatalogPartitionSpec> partitions = flinkCatalog.listPartitions(tablePath);

        Map<String, Map<String, String>> afterFirstInsert =
                getPartitionProperties(flinkCatalog, tablePath, partitions);
        Assertions.assertThat(afterFirstInsert)
                .allSatisfy(
                        (par, properties) -> {
                            Assertions.assertThat(properties.get("numRows")).isEqualTo("1");
                            Assertions.assertThat(properties.get("lastUpdateTime")).isNotBlank();
                        });

        sql("INSERT INTO NON_PK_T VALUES ('p1', 6, 7), ('p1', 8, 9)");
        Map<String, Map<String, String>> afterSecondInsert =
                getPartitionProperties(flinkCatalog, tablePath, partitions);
        assertPartitionNotUpdate("p2", afterFirstInsert, afterSecondInsert);
        assertPartitionUpdateTo("p1", afterFirstInsert, afterSecondInsert, 3L, 2L);
    }

    /**
     * Exercises {@code ALTER TABLE ... DROP PARTITION} on a table partitioned by {@code (dt, hh)}.
     *
     * <p>A missing partition fails with {@code PartitionNotExistException} unless
     * {@code IF EXISTS} is given, in which case nothing changes. A full spec then removes one
     * partition, and a statement with two partial specs removes every partition matching either.
     */
    @Test
    public void testDropPartition() {
        sql(
                "CREATE TABLE PartitionTable (\n"
                        + "    user_id BIGINT,\n"
                        + "    item_id BIGINT,\n"
                        + "    behavior STRING,\n"
                        + "    dt STRING,\n"
                        + "    hh STRING,\n"
                        + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
                        + ") PARTITIONED BY (dt, hh)");
        sql("INSERT INTO PartitionTable select 1,1,'a','2020-01-01','10'");
        sql("INSERT INTO PartitionTable select 2,2,'b','2020-01-02','11'");
        sql("INSERT INTO PartitionTable select 3,3,'c','2020-01-03','11'");
        sql("INSERT INTO PartitionTable select 4,4,'d','2020-01-04','14'");
        sql("INSERT INTO PartitionTable select 5,5,'e','2020-01-05','15'");

        final Row jan01 = Row.of("dt=2020-01-01/hh=10");
        final Row jan02 = Row.of("dt=2020-01-02/hh=11");
        final Row jan03 = Row.of("dt=2020-01-03/hh=11");
        final Row jan04 = Row.of("dt=2020-01-04/hh=14");
        final Row jan05 = Row.of("dt=2020-01-05/hh=15");

        // Dropping a partition that does not exist: an error without IF EXISTS, a no-op with it.
        Assertions.assertThatThrownBy(
                        () -> sql("ALTER TABLE PartitionTable DROP PARTITION (`dt` = '2020-10-10')"))
                .rootCause()
                .isInstanceOf(PartitionNotExistException.class)
                .hasMessage(
                        "Partition CatalogPartitionSpec{{dt=2020-10-10}} of table default.PartitionTable in catalog PAIMON does not exist.");
        String ifExistsOutcome =
                sql("ALTER TABLE PartitionTable DROP IF EXISTS PARTITION (`dt` = '2020-10-10')")
                        .toString();
        Assertions.assertThat(ifExistsOutcome).isEqualTo("[+I[OK]]");
        Assertions.assertThat(sql("SHOW PARTITIONS PartitionTable"))
                .containsExactlyInAnyOrder(jan01, jan02, jan03, jan04, jan05);

        // Full spec: exactly one partition disappears.
        sql("ALTER TABLE PartitionTable DROP PARTITION (`dt` = '2020-01-01', `hh` = '10')");
        Assertions.assertThat(sql("SHOW PARTITIONS PartitionTable"))
                .containsExactlyInAnyOrder(jan02, jan03, jan04, jan05);

        // Two partial specs in one statement: everything matching either one is dropped.
        sql("ALTER TABLE PartitionTable DROP PARTITION (dt ='2020-01-04'), PARTITION (hh='11')");
        Assertions.assertThat(sql("SHOW PARTITIONS PartitionTable")).containsExactly(jan05);
    }

    /**
     * Checks the {@code file.format.per.level} option: after three single-row inserts the data
     * is still fully readable and the {@code $files} system table reports a parquet file at
     * level 3 and an avro file at level 0.
     */
    @Test
    public void testFileFormatPerLevel() {
        this.sql(
                "CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('num-sorted-run.compaction-trigger'='2','file.format.per.level' = '0:avro,3:parquet', 'num-levels' = '4')");
        this.sql("INSERT INTO T1 SELECT 1,'AAA'");
        this.sql("INSERT INTO T1 SELECT 2,'BBB'");
        this.sql("INSERT INTO T1 SELECT 3,'CCC'");

        // All rows are visible regardless of which format each file was written in.
        List<Row> data = this.sql("SELECT * FROM T1");
        Assertions.assertThat(data)
                .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"));

        // One file per configured level/format pair is expected.
        List<Row> levelsAndFormats = this.sql("SELECT level,file_format FROM T1$files");
        Assertions.assertThat(levelsAndFormats)
                .containsExactlyInAnyOrder(Row.of(3, "parquet"), Row.of(0, "avro"));
    }

    /**
     * Runs the shared {@code $files} system-table scenario against a primary-key table and an
     * append-only table, both partitioned by {@code p}.
     */
    @Test
    public void testFilesTable() {
        String keyedTable = "T_WITH_KEY";
        this.sql(
                "CREATE TABLE T_WITH_KEY (a INT, p INT, b BIGINT, c STRING, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p) ");
        this.assertFilesTable(keyedTable);

        String appendOnlyTable = "T_APPEND_ONLY";
        this.sql("CREATE TABLE T_APPEND_ONLY (a INT, p INT, b BIGINT, c STRING) PARTITIONED BY (p)");
        this.assertFilesTable(appendOnlyTable);
    }

    /**
     * Drives a write / schema-evolution sequence against {@code tableName} and verifies the
     * content of its {@code $files} system table, both at the latest snapshot and at snapshot 2.
     *
     * <p>The expected rows contain a key-range segment that depends on the table name suffix:
     * {@code VALUE_COUNT} tables expect full rows, {@code APPEND_ONLY} tables expect an empty
     * pair, and every other table expects only the value of column {@code a}.
     *
     * @param tableName name of an already created, empty table with columns
     *     {@code (a, p, b, c)} partitioned by {@code p}
     */
    private void assertFilesTable(String tableName) {
        Assertions.assertThat(this.sql("SELECT * FROM %s$files", tableName)).isEmpty();

        // Two writes interleaved with add / drop / rename column changes, then a third write
        // that also touches a second partition.
        this.sql("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 1, 2, 'S1')", tableName);
        this.sql("ALTER TABLE %s ADD (d INT, e INT, f INT)", tableName);
        this.sql(
                "INSERT INTO %s VALUES (5, 1, 6, 'S3', 7, 8, 9), (10, 1, 11, 'S4', 12, 13, 14)",
                tableName);
        this.sql("ALTER TABLE %s DROP (c, e)", tableName);
        this.sql("ALTER TABLE %s RENAME d TO dd", tableName);
        this.sql("ALTER TABLE %s RENAME b TO bb", tableName);
        this.sql(
                "INSERT INTO %s VALUES (19, 1, 20, 21, 22), (15, 1, 16, 17, 18), (23, 2, 24, 25, 26), (27, 2, 28, 29, 30)",
                tableName);

        // Table-kind dependent key-range segment for each written file.
        boolean valueCount = StringUtils.endsWith(tableName, "VALUE_COUNT");
        boolean appendOnly = StringUtils.endsWith(tableName, "APPEND_ONLY");
        String keys23To27 =
                valueCount
                        ? "[23, 2, 24, 25, 26],[27, 2, 28, 29, 30]"
                        : (appendOnly ? "," : "[23],[27]");
        String keys1To3 =
                valueCount ? "[1, 1, 2, S1],[3, 1, 4, S2]" : (appendOnly ? "," : "[1],[3]");
        String keys5To10 =
                valueCount
                        ? "[5, 1, 6, S3, 7, 8, 9],[10, 1, 11, S4, 12, 13, 14]"
                        : (appendOnly ? "," : "[5],[10]");
        String keys15To19 =
                valueCount
                        ? "[15, 1, 16, 17, 18],[19, 1, 20, 21, 22]"
                        : (appendOnly ? "," : "[15],[19]");

        // Latest snapshot: statistics are reported with the evolved column names.
        List<Row> latestFiles = this.sql("SELECT * FROM %s$files", tableName);
        latestFiles.forEach(
                fileRow -> {
                    String field2 = (String) fileRow.getField(2);
                    Assertions.assertThat(StringUtils.endsWith(field2, ".parquet")).isTrue();
                    Assertions.assertThat((long) (Long) fileRow.getField(7)).isGreaterThan(0L);
                });
        Assertions.assertThat(this.getRowStringList(latestFiles))
                .containsExactlyInAnyOrder(
                        String.format(
                                "{2},0,parquet,4,0,2,%s,{a=0, bb=0, dd=0, f=0, p=0},{a=23, bb=24, dd=25, f=26, p=2},{a=27, bb=28, dd=29, f=30, p=2}",
                                keys23To27),
                        String.format(
                                "{1},0,parquet,0,0,2,%s,{a=0, bb=0, dd=2, f=2, p=0},{a=1, bb=2, dd=null, f=null, p=1},{a=3, bb=4, dd=null, f=null, p=1}",
                                keys1To3),
                        String.format(
                                "{1},0,parquet,1,0,2,%s,{a=0, bb=0, dd=0, f=0, p=0},{a=5, bb=6, dd=7, f=9, p=1},{a=10, bb=11, dd=12, f=14, p=1}",
                                keys5To10),
                        String.format(
                                "{1},0,parquet,4,0,2,%s,{a=0, bb=0, dd=0, f=0, p=0},{a=15, bb=16, dd=17, f=18, p=1},{a=19, bb=20, dd=21, f=22, p=1}",
                                keys15To19));

        // Snapshot 2: statistics are reported with the column names in effect at that snapshot.
        List<Row> snapshot2Files =
                this.sql(
                        "SELECT * FROM %s$files /*+ OPTIONS('scan.snapshot-id'='2') */", tableName);
        snapshot2Files.forEach(
                fileRow -> {
                    String field2 = (String) fileRow.getField(2);
                    Assertions.assertThat(StringUtils.endsWith(field2, ".parquet")).isTrue();
                    Assertions.assertThat((long) (Long) fileRow.getField(7)).isGreaterThan(0L);
                });
        Assertions.assertThat(this.getRowStringList(snapshot2Files))
                .containsExactlyInAnyOrder(
                        String.format(
                                "{1},0,parquet,0,0,2,%s,{a=0, b=0, c=0, d=2, e=2, f=2, p=0},{a=1, b=2, c=S1, d=null, e=null, f=null, p=1},{a=3, b=4, c=S2, d=null, e=null, f=null, p=1}",
                                keys1To3),
                        String.format(
                                "{1},0,parquet,1,0,2,%s,{a=0, b=0, c=0, d=0, e=0, f=0, p=0},{a=5, b=6, c=S3, d=7, e=8, f=9, p=1},{a=10, b=11, c=S4, d=12, e=13, f=14, p=1}",
                                keys5To10));
    }

    /**
     * Verifies that the {@code $files} system table can be filtered by {@code partition},
     * {@code bucket} and {@code level}.
     *
     * <p>The first two partitions are written and then compacted via {@code sys.compact}; a
     * third partition is written afterwards, so the expected rows differ in level.
     */
    @Test
    public void testFilesTableWithFilter() {
        this.tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
        this.sql(
                "CREATE TABLE T_WITH_FILTER (k INT, p INT, v INT, PRIMARY KEY (k, p) NOT ENFORCED) PARTITIONED BY (p) WITH ('bucket'='2')");
        this.sql("INSERT INTO T_WITH_FILTER VALUES (1, 2, 3), (4, 5, 6)");
        this.sql("CALL sys.compact('default.T_WITH_FILTER')");
        this.sql("INSERT INTO T_WITH_FILTER VALUES (7, 8, 9)");

        // Expected (partition, bucket, level) triples for the three files.
        Row compactedP2 = Row.of("{2}", 0, 5);
        Row compactedP5 = Row.of("{5}", 0, 5);
        Row freshP8 = Row.of("{8}", 1, 0);
        String selectFiles = "SELECT `partition`, bucket, level FROM T_WITH_FILTER$files";

        Assertions.assertThat(this.sql(selectFiles))
                .containsExactlyInAnyOrder(compactedP2, compactedP5, freshP8);
        Assertions.assertThat(this.sql(selectFiles + " WHERE `partition`='{2}'"))
                .containsExactlyInAnyOrder(compactedP2);
        Assertions.assertThat(this.sql(selectFiles + " WHERE bucket=0"))
                .containsExactlyInAnyOrder(compactedP2, compactedP5);
        Assertions.assertThat(this.sql(selectFiles + " WHERE level=0"))
                .containsExactlyInAnyOrder(freshP8);
    }

    /**
     * Renders each row as a comma-separated string built from fields 0, 1, 3, 4, 5, 6, 8, 9,
     * 10, 11 and 12. Fields 2 and 7 are left out; callers in this class check those two
     * separately.
     *
     * @param rows rows to render
     * @return one joined string per input row, in input order
     */
    @Nonnull
    private List<String> getRowStringList(List<Row> rows) {
        return rows.stream()
                .map(
                        row -> {
                            Object[] selectedFields = {
                                row.getField(0),
                                row.getField(1),
                                row.getField(3),
                                row.getField(4),
                                row.getField(5),
                                row.getField(6),
                                row.getField(8),
                                row.getField(9),
                                row.getField(10),
                                row.getField(11),
                                row.getField(12)
                            };
                            return StringUtils.join(selectedFields, ",");
                        })
                .collect(Collectors.toList());
    }

    /**
     * Creates three tags (one per snapshot) and checks the {@code $tags} system table, both
     * unfiltered and with equality, {@code IN} and {@code OR} predicates.
     */
    @Test
    public void testTagsTable() throws Exception {
        this.sql("CREATE TABLE T (a INT, b INT)");
        this.sql("INSERT INTO T VALUES (1, 2)");
        this.sql("INSERT INTO T VALUES (3, 4)");
        this.sql("INSERT INTO T VALUES (5, 6)");

        // tagN points at snapshot N.
        for (long snapshotId = 1L; snapshotId <= 3L; snapshotId++) {
            this.paimonTable("T").createTag("tag" + snapshotId, snapshotId);
        }

        Row tag1 = Row.of("tag1", 1L, 0L, 1L);
        Row tag2 = Row.of("tag2", 2L, 0L, 2L);
        Row tag3 = Row.of("tag3", 3L, 0L, 3L);
        String selectTags = "SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags";

        List<Row> tags = this.sql(selectTags + " ORDER BY tag_name");
        Assertions.assertThat(tags).containsExactly(tag1, tag2, tag3);

        tags = this.sql(selectTags + " where tag_name = 'tag1' ");
        Assertions.assertThat(tags).containsExactly(tag1);

        tags = this.sql(selectTags + " where tag_name in ('tag1', 'tag3')");
        Assertions.assertThat(tags).containsExactly(tag1, tag3);

        tags = this.sql(selectTags + " where tag_name in ('tag1') or snapshot_id=2");
        Assertions.assertThat(tags).containsExactly(tag1, tag2);
    }

    /**
     * Starts a streaming read with {@code consumer-id = my1} and verifies that the
     * {@code $consumers} system table eventually lists that consumer with a recorded value of
     * at least 3.
     *
     * <p>The streaming iterator is closed in a {@code finally} block so the job is released
     * even when an assertion fails, the polling loop is interrupted, or the test times out.
     *
     * @throws Exception if collecting from, polling for, or closing the streaming read fails
     */
    @Test
    @Timeout(value=60L)
    public void testConsumersTable() throws Exception {
        this.batchSql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (3, 4)", new Object[0]);

        BlockingIterator<Row, Row> iterator =
                BlockingIterator.of(
                        this.streamSqlIter(
                                "SELECT * FROM T /*+ OPTIONS('consumer-id'='my1','consumer.expiration-time'='3h') */",
                                new Object[0]));
        List<Row> result;
        try {
            this.batchSql("INSERT INTO T VALUES (5, 6), (7, 8)", new Object[0]);
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of(1, 2), Row.of(3, 4));

            // The consumer entry appears asynchronously; poll until it is visible. The
            // method-level timeout bounds this loop.
            result = this.sql("SELECT * FROM T$consumers");
            while (result.isEmpty()) {
                Thread.sleep(1000L);
                result = this.sql("SELECT * FROM T$consumers");
            }
        } finally {
            iterator.close();
        }

        Assertions.assertThat(result).hasSize(1);
        Assertions.assertThat(result.get(0).getField(0)).isEqualTo("my1");
        Assertions.assertThat((Long) result.get(0).getField(1)).isGreaterThanOrEqualTo(3L);
    }

    /**
     * A batch query that sets {@code consumer-id} on a table without
     * {@code consumer.expiration-time} must fail with an {@link IllegalArgumentException}
     * as root cause.
     */
    @Test
    public void testConsumerIdExpInBatchMode() {
        this.batchSql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (3, 4)", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (5, 6), (7, 8)", new Object[0]);

        String expectedHint =
                "You need to configure 'consumer.expiration-time' (ALTER TABLE) and restart your write job for it";
        Assertions.assertThatThrownBy(
                        () ->
                                this.sql(
                                        "SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1"))
                .rootCause()
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining(expectedHint);
    }

    /**
     * A streaming query that sets {@code consumer-id} on a table without
     * {@code consumer.expiration-time} must fail directly with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testConsumerIdExpInStreamingMode() {
        this.batchSql("CREATE TABLE T (a INT, b INT)", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (1, 2)", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (3, 4)", new Object[0]);

        String expectedHint =
                "You need to configure 'consumer.expiration-time' (ALTER TABLE) and restart your write job for it";
        Assertions.assertThatThrownBy(
                        () ->
                                this.streamSqlIter(
                                        "SELECT * FROM T /*+ OPTIONS('consumer-id'='test-id') */",
                                        new Object[0]))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining(expectedHint);
    }

    /**
     * Tracks {@code record_count} and {@code file_count} in the {@code $partitions} system
     * table across inserts, a partition drop, and a re-insert into the dropped partition.
     */
    @Test
    public void testPartitionsTable() {
        String table = "PARTITIONS_TABLE";
        String selectStats = "SELECT `partition`, record_count, file_count FROM %s$partitions";

        this.sql("CREATE TABLE %s (a INT, p INT, b BIGINT, c STRING) PARTITIONED BY (p)", table);
        Assertions.assertThat(this.sql("SELECT * FROM %s$partitions", table)).isEmpty();

        // Two commits touching partitions 1 and 2.
        this.sql("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2, 'S1'), (1, 2, 2, 'S1')", table);
        this.sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
        List<Row> stats = this.sql(selectStats, table);
        Assertions.assertThat(stats)
                .containsExactlyInAnyOrder(Row.of("{1}", 2L, 2L), Row.of("{2}", 3L, 2L));

        // Two more commits: new partitions 3 and 4, plus one more file in 1 and 2.
        this.sql("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 'S4')", table);
        this.sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
        stats = this.sql(selectStats, table);
        Assertions.assertThat(stats)
                .containsExactlyInAnyOrder(
                        Row.of("{1}", 3L, 3L),
                        Row.of("{2}", 4L, 3L),
                        Row.of("{3}", 1L, 1L),
                        Row.of("{4}", 1L, 1L));

        // Dropping partition 2 removes its row from the system table.
        this.sql("ALTER TABLE %s DROP PARTITION (p = 2)", table);
        stats = this.sql(selectStats, table);
        Assertions.assertThat(stats)
                .containsExactlyInAnyOrder(
                        Row.of("{1}", 3L, 3L), Row.of("{3}", 1L, 1L), Row.of("{4}", 1L, 1L));

        // Writing to partition 2 again starts its counters from scratch.
        this.sql("INSERT INTO %s VALUES (1, 2, 2, 'S1')", table);
        stats = this.sql(selectStats, table);
        Assertions.assertThat(stats)
                .containsExactlyInAnyOrder(
                        Row.of("{1}", 3L, 3L),
                        Row.of("{2}", 1L, 1L),
                        Row.of("{3}", 1L, 1L),
                        Row.of("{4}", 1L, 1L));
    }

    /**
     * Creating a table with {@code streaming-read-overwrite = true} must be rejected when the
     * changelog producer is {@code full-compaction} or {@code lookup}.
     */
    @Test
    public void testInvalidStreamingReadOverwrite() {
        String ddl = "CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b STRING)WITH ('changelog-producer' = '%s', 'streaming-read-overwrite' = 'true')";
        String expectedMessage =
                "Cannot set streaming-read-overwrite to true when changelog producer is full-compaction or lookup because it will read duplicated changes.";

        for (String producer : new String[] {"full-compaction", "lookup"}) {
            Assertions.assertThatThrownBy(() -> this.sql(ddl, producer))
                    .satisfies(
                            PaimonAssertions.anyCauseMatches(
                                    UnsupportedOperationException.class, expectedMessage));
        }
    }

    /**
     * {@code SHOW CREATE TABLE} must render a {@code METADATA} column together with its comment
     * and must not contain the word {@code schema}.
     */
    @Test
    public void testShowTableMetadataComment() {
        this.sql("CREATE TABLE T (a INT, name VARCHAR METADATA COMMENT 'header1', b INT)");

        String rendered = this.sql("SHOW CREATE TABLE T").get(0).toString();
        String expectedColumns =
                "CREATE TABLE `PAIMON`.`default`.`T` (\n"
                        + "  `a` INT,\n"
                        + "  `name` VARCHAR(2147483647) METADATA COMMENT 'header1',\n"
                        + "  `b` INT\n"
                        + ")";
        Assertions.assertThat(rendered).contains(expectedColumns).doesNotContain("schema");
    }

    /**
     * Runs the shared read-optimized ({@code $ro}) scenario twice: once with
     * {@code 'bucket' = '1'} and once, after recreating the table, with {@code 'bucket' = '-1'}.
     */
    @Test
    public void testReadOptimizedTable() {
        String target = "T";

        this.sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '1')");
        this.innerTestReadOptimizedTableAndCheckData(target);

        // Start over with the second bucket setting.
        this.sql("DROP TABLE T");
        this.sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '-1')");
        this.innerTestReadOptimizedTableAndCheckData(target);
    }

    /**
     * Checks the read-optimized ({@code $ro}) table when {@code scan.fallback-branch} points at
     * a {@code stream} branch.
     *
     * <p>The first two rounds reuse the shared scenario while writing into
     * {@code T$branch_stream}. The third round uses a partitioned main table whose branch is
     * altered to have a primary key, and verifies what {@code T$ro} returns after writes to the
     * branch and to the main table.
     */
    @Test
    public void testReadOptimizedTableFallBack() {
        String createBranch = "CALL sys.create_branch('default.T', 'stream')";
        String enableFallback = "ALTER TABLE T SET ('scan.fallback-branch' = 'stream')";
        String branchTable = "T$branch_stream";

        // Round 1: 'bucket' = '1'.
        this.sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '1')");
        this.sql(createBranch);
        this.sql(enableFallback);
        this.innerTestReadOptimizedTableAndCheckData(branchTable);
        this.sql("DROP TABLE T");

        // Round 2: 'bucket' = '-1'.
        this.sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '-1')");
        this.sql(createBranch);
        this.sql(enableFallback);
        this.innerTestReadOptimizedTableAndCheckData(branchTable);
        this.sql("DROP TABLE T");

        // Round 3: partitioned main table without key, branch altered to carry a primary key.
        this.sql("CREATE TABLE T (k INT, v INT, n INT) PARTITIONED BY (k)");
        this.sql(createBranch);
        this.sql(enableFallback);
        this.sql(
                "ALTER TABLE T$branch_stream SET ('primary-key' = 'k,v', 'bucket' = '2','changelog-producer' = 'lookup')");
        // Result is not inspected; the statement only has to succeed.
        this.sql("show create table T$branch_stream");

        this.sql(
                "INSERT INTO T$branch_stream /*+ OPTIONS('full-compaction.delta-commits' = '1') */ VALUES (1, 10, 10), (2, 20, 20)");
        List<Row> optimized = this.sql("SELECT k, v, n FROM T$ro ORDER BY k");
        Assertions.assertThat(optimized).containsExactly(Row.of(1, 10, 10), Row.of(2, 20, 20));

        // A write-only commit must not change what the read-optimized table returns.
        this.sql(
                "INSERT INTO T$branch_stream /*+ OPTIONS('write-only' = 'true') */VALUES (1, 10, 11), (3, 30, 30)");
        optimized = this.sql("SELECT k, v, n FROM T$ro ORDER BY k, v");
        Assertions.assertThat(optimized).containsExactly(Row.of(1, 10, 10), Row.of(2, 20, 20));

        // A commit with full compaction makes the newest values visible.
        this.sql(
                "INSERT INTO T$branch_stream /*+ OPTIONS('full-compaction.delta-commits' = '1') */ VALUES (1, 10, 12), (2, 20, 21), (3, 30, 31)");
        optimized = this.sql("SELECT k, v, n FROM T$ro ORDER BY k, v");
        Assertions.assertThat(optimized)
                .containsExactly(Row.of(1, 10, 12), Row.of(2, 20, 21), Row.of(3, 30, 31));

        // Writing to the main table: its partitions win, the rest still comes from the branch.
        this.sql("INSERT INTO T VALUES (1, 10, 101), (2222, 202, 202)");
        optimized = this.sql("SELECT k, v, n FROM T$ro ORDER BY k");
        Assertions.assertThat(optimized)
                .containsExactly(
                        Row.of(1, 10, 101),
                        Row.of(2, 20, 21),
                        Row.of(3, 30, 31),
                        Row.of(2222, 202, 202));
    }

    /**
     * Streams from the {@code $binlog} system table (scan mode {@code latest}) and checks that
     * three inserts produce {@code +I}, {@code +U} (with before/after values packed into
     * arrays) and {@code +I} records, in that order.
     *
     * <p>The streaming iterator is closed in a {@code finally} block so the job is released
     * even when an insert or the assertion fails.
     *
     * @throws Exception if collecting from or closing the streaming read fails
     */
    @Test
    public void testBinlogTableStreamRead() throws Exception {
        this.sql("CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', 'bucket' = '2')");
        BlockingIterator<Row, Row> iterator =
                this.streamSqlBlockIter(
                        "SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */",
                        new Object[0]);
        try {
            this.sql("INSERT INTO T VALUES (1, 2)");
            this.sql("INSERT INTO T VALUES (1, 3)");
            this.sql("INSERT INTO T VALUES (2, 2)");

            List<Row> rows = iterator.collect(3);
            Assertions.assertThat(rows)
                    .containsExactly(
                            Row.of("+I", new Integer[] {1}, new Integer[] {2}),
                            // The update carries before and after values side by side.
                            Row.of("+U", new Integer[] {1, 1}, new Integer[] {2, 3}),
                            Row.of("+I", new Integer[] {2}, new Integer[] {2}));
        } finally {
            iterator.close();
        }
    }

    /**
     * Reads the {@code $binlog} system table in batch mode after three inserts and expects one
     * {@code +I} record per key, holding the latest value.
     */
    @Test
    public void testBinlogTableBatchRead() throws Exception {
        this.sql("CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', 'bucket' = '2')");
        this.sql("INSERT INTO T VALUES (1, 2)");
        this.sql("INSERT INTO T VALUES (1, 3)");
        this.sql("INSERT INTO T VALUES (2, 2)");

        List<Row> binlog =
                this.sql("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
        Row key1 = Row.of("+I", new Integer[] {1}, new Integer[] {3});
        Row key2 = Row.of("+I", new Integer[] {2}, new Integer[] {2});
        Assertions.assertThat(binlog).containsExactly(key1, key2);
    }

    /**
     * Checks the {@code $table_indexes} system table of a deletion-vector enabled table: one
     * {@code HASH} entry and one {@code DELETION_VECTORS} entry are expected for the single
     * partition. {@code deletion-vectors.bitmap64} is chosen at random on each run, which is
     * why two values are accepted for field 4 of the deletion-vector entry.
     */
    @Test
    public void testIndexesTable() {
        boolean bitmap64 = ThreadLocalRandom.current().nextBoolean();
        this.sql(
                "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt, a) NOT ENFORCED) PARTITIONED BY (pt) with ('deletion-vectors.enabled'='true', 'deletion-vectors.bitmap64' = '%s')",
                bitmap64);
        this.sql("INSERT INTO T VALUES ('2024-10-01', 1, 'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
        this.sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01', 3, 'c_new1')");

        // HASH index entry.
        List<Row> indexRows =
                this.sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'HASH'");
        Assertions.assertThat(indexRows.size()).isEqualTo(1);
        Row hashIndex = indexRows.get(0);
        Assertions.assertThat(hashIndex.getField(0)).isEqualTo("{2024-10-01}");
        Assertions.assertThat(hashIndex.getField(1)).isEqualTo(0);
        Assertions.assertThat(hashIndex.getField(2)).isEqualTo("HASH");
        Assertions.assertThat(hashIndex.getField(3).toString().startsWith("index-")).isTrue();
        Assertions.assertThat(hashIndex.getField(4)).isEqualTo(12L);
        Assertions.assertThat(hashIndex.getField(5)).isEqualTo(3L);
        Assertions.assertThat(hashIndex.getField(6)).isNull();

        // DELETION_VECTORS index entry.
        indexRows =
                this.sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'DELETION_VECTORS'");
        Assertions.assertThat(indexRows.size()).isEqualTo(1);
        Row dvIndex = indexRows.get(0);
        Assertions.assertThat(dvIndex.getField(0)).isEqualTo("{2024-10-01}");
        Assertions.assertThat(dvIndex.getField(1)).isEqualTo(0);
        Assertions.assertThat(dvIndex.getField(2)).isEqualTo("DELETION_VECTORS");
        Assertions.assertThat(dvIndex.getField(3).toString().startsWith("index-")).isTrue();
        Assertions.assertThat(dvIndex.getField(4)).isIn(33L, 45L);
        Assertions.assertThat(dvIndex.getField(5)).isEqualTo(1L);
        Assertions.assertThat(dvIndex.getField(6)).isNotNull();
    }

    /**
     * Shared read-optimized scenario: writes three batches into {@code insertTableName} and
     * checks the content of {@code T$ro} after each one.
     *
     * <p>The second batch is written without the {@code full-compaction.delta-commits} hint and
     * is expected to stay invisible in {@code T$ro} until the third batch has been written.
     *
     * @param insertTableName table (or branch table) that receives the inserts; results are
     *     always read from {@code T$ro}
     */
    private void innerTestReadOptimizedTableAndCheckData(String insertTableName) {
        String selectOptimized = "SELECT k, v FROM T$ro ORDER BY k";

        this.sql(
                "INSERT INTO %s /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (1, 10), (2, 20)",
                insertTableName);
        List<Row> optimized = this.sql(selectOptimized);
        Assertions.assertThat(optimized).containsExactly(Row.of(1, 10), Row.of(2, 20));

        // Plain insert: the read-optimized view is expected to be unchanged.
        this.sql("INSERT INTO %s VALUES (1, 11), (3, 30)", insertTableName);
        optimized = this.sql(selectOptimized);
        Assertions.assertThat(optimized).containsExactly(Row.of(1, 10), Row.of(2, 20));

        // After the third batch the values of batches two and three are visible.
        this.sql(
                "INSERT INTO %s /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (2, 21), (3, 31)",
                insertTableName);
        optimized = this.sql(selectOptimized);
        Assertions.assertThat(optimized)
                .containsExactly(Row.of(1, 11), Row.of(2, 21), Row.of(3, 31));
    }

    /**
     * Looks up every given partition in the catalog and collects its properties.
     *
     * @param flinkCatalog catalog to query
     * @param tablePath path of the partitioned table
     * @param partitions partition specs to look up; each spec is keyed by its {@code par} value
     * @return map from the {@code par} value of each spec to that partition's properties
     * @throws RuntimeException wrapping {@link PartitionNotExistException} if a partition is
     *     not found
     */
    private static Map<String, Map<String, String>> getPartitionProperties(FlinkCatalog flinkCatalog, ObjectPath tablePath, List<CatalogPartitionSpec> partitions) {
        Map<String, Map<String, String>> propertiesByPartition = new HashMap<>();
        for (CatalogPartitionSpec spec : partitions) {
            String partitionValue = spec.getPartitionSpec().get("par");
            Map<String, String> properties;
            try {
                properties = flinkCatalog.getPartition(tablePath, spec).getProperties();
            } catch (PartitionNotExistException e) {
                // Surface the checked exception unchanged as the cause.
                throw new RuntimeException(e);
            }
            propertiesByPartition.put(partitionValue, properties);
        }
        return propertiesByPartition;
    }

    /**
     * Asserts that the properties recorded for {@code partition} are equal in both snapshots
     * of the partition-properties map.
     *
     * @param partition partition value used as key in both maps
     * @param oldProperties properties captured before the operation under test
     * @param newProperties properties captured after the operation under test
     */
    private static void assertPartitionNotUpdate(String partition, Map<String, Map<String, String>> oldProperties, Map<String, Map<String, String>> newProperties) {
        Map<String, String> before = oldProperties.get(partition);
        Map<String, String> after = newProperties.get(partition);
        Assertions.assertThat(before).isEqualTo(after);
    }

    /**
     * Asserts that {@code partition} was updated between the two captured property maps:
     * {@code numRows} and {@code numFiles} match the expected values, and both
     * {@code lastUpdateTime} and {@code totalSize} have strictly increased.
     *
     * @param partition partition value used as key in both maps
     * @param oldProperties properties captured before the update
     * @param newProperties properties captured after the update
     * @param expectedNumRows expected {@code numRows} after the update
     * @param expectedNumFiles expected {@code numFiles} after the update
     */
    private static void assertPartitionUpdateTo(String partition, Map<String, Map<String, String>> oldProperties, Map<String, Map<String, String>> newProperties, Long expectedNumRows, Long expectedNumFiles) {
        Map<String, String> after = newProperties.get(partition);
        Map<String, String> before = oldProperties.get(partition);

        Assertions.assertThat(after.get("numRows")).isEqualTo(String.valueOf(expectedNumRows));

        long updatedAt = Long.parseLong(after.get("lastUpdateTime"));
        long previouslyUpdatedAt = Long.parseLong(before.get("lastUpdateTime"));
        Assertions.assertThat(updatedAt).isGreaterThan(previouslyUpdatedAt);

        Assertions.assertThat(after.get("numFiles")).isEqualTo(String.valueOf(expectedNumFiles));

        long sizeAfter = Long.parseLong(after.get("totalSize"));
        long sizeBefore = Long.parseLong(before.get("totalSize"));
        Assertions.assertThat(sizeAfter).isGreaterThan(sizeBefore);
    }
}

