/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.iceberg;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public abstract class FlinkIcebergITCaseBase
extends AbstractTestBase {
    /**
     * Writes to a partitioned Paimon primary-key table whose Iceberg metadata is stored with
     * {@code 'metadata.iceberg.storage' = 'hadoop-catalog'} and reads it back through a Flink
     * Iceberg hadoop catalog rooted at {@code <warehouse>/iceberg}.
     *
     * <p>The table is read twice: once after the initial insert, and once after a second insert
     * that updates keys (1, 10) and (2, 20) and adds keys (1, 12) and (2, 22). The second read is
     * expected to show the merged, latest value per key.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"orc", "parquet", "avro"})
    public void testPrimaryKeyTable(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql(
                "CREATE TABLE paimon.`default`.T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v1 INT,\n"
                        + "  v2 STRING,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'full-compaction.delta-commits' = '1',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")");
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 10, 100, 'apple'), "
                                + "(1, 11, 110, 'banana'), "
                                + "(2, 20, 200, 'cat'), "
                                + "(2, 21, 210, 'dog')")
                .await();

        tEnv.executeSql(
                "CREATE CATALOG iceberg WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hadoop',\n"
                        + "  'warehouse' = '" + warehouse + "/iceberg',\n"
                        + "  'cache-enabled' = 'false'\n"
                        + ")");

        // The same ordered query is issued before and after the second insert.
        String orderedSelect = "SELECT v1, k, v2, pt FROM iceberg.`default`.T ORDER BY pt, k";

        List<Row> afterFirstInsert = collect(tEnv.executeSql(orderedSelect));
        Assertions.assertThat(afterFirstInsert)
                .containsExactly(
                        Row.of(100, 10, "apple", 1),
                        Row.of(110, 11, "banana", 1),
                        Row.of(200, 20, "cat", 2),
                        Row.of(210, 21, "dog", 2));

        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 10, 101, 'red'), "
                                + "(1, 12, 121, 'green'), "
                                + "(2, 20, 201, 'blue'), "
                                + "(2, 22, 221, 'yellow')")
                .await();

        List<Row> afterSecondInsert = collect(tEnv.executeSql(orderedSelect));
        Assertions.assertThat(afterSecondInsert)
                .containsExactly(
                        Row.of(101, 10, "red", 1),
                        Row.of(110, 11, "banana", 1),
                        Row.of(121, 12, "green", 1),
                        Row.of(201, 20, "blue", 2),
                        Row.of(210, 21, "dog", 2),
                        Row.of(221, 22, "yellow", 2));
    }

    /**
     * Writes to a Paimon table declared without a primary key and reads it back through a Flink
     * Iceberg hadoop catalog rooted at {@code <warehouse>/iceberg}.
     *
     * <p>After the first insert the full table content is compared, ignoring order. After the
     * second insert, a query filtered on {@code country = 'germany'} is expected to return cities
     * from both inserts.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"orc", "parquet", "avro"})
    public void testAppendOnlyTable(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql(
                "CREATE TABLE paimon.`default`.cities (\n"
                        + "  country STRING,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")");
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.cities VALUES "
                                + "('usa', 'new york'), "
                                + "('germany', 'berlin'), "
                                + "('usa', 'chicago'), "
                                + "('germany', 'hamburg')")
                .await();

        tEnv.executeSql(
                "CREATE CATALOG iceberg WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hadoop',\n"
                        + "  'warehouse' = '" + warehouse + "/iceberg',\n"
                        + "  'cache-enabled' = 'false'\n"
                        + ")");

        List<Row> allCities =
                collect(tEnv.executeSql("SELECT name, country FROM iceberg.`default`.cities"));
        Assertions.assertThat(allCities)
                .containsExactlyInAnyOrder(
                        Row.of("new york", "usa"),
                        Row.of("chicago", "usa"),
                        Row.of("berlin", "germany"),
                        Row.of("hamburg", "germany"));

        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.cities VALUES "
                                + "('usa', 'houston'), "
                                + "('germany', 'munich')")
                .await();

        List<Row> germanCities =
                collect(
                        tEnv.executeSql(
                                "SELECT name FROM iceberg.`default`.cities WHERE country = 'germany'"));
        Assertions.assertThat(germanCities)
                .containsExactlyInAnyOrder(Row.of("berlin"), Row.of("hamburg"), Row.of("munich"));
    }

    /**
     * Checks that equality and {@code IS NULL} filters on each column type select the expected row
     * when the Paimon table is queried through the Iceberg catalog.
     *
     * <p>Three rows are inserted: id 1 and id 2 carry distinct non-null values in every column,
     * and id 3 carries {@code NULL} in every value column. Each equality predicate targets the
     * values of row 1; each {@code IS NULL} predicate is expected to return only row 3. No
     * equality predicate is issued for {@code v_varbinary}; only its {@code IS NULL} case is
     * checked.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"orc", "parquet", "avro"})
    public void testFilterAllTypes(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql(
                "CREATE TABLE paimon.`default`.T (\n"
                        + "  pt INT,\n"
                        + "  id INT,"
                        + "  v_int INT,\n"
                        + "  v_boolean BOOLEAN,\n"
                        + "  v_bigint BIGINT,\n"
                        + "  v_float FLOAT,\n"
                        + "  v_double DOUBLE,\n"
                        + "  v_decimal DECIMAL(8, 3),\n"
                        + "  v_varchar STRING,\n"
                        + "  v_varbinary VARBINARY(20),\n"
                        + "  v_date DATE,\n"
                        + "  v_timestamp TIMESTAMP(6)\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")");
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 1, 1, true, 10, CAST(100.0 AS FLOAT), 1000.0, 123.456, 'cat', CAST('B_cat' AS VARBINARY(20)), DATE '2024-10-10', TIMESTAMP '2024-10-10 11:22:33.123456'), "
                                + "(2, 2, 2, false, 20, CAST(200.0 AS FLOAT), 2000.0, 234.567, 'dog', CAST('B_dog' AS VARBINARY(20)), DATE '2024-10-20', TIMESTAMP '2024-10-20 11:22:33.123456'), "
                                + "(3, 3, CAST(NULL AS INT), CAST(NULL AS BOOLEAN), CAST(NULL AS BIGINT), CAST(NULL AS FLOAT), CAST(NULL AS DOUBLE), CAST(NULL AS DECIMAL(8, 3)), CAST(NULL AS STRING), CAST(NULL AS VARBINARY(20)), CAST(NULL AS DATE), CAST(NULL AS TIMESTAMP(6)))")
                .await();

        tEnv.executeSql(
                "CREATE CATALOG iceberg WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hadoop',\n"
                        + "  'warehouse' = '" + warehouse + "/iceberg',\n"
                        + "  'cache-enabled' = 'false'\n"
                        + ")");
        // Switch the current catalog so the queries below can refer to the table as plain "T".
        tEnv.executeSql("USE CATALOG iceberg");

        // Equality predicates: every one of them matches the values written for row id = 1.
        assertFilterSelectsId(tEnv, "pt = 1", 1);
        assertFilterSelectsId(tEnv, "v_int = 1", 1);
        assertFilterSelectsId(tEnv, "v_boolean = true", 1);
        assertFilterSelectsId(tEnv, "v_bigint = 10", 1);
        assertFilterSelectsId(tEnv, "v_float = 100.0", 1);
        assertFilterSelectsId(tEnv, "v_double = 1000.0", 1);
        assertFilterSelectsId(tEnv, "v_decimal = 123.456", 1);
        assertFilterSelectsId(tEnv, "v_varchar = 'cat'", 1);
        assertFilterSelectsId(tEnv, "v_date = '2024-10-10'", 1);
        assertFilterSelectsId(tEnv, "v_timestamp = TIMESTAMP '2024-10-10 11:22:33.123456'", 1);

        // Null predicates: only row id = 3 was written with NULL in the value columns.
        assertFilterSelectsId(tEnv, "v_int IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_boolean IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_bigint IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_float IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_double IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_decimal IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_varchar IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_varbinary IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_date IS NULL", 3);
        assertFilterSelectsId(tEnv, "v_timestamp IS NULL", 3);
    }

    /**
     * Runs {@code SELECT id FROM T where <predicate>} against the current catalog and asserts that
     * exactly one row holding {@code expectedId} comes back.
     *
     * @param tEnv table environment whose current catalog resolves table {@code T}
     * @param predicate SQL text appended after {@code where}
     * @param expectedId the single {@code id} value the query must return
     */
    private void assertFilterSelectsId(TableEnvironment tEnv, String predicate, int expectedId)
            throws Exception {
        List<Row> matched = collect(tEnv.executeSql("SELECT id FROM T where " + predicate));
        Assertions.assertThat(matched).containsExactly(Row.of(expectedId));
    }

    /**
     * Checks filtering on a {@code TIMESTAMP_LTZ(6)} column by reading the table directly with the
     * Iceberg generic reader instead of going through a Flink Iceberg catalog.
     *
     * <p>Three rows are written through Paimon: ids 1 and 2 with distinct timestamps and id 3 with
     * {@code NULL}. The table is then loaded from a {@link HadoopCatalog} rooted at
     * {@code <warehouse>/iceberg} and scanned twice, once with an equality filter and once with an
     * {@code isNull} filter on {@code v_timestampltz}.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"parquet"})
    public void testFilterTimestampLtz(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql(
                "CREATE TABLE paimon.`default`.T (\n"
                        + "  id INT,"
                        + "  v_timestampltz TIMESTAMP_LTZ(6)\n"
                        + ") WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")");
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, CAST(TO_TIMESTAMP_LTZ(1100000000321, 3) AS TIMESTAMP_LTZ(6))), "
                                + "(2, CAST(TO_TIMESTAMP_LTZ(1200000000321, 3) AS TIMESTAMP_LTZ(6))), "
                                + "(3, CAST(NULL AS TIMESTAMP_LTZ(6)))")
                .await();

        HadoopCatalog icebergCatalog =
                new HadoopCatalog(new Configuration(), warehouse + "/iceberg");
        Table icebergTable = icebergCatalog.loadTable(TableIdentifier.of("default", "T"));

        // The literal is row 1's inserted epoch-millisecond value (1100000000321) times 1000.
        List<Object> equalMatches =
                readFirstColumn(
                        icebergTable, Expressions.equal("v_timestampltz", 1100000000321000L));
        Assertions.assertThat(equalMatches).containsExactly(1);

        List<Object> nullMatches =
                readFirstColumn(icebergTable, Expressions.isNull("v_timestampltz"));
        Assertions.assertThat(nullMatches).containsExactly(3);
    }

    /**
     * Scans {@code table} with the given filter using the Iceberg generic reader and returns the
     * value at position 0 of every record produced, in iteration order.
     *
     * <p>The scan is opened in a try-with-resources block so the underlying iterable is closed
     * even when iteration fails part-way.
     *
     * @param table Iceberg table to scan
     * @param filter row filter passed to the scan builder
     * @return first-column values of the matching records
     * @throws Exception if reading or closing the scan fails
     */
    private List<Object> readFirstColumn(Table table, Expression filter) throws Exception {
        List<Object> values = new ArrayList<>();
        try (CloseableIterable<Record> records =
                IcebergGenerics.read(table).where(filter).build()) {
            for (Record record : records) {
                values.add(record.get(0));
            }
        }
        return values;
    }

    /**
     * Checks that dropping a Paimon table and recreating it with the same DDL leaves the Iceberg
     * view showing only data written after the recreation.
     *
     * <p>Sequence: create, insert twice (verifying through the Iceberg catalog after each insert),
     * drop, recreate with the identical DDL, then insert twice more. The reads after recreation
     * are expected to contain none of the rows written before the drop.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"orc", "parquet", "avro"})
    public void testDropAndRecreateTable(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");

        // Kept in a variable because the exact same DDL is replayed after the DROP below.
        String createTableDdl =
                "CREATE TABLE paimon.`default`.cities (\n"
                        + "  country STRING,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")";
        tEnv.executeSql(createTableDdl);
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.cities VALUES "
                                + "('usa', 'new york'), "
                                + "('germany', 'berlin')")
                .await();

        tEnv.executeSql(
                "CREATE CATALOG iceberg WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hadoop',\n"
                        + "  'warehouse' = '" + warehouse + "/iceberg',\n"
                        + "  'cache-enabled' = 'false'\n"
                        + ")");

        String selectAll = "SELECT name, country FROM iceberg.`default`.cities";

        List<Row> firstRead = collect(tEnv.executeSql(selectAll));
        Assertions.assertThat(firstRead)
                .containsExactlyInAnyOrder(Row.of("new york", "usa"), Row.of("berlin", "germany"));

        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.cities VALUES "
                                + "('usa', 'chicago'), "
                                + "('germany', 'hamburg')")
                .await();
        List<Row> secondRead = collect(tEnv.executeSql(selectAll));
        Assertions.assertThat(secondRead)
                .containsExactlyInAnyOrder(
                        Row.of("new york", "usa"),
                        Row.of("chicago", "usa"),
                        Row.of("berlin", "germany"),
                        Row.of("hamburg", "germany"));

        // Drop and recreate: from here on only post-recreation rows are expected.
        tEnv.executeSql("DROP TABLE paimon.`default`.cities");
        tEnv.executeSql(createTableDdl);
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.cities VALUES "
                                + "('usa', 'houston'), "
                                + "('germany', 'munich')")
                .await();
        List<Row> readAfterRecreate = collect(tEnv.executeSql(selectAll));
        Assertions.assertThat(readAfterRecreate)
                .containsExactlyInAnyOrder(Row.of("houston", "usa"), Row.of("munich", "germany"));

        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.cities VALUES "
                                + "('usa', 'san francisco'), "
                                + "('germany', 'cologne')")
                .await();
        List<Row> germanCities =
                collect(
                        tEnv.executeSql(
                                "SELECT name FROM iceberg.`default`.cities WHERE country = 'germany'"));
        Assertions.assertThat(germanCities)
                .containsExactlyInAnyOrder(Row.of("munich"), Row.of("cologne"));
    }

    /**
     * Checks that a column of type {@code MAP<INT, ARRAY<ROW(f1 STRING, f2 INT)>>} written through
     * Paimon can be read, indexed and filtered through the Iceberg catalog.
     *
     * <p>The first read projects the whole array stored under map key 10. The second read, issued
     * after a third row is inserted, projects a nested field ({@code v[10][2].f1}) and filters on
     * another ({@code v[20][1].f2 > 200}); rows 2 and 3 are expected to match.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"orc", "parquet", "avro"})
    public void testNestedTypes(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql(
                "CREATE TABLE paimon.`default`.T (\n"
                        + "  k INT,\n"
                        + "  v MAP<INT, ARRAY<ROW(f1 STRING, f2 INT)>>,\n"
                        + "  v2 BIGINT\n"
                        + ") WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")");
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, MAP[10, ARRAY[ROW('apple', 100), ROW('banana', 101)], 20, ARRAY[ROW('cat', 102), ROW('dog', 103)]], 1000), "
                                + "(2, MAP[10, ARRAY[ROW('cherry', 200), ROW('pear', 201)], 20, ARRAY[ROW('tiger', 202), ROW('wolf', 203)]], 2000)")
                .await();

        tEnv.executeSql(
                "CREATE CATALOG iceberg WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hadoop',\n"
                        + "  'warehouse' = '" + warehouse + "/iceberg',\n"
                        + "  'cache-enabled' = 'false'\n"
                        + ")");

        List<Row> projectedArrays =
                collect(tEnv.executeSql("SELECT k, v[10], v2 FROM iceberg.`default`.T"));
        // Arrays stored under map key 10 for k = 1 and k = 2 respectively.
        Row[] key10OfRow1 = {Row.of("apple", 100), Row.of("banana", 101)};
        Row[] key10OfRow2 = {Row.of("cherry", 200), Row.of("pear", 201)};
        Assertions.assertThat(projectedArrays)
                .containsExactlyInAnyOrder(
                        Row.of(1, key10OfRow1, 1000L), Row.of(2, key10OfRow2, 2000L));

        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(3, MAP[10, ARRAY[ROW('mango', 300), ROW('watermelon', 301)], 20, ARRAY[ROW('rabbit', 302), ROW('lion', 303)]], 3000)")
                .await();

        List<Row> filteredByNestedField =
                collect(
                        tEnv.executeSql(
                                "SELECT k, v[10][2].f1, v2 FROM iceberg.`default`.T WHERE v[20][1].f2 > 200"));
        Assertions.assertThat(filteredByNestedField)
                .containsExactlyInAnyOrder(
                        Row.of(2, "pear", 2000L), Row.of(3, "watermelon", 3000L));
    }

    /**
     * Checks that tags created with the Paimon {@code sys.create_tag} procedure are readable
     * through the Iceberg catalog.
     *
     * <p>After two inserts, {@code tag1} is created with argument 1 and {@code tag2} with argument
     * 4. Reading with the {@code 'tag'} option is expected to return the state after the first
     * insert for {@code tag1} and the state after the second insert for {@code tag2}. The Iceberg
     * {@code T$refs} table is expected to list both tags with snapshot ids 1 and 4.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"orc", "parquet", "avro"})
    public void testCreateTags(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql(
                "CREATE TABLE paimon.`default`.T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v1 INT,\n"
                        + "  v2 STRING,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'full-compaction.delta-commits' = '1',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")");
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 10, 100, 'apple'), "
                                + "(1, 11, 110, 'banana'), "
                                + "(2, 20, 200, 'cat'), "
                                + "(2, 21, 210, 'dog')")
                .await();
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 10, 101, 'red'), "
                                + "(1, 12, 121, 'green'), "
                                + "(2, 20, 201, 'blue'), "
                                + "(2, 22, 221, 'yellow')")
                .await();

        tEnv.executeSql(
                "CREATE CATALOG iceberg WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hadoop',\n"
                        + "  'warehouse' = '" + warehouse + "/iceberg',\n"
                        + "  'cache-enabled' = 'false'\n"
                        + ")");

        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
        List<Row> rowsAtTag1 =
                collect(
                        tEnv.executeSql(
                                "SELECT v1, k, v2, pt FROM iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k"));
        Assertions.assertThat(rowsAtTag1)
                .containsExactly(
                        Row.of(100, 10, "apple", 1),
                        Row.of(110, 11, "banana", 1),
                        Row.of(200, 20, "cat", 2),
                        Row.of(210, 21, "dog", 2));

        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag2', 4)");
        List<Row> rowsAtTag2 =
                collect(
                        tEnv.executeSql(
                                "SELECT v1, k, v2, pt FROM iceberg.`default`.T /*+ OPTIONS('tag'='tag2') */ ORDER BY pt, k"));
        Assertions.assertThat(rowsAtTag2)
                .containsExactly(
                        Row.of(101, 10, "red", 1),
                        Row.of(110, 11, "banana", 1),
                        Row.of(121, 12, "green", 1),
                        Row.of(201, 20, "blue", 2),
                        Row.of(210, 21, "dog", 2),
                        Row.of(221, 22, "yellow", 2));

        List<Row> refs =
                collect(
                        tEnv.executeSql(
                                "SELECT name, type, snapshot_id FROM iceberg.`default`.T$refs"));
        Assertions.assertThat(refs)
                .containsExactlyInAnyOrder(Row.of("tag1", "TAG", 1L), Row.of("tag2", "TAG", 4L));
    }

    /**
     * Checks that deleting a tag with the Paimon {@code sys.delete_tag} procedure removes it from
     * the Iceberg view while leaving other tags intact.
     *
     * <p>Two tags are created and {@code tag2} is deleted. Reading with {@code 'tag'='tag1'} is
     * still expected to return the state after the first insert, and the Iceberg {@code T$refs}
     * table is expected to list only {@code tag1}.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"orc", "parquet", "avro"})
    public void testDeleteTags(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql(
                "CREATE TABLE paimon.`default`.T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v1 INT,\n"
                        + "  v2 STRING,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'full-compaction.delta-commits' = '1',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")");
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 10, 100, 'apple'), "
                                + "(1, 11, 110, 'banana'), "
                                + "(2, 20, 200, 'cat'), "
                                + "(2, 21, 210, 'dog')")
                .await();
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 10, 101, 'red'), "
                                + "(1, 12, 121, 'green'), "
                                + "(2, 20, 201, 'blue'), "
                                + "(2, 22, 221, 'yellow')")
                .await();

        tEnv.executeSql(
                "CREATE CATALOG iceberg WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hadoop',\n"
                        + "  'warehouse' = '" + warehouse + "/iceberg',\n"
                        + "  'cache-enabled' = 'false'\n"
                        + ")");

        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag2', 4)");
        tEnv.executeSql("CALL paimon.sys.delete_tag('default.T', 'tag2')");

        List<Row> rowsAtTag1 =
                collect(
                        tEnv.executeSql(
                                "SELECT v1, k, v2, pt FROM iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k"));
        Assertions.assertThat(rowsAtTag1)
                .containsExactly(
                        Row.of(100, 10, "apple", 1),
                        Row.of(110, 11, "banana", 1),
                        Row.of(200, 20, "cat", 2),
                        Row.of(210, 21, "dog", 2));

        List<Row> refs =
                collect(
                        tEnv.executeSql(
                                "SELECT name, type, snapshot_id FROM iceberg.`default`.T$refs"));
        Assertions.assertThat(refs).containsExactlyInAnyOrder(Row.of("tag1", "TAG", 1L));
    }

    /**
     * Checks that replacing a tag with the Paimon {@code sys.replace_tag} procedure is reflected
     * in the Iceberg view.
     *
     * <p>{@code tag1} is created with argument 1 and then replaced with arguments {@code 4} and
     * {@code '1d'}. The Iceberg {@code T$refs} table is expected to list {@code tag1} with
     * snapshot id 4, and reading with {@code 'tag'='tag1'} is expected to return the state after
     * the second insert.
     *
     * @param format value used for the table's {@code file.format} option
     */
    @ParameterizedTest
    @ValueSource(strings = {"orc", "parquet", "avro"})
    public void testReplaceTags(String format) throws Exception {
        String warehouse = getTempDirPath();
        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build();

        tEnv.executeSql(
                "CREATE CATALOG paimon WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql(
                "CREATE TABLE paimon.`default`.T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v1 INT,\n"
                        + "  v2 STRING,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
                        + "  'full-compaction.delta-commits' = '1',\n"
                        + "  'file.format' = '" + format + "'\n"
                        + ")");
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 10, 100, 'apple'), "
                                + "(1, 11, 110, 'banana'), "
                                + "(2, 20, 200, 'cat'), "
                                + "(2, 21, 210, 'dog')")
                .await();
        tEnv.executeSql(
                        "INSERT INTO paimon.`default`.T VALUES "
                                + "(1, 10, 101, 'red'), "
                                + "(1, 12, 121, 'green'), "
                                + "(2, 20, 201, 'blue'), "
                                + "(2, 22, 221, 'yellow')")
                .await();

        tEnv.executeSql(
                "CREATE CATALOG iceberg WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-type' = 'hadoop',\n"
                        + "  'warehouse' = '" + warehouse + "/iceberg',\n"
                        + "  'cache-enabled' = 'false'\n"
                        + ")");

        tEnv.executeSql("CALL paimon.sys.create_tag('default.T', 'tag1', 1)");
        tEnv.executeSql("CALL paimon.sys.replace_tag('default.T', 'tag1', 4, '1d')");

        List<Row> refs =
                collect(
                        tEnv.executeSql(
                                "SELECT name, type, snapshot_id FROM iceberg.`default`.T$refs"));
        Assertions.assertThat(refs).containsExactlyInAnyOrder(Row.of("tag1", "TAG", 4L));

        List<Row> rowsAtTag1 =
                collect(
                        tEnv.executeSql(
                                "SELECT v1, k, v2, pt FROM iceberg.`default`.T /*+ OPTIONS('tag'='tag1') */ ORDER BY pt, k"));
        Assertions.assertThat(rowsAtTag1)
                .containsExactly(
                        Row.of(101, 10, "red", 1),
                        Row.of(110, 11, "banana", 1),
                        Row.of(121, 12, "green", 1),
                        Row.of(201, 20, "blue", 2),
                        Row.of(210, 21, "dog", 2),
                        Row.of(221, 22, "yellow", 2));
    }

    /**
     * Drains every row of {@code result} into a list, in the order the iterator yields them, and
     * closes the iterator afterwards.
     *
     * @param result table result whose rows should be materialized
     * @return all rows produced by {@code result}
     * @throws Exception if iterating or closing the result iterator fails
     */
    private List<Row> collect(TableResult result) throws Exception {
        List<Row> collected = new ArrayList<>();
        try (CloseableIterator<Row> rowIterator = result.collect()) {
            rowIterator.forEachRemaining(collected::add);
        }
        return collected;
    }
}

