/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.hive;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.DataCatalogTable;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.hive.PaimonStorageHandler;
import org.apache.paimon.hive.annotation.Minio;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
import org.apache.paimon.s3.MinioTestContainer;
import org.apache.paimon.table.Table;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.model.Statement;

@RunWith(value=PaimonEmbeddedHiveRunner.class)
public abstract class HiveCatalogITCaseBase {
    @Rule
    public TemporaryFolder folder = new TemporaryFolder();
    protected String path;
    protected TableEnvironment tEnv;
    @HiveSQL(files={})
    protected static HiveShell hiveShell;
    @Minio
    private static MinioTestContainer minioTestContainer;
    @Rule
    public TestRule environmentRule = (base, description) -> new Statement(){

        public void evaluate() throws Throwable {
            try {
                HiveCatalogITCaseBase.this.before(description.getAnnotation(LocationInProperties.class) != null);
                base.evaluate();
            }
            finally {
                HiveCatalogITCaseBase.this.after();
            }
        }
    };

    private void before(boolean locationInProperties) throws Exception {
        HashMap<String, String> catalogProperties = new HashMap<String, String>();
        catalogProperties.put("type", "paimon");
        catalogProperties.put("metastore", "hive");
        catalogProperties.put("uri", "");
        catalogProperties.put("lock.enabled", "true");
        catalogProperties.put("location-in-properties", String.valueOf(locationInProperties));
        if (locationInProperties) {
            this.path = minioTestContainer.getS3UriForDefaultBucket() + "/" + UUID.randomUUID();
            catalogProperties.putAll(minioTestContainer.getS3ConfigOptions());
        } else {
            this.path = this.folder.newFolder().toURI().toString();
        }
        catalogProperties.put("warehouse", this.path);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        this.tEnv = TableEnvironmentImpl.create((EnvironmentSettings)settings);
        this.tEnv.executeSql(String.join((CharSequence)"\n", "CREATE CATALOG my_hive WITH (", catalogProperties.entrySet().stream().map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue())).collect(Collectors.joining(",\n")), ")")).await();
        this.tEnv.executeSql("USE CATALOG my_hive").await();
        this.tEnv.executeSql("DROP DATABASE IF EXISTS test_db CASCADE");
        this.tEnv.executeSql("CREATE DATABASE test_db").await();
        this.tEnv.executeSql("USE test_db").await();
        hiveShell.execute("USE test_db");
        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 'Table')");
    }

    private void after() {
        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
        hiveShell.execute("DROP DATABASE IF EXISTS test_db2 CASCADE");
    }

    @Test
    public void testDatabaseOperations() throws Exception {
        this.tEnv.executeSql("CREATE DATABASE test_db2").await();
        Assertions.assertThat(this.collect("SHOW DATABASES")).isEqualTo(Arrays.asList(Row.of((Object[])new Object[]{"default"}), Row.of((Object[])new Object[]{"test_db"}), Row.of((Object[])new Object[]{"test_db2"})));
        this.tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db2").await();
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("CREATE DATABASE test_db2").await()).hasRootCauseInstanceOf(DatabaseAlreadyExistException.class).hasRootCauseMessage("Database test_db2 already exists in Catalog my_hive.");
        this.tEnv.executeSql("DROP DATABASE test_db2").await();
        Assertions.assertThat(this.collect("SHOW DATABASES")).isEqualTo(Arrays.asList(Row.of((Object[])new Object[]{"default"}), Row.of((Object[])new Object[]{"test_db"})));
        this.tEnv.executeSql("DROP DATABASE IF EXISTS test_db2").await();
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("DROP DATABASE test_db2").await()).hasRootCauseInstanceOf(DatabaseNotExistException.class).hasRootCauseMessage("Database test_db2 does not exist in Catalog my_hive.");
        this.tEnv.executeSql("CREATE DATABASE test_db2").await();
        this.tEnv.executeSql("USE test_db2").await();
        this.tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        this.tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await();
        org.apache.flink.core.fs.Path tablePath = new org.apache.flink.core.fs.Path(this.path, "test_db2.db/t");
        Assertions.assertThat((boolean)tablePath.getFileSystem().exists(tablePath)).isTrue();
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("DROP DATABASE test_db2").await()).hasRootCauseInstanceOf(DatabaseNotEmptyException.class).hasRootCauseMessage("Database test_db2 in catalog my_hive is not empty.");
        this.tEnv.executeSql("DROP DATABASE test_db2 CASCADE").await();
        Assertions.assertThat(this.collect("SHOW DATABASES")).isEqualTo(Arrays.asList(Row.of((Object[])new Object[]{"default"}), Row.of((Object[])new Object[]{"test_db"})));
        Assertions.assertThat((boolean)tablePath.getFileSystem().exists(tablePath)).isFalse();
    }

    @Test
    public void testTableOperations() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        this.tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        Assertions.assertThat(this.collect("SHOW TABLES")).isEqualTo(Arrays.asList(Row.of((Object[])new Object[]{"s"}), Row.of((Object[])new Object[]{"t"})));
        this.tEnv.executeSql("CREATE TABLE IF NOT EXISTS s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await()).hasRootCauseInstanceOf(TableAlreadyExistException.class).hasRootCauseMessage("Table (or view) test_db.s already exists in Catalog my_hive.");
        this.tEnv.executeSql("INSERT INTO s VALUES (1, 'Hi'), (2, 'Hello')").await();
        org.apache.flink.core.fs.Path tablePath = new org.apache.flink.core.fs.Path(this.path, "test_db.db/s");
        Assertions.assertThat((boolean)tablePath.getFileSystem().exists(tablePath)).isTrue();
        this.tEnv.executeSql("DROP TABLE s").await();
        Assertions.assertThat(this.collect("SHOW TABLES")).isEqualTo(Collections.singletonList(Row.of((Object[])new Object[]{"t"})));
        Assertions.assertThat((boolean)tablePath.getFileSystem().exists(tablePath)).isFalse();
        this.tEnv.executeSql("DROP TABLE IF EXISTS s").await();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("DROP TABLE s").await()).isInstanceOf(ValidationException.class)).hasMessage("Table with identifier 'my_hive.test_db.s' does not exist.");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("DROP TABLE hive_table").await()).isInstanceOf(ValidationException.class)).hasMessage("Table with identifier 'my_hive.test_db.hive_table' does not exist.");
        this.tEnv.executeSql("ALTER TABLE t SET ( 'manifest.target-file-size' = '16MB' )").await();
        List<Row> actual = this.collect("SHOW CREATE TABLE t");
        Assertions.assertThat((int)actual.size()).isEqualTo(1);
        Assertions.assertThat((boolean)actual.get(0).getField(0).toString().contains("'manifest.target-file-size' = '16MB'")).isTrue();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("ALTER TABLE s SET ( 'manifest.target-file-size' = '16MB' )").await()).isInstanceOf(RuntimeException.class)).hasMessage("Table `my_hive`.`test_db`.`s` doesn't exist or is a temporary table.");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("ALTER TABLE hive_table SET ( 'manifest.target-file-size' = '16MB' )").await()).isInstanceOf(RuntimeException.class)).hasMessage("Table `my_hive`.`test_db`.`hive_table` doesn't exist or is a temporary table.");
    }

    @Test
    public void testCreateExternalTable() throws Exception {
        this.tEnv.executeSql(String.join((CharSequence)"\n", "CREATE CATALOG my_hive_external WITH (", "  'type' = 'paimon',", "  'metastore' = 'hive',", "  'uri' = '',", "  'warehouse' = '" + this.path + "',", "  'lock.enabled' = 'true',", "  'table.type' = 'EXTERNAL'", ")")).await();
        this.tEnv.executeSql("USE CATALOG my_hive_external").await();
        this.tEnv.executeSql("USE test_db").await();
        this.tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await();
        Assertions.assertThat((boolean)hiveShell.executeQuery("DESC FORMATTED t").contains("Table Type:         \tEXTERNAL_TABLE      \tNULL")).isTrue();
        this.tEnv.executeSql("DROP TABLE t").await();
        org.apache.flink.core.fs.Path tablePath = new org.apache.flink.core.fs.Path(this.path, "test_db.db/t");
        Assertions.assertThat((boolean)tablePath.getFileSystem().exists(tablePath)).isFalse();
    }

    @Test
    public void testFlinkWriteAndHiveRead() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t ( f0 BOOLEAN, f1 TINYINT, f2 SMALLINT, f3 INT, f4 BIGINT, f5 FLOAT, f6 DOUBLE, f7 DECIMAL(10,2), f8 CHAR(3), f9 VARCHAR(10), f10 STRING, f11 BINARY, f12 VARBINARY, f13 DATE, f14 TIMESTAMP(6), f15 ARRAY<STRING>, f16 Map<STRING, STRING>, f17 ROW<f0 STRING, f1 INT>) WITH ( 'file.format' = 'avro' )").await();
        this.tEnv.executeSql("INSERT INTO t VALUES (true, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), 1, 1234567890123456789, 1.23, 3.14159, CAST('1234.56' AS DECIMAL(10, 2)), 'ABC', 'v1', 'Hello, World!', X'010203', X'010203', DATE '2023-01-01', TIMESTAMP '2023-01-01 12:00:00.123', ARRAY['value1', 'value2', 'value3'], MAP['key1', 'value1', 'key2', 'value2'], ROW('v1', 1)), (false, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), 2, 234567890123456789, 2.34, 2.111111, CAST('2345.67' AS DECIMAL(10, 2)), 'DEF', 'v2', 'Apache Paimon', X'040506',X'040506', DATE '2023-02-01', TIMESTAMP '2023-02-01 12:00:00.456', ARRAY['value4', 'value5', 'value6'], MAP['key1', 'value11', 'key2', 'value22'], ROW('v2', 2))").await();
        Assertions.assertThat((List)hiveShell.executeQuery("SELECT f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, hex(f11), hex(f12), f13, f14, f15, f15[0] as f15a, f16['key1'] as f16a, f16['key2'] as f16b, f17, f17.f0, f17.f1 FROM t ORDER BY f3")).isEqualTo(Arrays.asList("true\t1\t1\t1\t1234567890123456789\t1.23\t3.14159\t1234.56\tABC\tv1\tHello, World!\t01\t010203\t2023-01-01\t2023-01-01 12:00:00.123\t[\"value1\",\"value2\",\"value3\"]\tvalue1\tvalue1\tvalue2\t{\"f0\":\"v1\",\"f1\":1}\tv1\t1", "false\t2\t2\t2\t234567890123456789\t2.34\t2.111111\t2345.67\tDEF\tv2\tApache Paimon\t04\t040506\t2023-02-01\t2023-02-01 12:00:00.456\t[\"value4\",\"value5\",\"value6\"]\tvalue4\tvalue11\tvalue22\t{\"f0\":\"v2\",\"f1\":2}\tv2\t2"));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("INSERT INTO hive_table VALUES (1, 'Hi'), (2, 'Hello')").await()).isInstanceOf(TableException.class)).hasMessage("Cannot find table '`my_hive`.`test_db`.`hive_table`' in any of the catalogs [default_catalog, my_hive], nor as a temporary table.");
    }

    @Test
    public void testFlinkWriteAndHiveReadToCompare() throws Exception {
        this.tEnv.executeSql("create table students\n(id decimal(20,0)\n,upload_insert TIMESTAMP\n,dt string\n,PRIMARY KEY(id,dt) NOT ENFORCED\n) PARTITIONED BY (dt)\nWITH (\n'bucket' = '-1',\n'file.format' = 'parquet',\n'metastore.partitioned-table' = 'true'\n);").await();
        this.tEnv.executeSql("insert into students select cast(1 as decimal(20,0)) as id,to_timestamp('2023-08-01 14:03:00.123456') as upload_insert,'20230801' as dt;").await();
        List partitionedTableResult = hiveShell.executeQuery("SELECT * from students");
        this.tEnv.executeSql("create table students1\n(id decimal(20,0)\n,upload_insert TIMESTAMP\n,dt string\n,PRIMARY KEY(id,dt) NOT ENFORCED\n) PARTITIONED BY (dt)\nWITH (\n'bucket' = '-1',\n'file.format' = 'parquet'\n);").await();
        this.tEnv.executeSql("insert into students1 select cast(1 as decimal(20,0)) as id,to_timestamp('2023-08-01 14:03:00.123456') as upload_insert,'20230801' as dt;").await();
        List nonPartitionedTableResult = hiveShell.executeQuery("SELECT * from students1");
        Assertions.assertThat((List)partitionedTableResult).containsAll((Iterable)nonPartitionedTableResult);
    }

    @Test
    public void testHiveCreateAndFlinkRead() throws Exception {
        hiveShell.execute("SET hive.metastore.warehouse.dir=" + this.path);
        hiveShell.execute("CREATE TABLE hive_test_table ( a INT, b STRING ) STORED BY '" + PaimonStorageHandler.class.getName() + "'");
        hiveShell.execute("INSERT INTO hive_test_table VALUES (1, 'Apache'), (2, 'Paimon')");
        List<Row> actual = this.collect("SELECT * FROM hive_test_table");
        Assertions.assertThat(actual).contains((Object[])new Row[]{Row.of((Object[])new Object[]{1, "Apache"}), Row.of((Object[])new Object[]{2, "Paimon"})});
    }

    @Test
    public void testHiveCreateAndFlinkInsertRead() throws Exception {
        hiveShell.execute("SET hive.metastore.warehouse.dir=" + this.path);
        hiveShell.execute("CREATE TABLE hive_test_table ( a INT, b STRING ) STORED BY '" + PaimonStorageHandler.class.getName() + "'TBLPROPERTIES (  'primary-key'='a')");
        this.tEnv.executeSql("INSERT INTO hive_test_table VALUES (1, 'Apache'), (2, 'Paimon')");
        List<Row> actual = this.collect("SELECT * FROM hive_test_table");
        Assertions.assertThat(actual).contains((Object[])new Row[]{Row.of((Object[])new Object[]{1, "Apache"}), Row.of((Object[])new Object[]{2, "Paimon"})});
    }

    @Test
    public void testCreateTableAs() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t (a INT)").await();
        this.tEnv.executeSql("INSERT INTO t VALUES(1)").await();
        this.tEnv.executeSql("CREATE TABLE t1 AS SELECT * FROM t").await();
        List<Row> result = this.collect("SELECT schema_id, fields, partition_keys, primary_keys, options, `comment`  FROM t1$schemas s");
        Assertions.assertThat((String)result.toString()).isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
        List<Row> data = this.collect("SELECT * FROM t1");
        Assertions.assertThat(data).contains((Object[])new Row[]{Row.of((Object[])new Object[]{1})});
        this.tEnv.executeSql("CREATE TABLE t_option (a INT)").await();
        this.tEnv.executeSql("INSERT INTO t_option VALUES(1)").await();
        this.tEnv.executeSql("CREATE TABLE t1_option WITH ('file.format' = 'parquet') AS SELECT * FROM t_option").await();
        List<Row> resultOption = this.collect("SELECT * FROM t1_option$options");
        Assertions.assertThat(resultOption).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{"file.format", "parquet"})});
        List<Row> dataOption = this.collect("SELECT * FROM t1_option");
        Assertions.assertThat(dataOption).contains((Object[])new Row[]{Row.of((Object[])new Object[]{1})});
        this.tEnv.executeSql("CREATE TABLE t_p (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING\n) PARTITIONED BY (dt, hh)");
        this.tEnv.executeSql("INSERT INTO t_p  SELECT 1,2,'a','2023-02-19','12'").await();
        this.tEnv.executeSql("CREATE TABLE t1_p WITH ('partition' = 'dt') AS SELECT * FROM t_p").await();
        List<Row> resultPartition = this.collect("SELECT schema_id, fields, partition_keys, primary_keys, options, `comment`  FROM t1_p$schemas s");
        Assertions.assertThat((String)resultPartition.toString()).isEqualTo("[+I[0, [{\"id\":0,\"name\":\"user_id\",\"type\":\"BIGINT\"},{\"id\":1,\"name\":\"item_id\",\"type\":\"BIGINT\"},{\"id\":2,\"name\":\"behavior\",\"type\":\"STRING\"},{\"id\":3,\"name\":\"dt\",\"type\":\"STRING\"},{\"id\":4,\"name\":\"hh\",\"type\":\"STRING\"}], [\"dt\"], [], {}, ]]");
        List<Row> dataPartition = this.collect("SELECT * FROM t1_p");
        Assertions.assertThat((String)dataPartition.toString()).isEqualTo("[+I[1, 2, a, 2023-02-19, 12]]");
        this.tEnv.executeSql("CREATE TABLE t_pk (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n)").await();
        this.tEnv.executeSql("INSERT INTO t_pk VALUES(1,2,'aaa','2020-01-02','09')").await();
        this.tEnv.executeSql("CREATE TABLE t_pk_as WITH ('primary-key' = 'dt') AS SELECT * FROM t_pk").await();
        List<Row> resultPk = this.collect("SHOW CREATE TABLE t_pk_as");
        Assertions.assertThat((String)resultPk.toString()).contains(new CharSequence[]{"PRIMARY KEY (`dt`)"});
        List<Row> dataPk = this.collect("SELECT * FROM t_pk_as");
        Assertions.assertThat((String)dataPk.toString()).isEqualTo("[+I[1, 2, aaa, 2020-01-02, 09]]");
        this.tEnv.executeSql("CREATE TABLE t_all (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n) PARTITIONED BY (dt, hh)").await();
        this.tEnv.executeSql("INSERT INTO t_all VALUES(1,2,'login','2020-01-02','09')").await();
        this.tEnv.executeSql("CREATE TABLE t_all_as WITH ('primary-key' = 'dt,hh' , 'partition' = 'dt' ) AS SELECT * FROM t_all").await();
        List<Row> resultAll = this.collect("SHOW CREATE TABLE t_all_as");
        Assertions.assertThat((String)resultAll.toString()).contains(new CharSequence[]{"PRIMARY KEY (`dt`, `hh`)"});
        Assertions.assertThat((String)resultAll.toString()).contains(new CharSequence[]{"PARTITIONED BY (`dt`)"});
        List<Row> dataAll = this.collect("SELECT * FROM t_all_as");
        Assertions.assertThat((String)dataAll.toString()).isEqualTo("[+I[1, 2, login, 2020-01-02, 09]]");
        this.tEnv.executeSql("CREATE TABLE t_pk_not_exist (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING,\n    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n)").await();
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("CREATE TABLE t_pk_not_exist_as WITH ('primary-key' = 'aaa') AS SELECT * FROM t_pk_not_exist").await()).hasRootCauseMessage("Table column [user_id, item_id, behavior, dt, hh] should include all primary key constraint [aaa]");
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("CREATE TABLE t_pk_ddl_option (    user_id BIGINT,    item_id BIGINT,    behavior STRING,    dt STRING,    hh STRING,    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED) WITH ('primary-key' = 'dt')").await()).hasRootCauseMessage("Cannot define primary key on DDL and table options at the same time.");
        this.tEnv.executeSql("CREATE TABLE t_partition_not_exist (\n    user_id BIGINT,\n    item_id BIGINT,\n    behavior STRING,\n    dt STRING,\n    hh STRING\n) PARTITIONED BY (dt, hh) ").await();
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("CREATE TABLE t_partition_not_exist_as WITH ('partition' = 'aaa') AS SELECT * FROM t_partition_not_exist").await()).hasRootCauseMessage("Table column [user_id, item_id, behavior, dt, hh] should include all partition fields [aaa]");
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("CREATE TABLE t_partition_ddl_option (    user_id BIGINT,    item_id BIGINT,    behavior STRING,    dt STRING,    hh STRING) PARTITIONED BY (dt, hh)  WITH ('partition' = 'dt')").await()).hasRootCauseMessage("Cannot define partition on DDL and table options at the same time.");
    }

    @Test
    public void testRenameTable() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
        this.tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
        this.tEnv.executeSql("INSERT INTO t1 SELECT 1").await();
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("ALTER TABLE t3 RENAME TO t4")).hasMessage("Table `my_hive`.`test_db`.`t3` doesn't exist or is a temporary table.");
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("ALTER TABLE t1 RENAME TO t2")).hasMessage("Could not execute ALTER TABLE my_hive.test_db.t1 RENAME TO my_hive.test_db.t2");
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("ALTER TABLE t1 RENAME TO T1")).hasMessage("Table name [T1] cannot contain upper case in the catalog.");
        this.tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
        List tables = hiveShell.executeQuery("SHOW TABLES");
        Assertions.assertThat((boolean)tables.contains("t3")).isTrue();
        Assertions.assertThat((boolean)tables.contains("t1")).isFalse();
        List data = hiveShell.executeQuery("SELECT * FROM t3");
        Assertions.assertThat((List)data).containsExactlyInAnyOrder((Object[])new String[]{"1"});
        List<Row> tablesFromFlink = this.collect("SHOW TABLES");
        Assertions.assertThat(tablesFromFlink).contains((Object[])new Row[]{Row.of((Object[])new Object[]{"t3"})});
        Assertions.assertThat(tablesFromFlink).doesNotContain((Object[])new Row[]{Row.of((Object[])new Object[]{"t1"})});
        List<Row> dataFromFlink = this.collect("SELECT * FROM t3");
        Assertions.assertThat(dataFromFlink).contains((Object[])new Row[]{Row.of((Object[])new Object[]{1})});
    }

    @Test
    public void testAlterTable() throws Exception {
        this.tEnv.executeSql("CREATE TABLE t1 (a INT, b STRING, c TIMESTAMP(3))").await();
        List result = this.collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactly((Object[])new String[]{"+I[a, INT, true, null, null, null]", "+I[b, STRING, true, null, null, null]", "+I[c, TIMESTAMP(3), true, null, null, null]"});
        Assertions.assertThatCode(() -> this.tEnv.executeSql("ALTER TABLE t1 DROP b")).doesNotThrowAnyException();
        result = this.collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactly((Object[])new String[]{"+I[a, INT, true, null, null, null]", "+I[c, TIMESTAMP(3), true, null, null, null]"});
        Assertions.assertThatCode(() -> this.tEnv.executeSql("ALTER TABLE t1 ADD (d BIGINT, e CHAR(5))")).doesNotThrowAnyException();
        result = this.collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactly((Object[])new String[]{"+I[a, INT, true, null, null, null]", "+I[c, TIMESTAMP(3), true, null, null, null]", "+I[d, BIGINT, true, null, null, null]", "+I[e, CHAR(5), true, null, null, null]"});
        Assertions.assertThatCode(() -> this.tEnv.executeSql("ALTER TABLE t1 ADD f INT AFTER a")).doesNotThrowAnyException();
        result = this.collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactly((Object[])new String[]{"+I[a, INT, true, null, null, null]", "+I[f, INT, true, null, null, null]", "+I[c, TIMESTAMP(3), true, null, null, null]", "+I[d, BIGINT, true, null, null, null]", "+I[e, CHAR(5), true, null, null, null]"});
        Assertions.assertThatCode(() -> this.tEnv.executeSql("ALTER TABLE t1 MODIFY f INT AFTER e")).doesNotThrowAnyException();
        result = this.collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactly((Object[])new String[]{"+I[a, INT, true, null, null, null]", "+I[c, TIMESTAMP(3), true, null, null, null]", "+I[d, BIGINT, true, null, null, null]", "+I[e, CHAR(5), true, null, null, null]", "+I[f, INT, true, null, null, null]"});
        Assertions.assertThatCode(() -> this.tEnv.executeSql("ALTER TABLE t1 RENAME a TO g")).doesNotThrowAnyException();
        result = this.collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactly((Object[])new String[]{"+I[g, INT, true, null, null, null]", "+I[c, TIMESTAMP(3), true, null, null, null]", "+I[d, BIGINT, true, null, null, null]", "+I[e, CHAR(5), true, null, null, null]", "+I[f, INT, true, null, null, null]"});
        Assertions.assertThatCode(() -> this.tEnv.executeSql("ALTER TABLE t1 MODIFY d DOUBLE")).doesNotThrowAnyException();
        result = this.collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactly((Object[])new String[]{"+I[g, INT, true, null, null, null]", "+I[c, TIMESTAMP(3), true, null, null, null]", "+I[d, DOUBLE, true, null, null, null]", "+I[e, CHAR(5), true, null, null, null]", "+I[f, INT, true, null, null, null]"});
        Assertions.assertThatCode(() -> this.tEnv.executeSql("ALTER TABLE t1 MODIFY g INT COMMENT 'test comment'")).doesNotThrowAnyException();
        result = this.collect("DESC t1").stream().map(Objects::toString).collect(Collectors.toList());
        Assertions.assertThat(result).containsExactly((Object[])new String[]{"+I[g, INT, true, null, null, null, test comment]", "+I[c, TIMESTAMP(3), true, null, null, null, null]", "+I[d, DOUBLE, true, null, null, null, null]", "+I[e, CHAR(5), true, null, null, null, null]", "+I[f, INT, true, null, null, null, null]"});
    }

    @Test
    public void testHiveLock() throws InterruptedException {
        this.tEnv.executeSql("CREATE TABLE t (a INT)");
        CatalogLock.Factory lockFactory = (CatalogLock.Factory)((FlinkCatalog)this.tEnv.getCatalog(this.tEnv.getCurrentCatalog()).get()).catalog().lockFactory().get();
        AtomicInteger count = new AtomicInteger(0);
        ArrayList<Thread> threads = new ArrayList<Thread>();
        Callable<Void> unsafeIncrement = () -> {
            int nextCount = count.get() + 1;
            Thread.sleep(1L);
            count.set(nextCount);
            return null;
        };
        for (int i = 0; i < 10; ++i) {
            Thread thread = new Thread(() -> {
                CatalogLock lock = lockFactory.create();
                for (int j = 0; j < 10; ++j) {
                    try {
                        lock.runWithLock("test_db", "t", unsafeIncrement);
                        continue;
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            thread.start();
            threads.add(thread);
        }
        for (Thread thread : threads) {
            thread.join();
        }
        Assertions.assertThat((int)count.get()).isEqualTo(100);
    }

    @Test
    public void testUpperCase() {
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )").await()).hasRootCauseMessage(String.format("Table name [%s] cannot contain upper case in the catalog.", "T"));
        Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("CREATE TABLE t (A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro')").await()).hasRootCauseMessage(String.format("Field name %s cannot contain upper case in the catalog.", "[A, C]"));
    }

    @Test
    public void testQuickPathInShowTables() throws Exception {
        this.collect("CREATE TABLE t ( a INT, b STRING )");
        List<Row> tables = this.collect("SHOW TABLES");
        Assertions.assertThat((String)tables.toString()).isEqualTo("[+I[t]]");
        new LocalFileIO().delete(new Path(this.path, "test_db.db/t"), true);
        tables = this.collect("SHOW TABLES");
        Assertions.assertThat((String)tables.toString()).isEqualTo("[]");
    }

    @Test
    public void testCatalogOptionsInheritAndOverride() throws Exception {
        this.tEnv.executeSql(String.join((CharSequence)"\n", "CREATE CATALOG my_hive_options WITH (", "  'type' = 'paimon',", "  'metastore' = 'hive',", "  'uri' = '',", "  'warehouse' = '" + this.path + "',", "  'lock.enabled' = 'true',", "  'table-default.opt1' = 'value1',", "  'table-default.opt2' = 'value2',", "  'table-default.opt3' = 'value3'", ")")).await();
        this.tEnv.executeSql("USE CATALOG my_hive_options").await();
        this.tEnv.executeSql("CREATE TABLE table_without_options (a INT, b STRING)").await();
        Identifier identifier = new Identifier("default", "table_without_options");
        Catalog catalog = ((FlinkCatalog)this.tEnv.getCatalog(this.tEnv.getCurrentCatalog()).get()).catalog();
        Map tableOptions = catalog.getTable(identifier).options();
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt1", (Object)"value1");
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt2", (Object)"value2");
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt3", (Object)"value3");
        Assertions.assertThat((Map)tableOptions).doesNotContainKey((Object)"lock.enabled");
        this.tEnv.executeSql("CREATE TABLE table_with_options (a INT, b STRING) WITH ('opt1' = 'new_value')").await();
        identifier = new Identifier("default", "table_with_options");
        tableOptions = catalog.getTable(identifier).options();
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt1", (Object)"new_value");
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt2", (Object)"value2");
        Assertions.assertThat((Map)tableOptions).containsEntry((Object)"opt3", (Object)"value3");
        Assertions.assertThat((Map)tableOptions).doesNotContainKey((Object)"lock.enabled");
    }

    @Test
    public void testAddPartitionsToMetastore() throws Exception {
        this.prepareTestAddPartitionsToMetastore();
        String sql1 = "select v, ptb, k from t where pta >= 2 and ptb <= '3a' and (k % 10) >= 1 and (k % 10) <= 2";
        Assertions.assertThat((List)hiveShell.executeQuery(sql1)).containsExactlyInAnyOrder((Object[])new String[]{"2001\t2a\t21", "2002\t2a\t22", "3001\t3a\t31", "3002\t3a\t32"});
        String sql2 = "select v, ptb, k from t where pta >= 2 and ptb <= '3a' and (v % 10) >= 3 and (v % 10) <= 4";
        Assertions.assertThat((List)hiveShell.executeQuery(sql2)).containsExactlyInAnyOrder((Object[])new String[]{"2003\t2a\t23", "2004\t2a\t24", "3003\t3a\t33", "3004\t3a\t34"});
    }

    @Test
    @LocationInProperties
    public void testAddPartitionsToMetastoreLocationInProperties() throws Exception {
        this.prepareTestAddPartitionsToMetastore();
        String sql1 = "select v, ptb, k from t where pta >= 2 and ptb <= '3a' and (k % 10) >= 1 and (k % 10) <= 2";
        Assertions.assertThat(this.collect(sql1).stream().map(Objects::toString).collect(Collectors.toList())).containsExactlyInAnyOrder((Object[])new String[]{"+I[2001, 2a, 21]", "+I[2002, 2a, 22]", "+I[3001, 3a, 31]", "+I[3002, 3a, 32]"});
        String sql2 = "select v, ptb, k from t where pta >= 2 and ptb <= '3a' and (v % 10) >= 3 and (v % 10) <= 4";
        Assertions.assertThat(this.collect(sql2).stream().map(Objects::toString).collect(Collectors.toList())).containsExactlyInAnyOrder((Object[])new String[]{"+I[2003, 2a, 23]", "+I[2004, 2a, 24]", "+I[3003, 3a, 33]", "+I[3004, 3a, 34]"});
    }

    private void prepareTestAddPartitionsToMetastore() throws Exception {
        this.tEnv.executeSql(String.join((CharSequence)"\n", "CREATE TABLE t (", "    pta INT,", "    k INT,", "    ptb VARCHAR(10),", "    v BIGINT,", "    PRIMARY KEY (k, pta, ptb) NOT ENFORCED", ") PARTITIONED BY (ptb, pta) WITH (", "    'bucket' = '2',", "    'metastore.partitioned-table' = 'true'", ")"));
        ArrayList<String> values = new ArrayList<String>();
        for (int pta = 1; pta <= 3; ++pta) {
            int k = pta * 10;
            int v = pta * 1000;
            for (int ptb = 0; ptb < 2; ++ptb) {
                for (int i = 0; i < 5; ++i) {
                    values.add(String.format("(%d, %d, '%d%c', %d)", pta, k, pta, 97 + ptb, v));
                    ++k;
                    ++v;
                }
            }
        }
        this.tEnv.executeSql("INSERT INTO t VALUES " + String.join((CharSequence)", ", values)).await();
        Assertions.assertThat((List)hiveShell.executeQuery("show partitions t")).containsExactlyInAnyOrder((Object[])new String[]{"ptb=1a/pta=1", "ptb=1b/pta=1", "ptb=2a/pta=2", "ptb=2b/pta=2", "ptb=3a/pta=3", "ptb=3b/pta=3"});
    }

    @Test
    public void testAddPartitionsToMetastoreForUnpartitionedTable() throws Exception {
        this.tEnv.executeSql(String.join((CharSequence)"\n", "CREATE TABLE t (", "    k INT,", "    v BIGINT,", "    PRIMARY KEY (k) NOT ENFORCED", ") WITH (", "    'bucket' = '2',", "    'metastore.partitioned-table' = 'true'", ")"));
        this.tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
        Assertions.assertThat((List)hiveShell.executeQuery("SELECT * FROM t ORDER BY k")).containsExactlyInAnyOrder((Object[])new String[]{"1\t10", "2\t20"});
    }

    @Test
    public void testAddPartitionsForTag() throws Exception {
        this.tEnv.executeSql(String.join((CharSequence)"\n", "CREATE TABLE t (", "    k INT,", "    v BIGINT,", "    PRIMARY KEY (k) NOT ENFORCED", ") WITH (", "    'bucket' = '2',", "    'metastore.tag-to-partition' = 'dt'", ")"));
        this.tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
        Table table = ((DataCatalogTable)((org.apache.flink.table.catalog.Catalog)this.tEnv.getCatalog(this.tEnv.getCurrentCatalog()).get()).getTable(new ObjectPath(this.tEnv.getCurrentDatabase(), "t"))).table();
        table.createTag("2023-10-16", 1L);
        Assertions.assertThat((List)hiveShell.executeQuery("SHOW PARTITIONS t")).containsExactlyInAnyOrder((Object[])new String[]{"dt=2023-10-16"});
        Assertions.assertThat((List)hiveShell.executeQuery("SELECT k, v FROM t WHERE dt='2023-10-16'")).containsExactlyInAnyOrder((Object[])new String[]{"1\t10", "2\t20"});
        Assertions.assertThat((List)hiveShell.executeQuery("SELECT * FROM t WHERE dt='2023-10-16'")).containsExactlyInAnyOrder((Object[])new String[]{"1\t10\t2023-10-16", "2\t20\t2023-10-16"});
        this.tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
        table.createTag("2023-10-17", 2L);
        Assertions.assertThat((List)hiveShell.executeQuery("SELECT * FROM t")).containsExactlyInAnyOrder((Object[])new String[]{"1\t10\t2023-10-16", "2\t20\t2023-10-16", "1\t10\t2023-10-17", "2\t20\t2023-10-17", "3\t30\t2023-10-17", "4\t40\t2023-10-17"});
    }

    @Test
    public void testAddPartitionsForTagPreview() throws Exception {
        this.tEnv.executeSql(String.join((CharSequence)"\n", "CREATE TABLE t (", "    k INT,", "    v BIGINT,", "    PRIMARY KEY (k) NOT ENFORCED", ") WITH (", "    'bucket' = '2',", "    'metastore.tag-to-partition' = 'dt',", "    'metastore.tag-to-partition.preview' = 'process-time'", ")"));
        this.tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await();
        List result = hiveShell.executeQuery("SHOW PARTITIONS t");
        Assertions.assertThat((List)result).hasSize(1);
        String tag = ((String)result.get(0)).split("=")[1];
        Assertions.assertThat((List)hiveShell.executeQuery(String.format("SELECT k, v FROM t WHERE dt='%s'", tag))).containsExactlyInAnyOrder((Object[])new String[]{"1\t10", "2\t20"});
        this.tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await();
        if (hiveShell.executeQuery("SHOW PARTITIONS t").size() == 1) {
            Assertions.assertThat((List)hiveShell.executeQuery(String.format("SELECT k, v FROM t WHERE dt='%s'", tag))).containsExactlyInAnyOrder((Object[])new String[]{"1\t10", "2\t20", "3\t30", "4\t40"});
        }
    }

    protected List<Row> collect(String sql) throws Exception {
        ArrayList<Row> result = new ArrayList<Row>();
        try (CloseableIterator it = this.tEnv.executeSql(sql).collect();){
            while (it.hasNext()) {
                result.add((Row)it.next());
            }
        }
        return result;
    }

    @Target(value={ElementType.METHOD})
    @Retention(value=RetentionPolicy.RUNTIME)
    private static @interface LocationInProperties {
    }
}

