/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class PostponeBucketTableITCase
extends AbstractTestBase {
    private static final int TIMEOUT = 120;

    /**
     * Loads three partitions into a table created with {@code 'bucket' = '-2'} and checks that
     * written rows are not returned by queries until {@code sys.compact} has run: first for the
     * initial load, then for a second write that negates every value of partition 1.
     */
    @Test
    public void testWriteThenCompact() throws Exception {
        final int partitionCount = 3;
        final int keysPerPartition = 100;
        final int rewrittenPartition = 1;
        final String compactSql = "CALL sys.compact(`table` => 'default.T')";
        final String sumPerPartitionSql = "SELECT pt, SUM(v) FROM T GROUP BY pt";

        String warehouse = this.getTempDirPath();
        TableEnvironment tEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'bucket' = '-2'\n"
                        + ")");

        // One row per (partition, key); v is the running index pt * keysPerPartition + k.
        List<String> rows = new ArrayList<>();
        for (int pt = 0; pt < partitionCount; pt++) {
            for (int k = 0; k < keysPerPartition; k++) {
                rows.add(String.format("(%d, %d, %d)", pt, k, pt * keysPerPartition + k));
            }
        }

        // Randomly run the first insert with or without the 'partition.sink-strategy'='hash' hint.
        String insertPrefix =
                ThreadLocalRandom.current().nextBoolean()
                        ? "INSERT INTO T VALUES "
                        : "INSERT INTO T /*+ OPTIONS('partition.sink-strategy'='hash') */ VALUES ";
        tEnv.executeSql(insertPrefix + String.join(", ", rows)).await();

        // Nothing is readable before the first compaction.
        Assertions.assertThat(this.collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
        tEnv.executeSql(compactSql).await();

        // Each partition holds an arithmetic series, so its sum is (first + last) * count / 2.
        List<String> sumsAfterFirstCompact = new ArrayList<>();
        for (int pt = 0; pt < partitionCount; pt++) {
            int firstValue = pt * keysPerPartition;
            int lastValue = firstValue + keysPerPartition - 1;
            sumsAfterFirstCompact.add(
                    String.format(
                            "+I[%d, %d]", pt, (firstValue + lastValue) * keysPerPartition / 2));
        }
        Assertions.assertThat(this.collect(tEnv.executeSql(sumPerPartitionSql)))
                .hasSameElementsAs(sumsAfterFirstCompact);

        // Rewrite every key of one partition with the negated value.
        rows.clear();
        for (int k = 0; k < keysPerPartition; k++) {
            rows.add(
                    String.format(
                            "(%d, %d, %d)",
                            rewrittenPartition,
                            k,
                            -(rewrittenPartition * keysPerPartition + k)));
        }
        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", rows)).await();

        // The rewrite must stay invisible until the next compaction.
        Assertions.assertThat(this.collect(tEnv.executeSql(sumPerPartitionSql)))
                .hasSameElementsAs(sumsAfterFirstCompact);
        tEnv.executeSql(compactSql).await();

        List<String> sumsAfterSecondCompact = new ArrayList<>();
        for (int pt = 0; pt < partitionCount; pt++) {
            int firstValue = pt * keysPerPartition;
            int lastValue = firstValue + keysPerPartition - 1;
            int sum = (firstValue + lastValue) * keysPerPartition / 2;
            sumsAfterSecondCompact.add(
                    String.format("+I[%d, %d]", pt, pt == rewrittenPartition ? -sum : sum));
        }
        Assertions.assertThat(this.collect(tEnv.executeSql(sumPerPartitionSql)))
                .hasSameElementsAs(sumsAfterSecondCompact);
    }

    /**
     * Checks {@code INSERT OVERWRITE} on a table created with {@code 'bucket' = '-2'}: the
     * assertions expect the overwritten partition's old rows to disappear right after the
     * overwrite, and the newly written rows to appear only after {@code sys.compact}.
     */
    @Test
    public void testOverwrite() throws Exception {
        final String compactSql = "CALL sys.compact(`table` => 'default.T')";
        final String selectSql = "SELECT k, v, pt FROM T";

        String warehouse = this.getTempDirPath();
        TableEnvironment tEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'bucket' = '-2'\n"
                        + ")");

        // Initial load: not readable until compacted.
        tEnv.executeSql(
                        "INSERT INTO T VALUES (1, 10, 110), (1, 20, 120), (2, 10, 210), (2, 20, 220)")
                .await();
        Assertions.assertThat(this.collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
        tEnv.executeSql(compactSql).await();
        Assertions.assertThat(this.collect(tEnv.executeSql(selectSql)))
                .containsExactlyInAnyOrder(
                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]", "+I[20, 220, 2]");

        // A further uncompacted insert does not change what is readable.
        tEnv.executeSql("INSERT INTO T VALUES (2, 40, 240)").await();
        Assertions.assertThat(this.collect(tEnv.executeSql(selectSql)))
                .containsExactlyInAnyOrder(
                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]", "+I[20, 220, 2]");

        // Overwriting partition 2 removes its old rows from the result immediately ...
        tEnv.executeSql("INSERT OVERWRITE T VALUES (2, 20, 221), (2, 30, 230)").await();
        Assertions.assertThat(this.collect(tEnv.executeSql(selectSql)))
                .containsExactlyInAnyOrder("+I[10, 110, 1]", "+I[20, 120, 1]");

        // ... and the overwriting rows show up after compaction.
        tEnv.executeSql(compactSql).await();
        Assertions.assertThat(this.collect(tEnv.executeSql(selectSql)))
                .containsExactlyInAnyOrder(
                        "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[20, 221, 2]", "+I[30, 230, 2]");
    }

    /**
     * Runs a streaming {@code SELECT} against a table with {@code 'changelog-producer' = 'lookup'}
     * while a batch environment inserts and compacts, and checks both the batch view and the
     * changelog rows delivered to the streaming reader.
     *
     * <p>The streaming result iterator is held in a try-with-resources block so that it is closed
     * even when one of the assertions fails.
     */
    @Timeout(value=120L)
    @Test
    public void testLookupChangelogProducer() throws Exception {
        final String compactSql = "CALL sys.compact(`table` => 'default.T')";
        final String selectSql = "SELECT k, v, pt FROM T";

        String warehouse = this.getTempDirPath();
        TableEnvironment bEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        String createCatalogSql =
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")";
        bEnv.executeSql(createCatalogSql);
        bEnv.executeSql("USE CATALOG mycat");
        bEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'bucket' = '-2',\n"
                        + "  'changelog-producer' = 'lookup'\n"
                        + ")");

        TableEnvironment sEnv =
                this.tableEnvironmentBuilder()
                        .streamingMode()
                        .parallelism(1)
                        .checkpointIntervalMs(1000)
                        .build();
        sEnv.executeSql(createCatalogSql);
        sEnv.executeSql("USE CATALOG mycat");

        // Start the streaming reader before any data is written.
        TableResult streamingSelect = sEnv.executeSql(selectSql);
        JobClient client = streamingSelect.getJobClient().get();
        try (CloseableIterator<Row> it = streamingSelect.collect()) {
            bEnv.executeSql(
                            "INSERT INTO T VALUES (1, 10, 110), (1, 20, 120), (2, 10, 210), (2, 20, 220)")
                    .await();
            Assertions.assertThat(this.collect(bEnv.executeSql("SELECT * FROM T"))).isEmpty();
            bEnv.executeSql(compactSql).await();
            Assertions.assertThat(this.collect(bEnv.executeSql(selectSql)))
                    .containsExactlyInAnyOrder(
                            "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]", "+I[20, 220, 2]");
            Assertions.assertThat(this.collect(client, it, 4))
                    .containsExactlyInAnyOrder(
                            "+I[10, 110, 1]", "+I[20, 120, 1]", "+I[10, 210, 2]", "+I[20, 220, 2]");

            // Second round: one update of an existing key and one new key.
            bEnv.executeSql("INSERT INTO T VALUES (1, 20, 121), (2, 30, 230)").await();
            bEnv.executeSql(compactSql).await();
            Assertions.assertThat(this.collect(bEnv.executeSql(selectSql)))
                    .containsExactlyInAnyOrder(
                            "+I[10, 110, 1]",
                            "+I[20, 121, 1]",
                            "+I[10, 210, 2]",
                            "+I[20, 220, 2]",
                            "+I[30, 230, 2]");
            Assertions.assertThat(this.collect(client, it, 3))
                    .containsExactlyInAnyOrder(
                            "-U[20, 120, 1]", "+U[20, 121, 1]", "+I[30, 230, 2]");
        }
    }

    /**
     * Checks per-partition bucket counts of a table created with {@code 'bucket' = '-2'} and
     * {@code 'postpone.default-bucket-num' = '2'}: after the first compaction every partition is
     * expected to have 2 distinct buckets, and after {@code sys.rescale} partitions 1 and 2 are
     * expected to have 4 and 8. Data written before the rescale must only become visible after
     * the following {@code sys.compact}.
     *
     * <p>Both {@code sys.rescale} calls are awaited, like every other procedure call in this
     * class, so the assertions that follow never race the rescale.
     */
    @Test
    public void testRescaleBucket() throws Exception {
        final int numPartitions = 3;
        final int numKeys = 100;
        final String compactSql = "CALL sys.compact(`table` => 'default.T')";
        final String bucketSql =
                "SELECT `partition`, COUNT(DISTINCT bucket) FROM `T$files` GROUP BY `partition`";
        final String query = "SELECT pt, SUM(v) FROM T GROUP BY pt";

        String warehouse = this.getTempDirPath();
        TableEnvironment tEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'bucket' = '-2',\n"
                        + "  'postpone.default-bucket-num' = '2'\n"
                        + ")");

        List<String> values = new ArrayList<>();
        for (int i = 0; i < numPartitions; ++i) {
            for (int j = 0; j < numKeys; ++j) {
                values.add(String.format("(%d, %d, %d)", i, j, i * numKeys + j));
            }
        }
        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
        tEnv.executeSql(compactSql).await();

        // Every partition is expected to use the default bucket number (2).
        List<String> expectedBuckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; ++i) {
            expectedBuckets.add(String.format("+I[{%d}, 2]", i));
        }
        Assertions.assertThat(this.collect(tEnv.executeSql(bucketSql)))
                .hasSameElementsAs(expectedBuckets);

        // Sum of the arithmetic series i * numKeys .. i * numKeys + numKeys - 1.
        List<String> expectedData = new ArrayList<>();
        for (int i = 0; i < numPartitions; ++i) {
            expectedData.add(
                    String.format(
                            "+I[%d, %d]",
                            i, (i * numKeys + i * numKeys + numKeys - 1) * numKeys / 2));
        }
        Assertions.assertThat(this.collect(tEnv.executeSql(query)))
                .hasSameElementsAs(expectedData);

        // Overwrite all keys of partitions 1 and 2 with constant values 0 and 1.
        values.clear();
        for (int j = 0; j < numKeys; ++j) {
            values.add(String.format("(1, %d, 0)", j));
            values.add(String.format("(2, %d, 1)", j));
        }
        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();

        // Wait for each rescale to finish before inspecting the bucket layout.
        tEnv.executeSql(
                        "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 4, `partition` => 'pt=1')")
                .await();
        tEnv.executeSql(
                        "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 8, `partition` => 'pt=2')")
                .await();

        expectedBuckets.clear();
        expectedBuckets.add("+I[{0}, 2]");
        expectedBuckets.add("+I[{1}, 4]");
        expectedBuckets.add("+I[{2}, 8]");
        Assertions.assertThat(this.collect(tEnv.executeSql(bucketSql)))
                .hasSameElementsAs(expectedBuckets);
        // Rescaling alone must not expose the uncompacted second write.
        Assertions.assertThat(this.collect(tEnv.executeSql(query)))
                .hasSameElementsAs(expectedData);

        tEnv.executeSql(compactSql).await();
        Assertions.assertThat(this.collect(tEnv.executeSql(bucketSql)))
                .hasSameElementsAs(expectedBuckets);

        expectedData.clear();
        expectedData.add(String.format("+I[0, %d]", (numKeys - 1) * numKeys / 2));
        expectedData.add("+I[1, 0]");
        expectedData.add(String.format("+I[2, %d]", numKeys));
        Assertions.assertThat(this.collect(tEnv.executeSql(query)))
                .hasSameElementsAs(expectedData);
    }

    /**
     * Streams an aggregated datagen source into a table with {@code 'changelog-producer' =
     * 'input'}, compacts it from a batch environment, checks the batch view and finally reads
     * 398 changelog rows through a streaming {@code SELECT} starting at snapshot 1.
     *
     * <p>The streaming result iterator is closed via try-with-resources so the streaming job does
     * not outlive the test.
     */
    @Timeout(value=120L)
    @Test
    public void testInputChangelogProducer() throws Exception {
        String warehouse = this.getTempDirPath();
        TableEnvironment sEnv =
                this.tableEnvironmentBuilder()
                        .streamingMode()
                        .parallelism(1)
                        .checkpointIntervalMs(500)
                        .build();
        String createCatalog =
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")";
        sEnv.executeSql(createCatalog);
        sEnv.executeSql("USE CATALOG mycat");
        sEnv.executeSql(
                "CREATE TEMPORARY TABLE S (\n"
                        + "  i INT\n"
                        + ") WITH (\n"
                        + "  'connector' = 'datagen',\n"
                        + "  'fields.i.kind' = 'sequence',\n"
                        + "  'fields.i.start' = '0',\n"
                        + "  'fields.i.end' = '199',\n"
                        + "  'number-of-rows' = '200',\n"
                        + "  'rows-per-second' = '50'\n"
                        + ")");
        sEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'bucket' = '-2',\n"
                        + "  'changelog-producer' = 'input',\n"
                        + "  'continuous.discovery-interval' = '1ms'\n"
                        + ")");
        sEnv.executeSql(
                "CREATE TEMPORARY VIEW V AS SELECT MOD(i, 2) AS x, IF(MOD(i, 2) = 0, 1, 1000) AS y FROM S");
        sEnv.executeSql("INSERT INTO T SELECT SUM(y), x FROM V GROUP BY x").await();

        TableEnvironment bEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .parallelism(2)
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        bEnv.executeSql(createCatalog);
        bEnv.executeSql("USE CATALOG mycat");
        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
        Assertions.assertThat(this.collect(bEnv.executeSql("SELECT * FROM T")))
                .containsExactlyInAnyOrder("+U[100, 0]", "+U[100000, 1]");

        TableResult streamingSelect =
                sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id' = '1') */");
        JobClient client = streamingSelect.getJobClient().get();
        try (CloseableIterator<Row> it = streamingSelect.collect()) {
            // Only the row count matters here; the rows themselves are not inspected.
            // NOTE(review): 398 is presumably 2 initial inserts plus 198 -U/+U update pairs
            // from the 200 aggregated source rows - confirm.
            this.collect(client, it, 398);
        }
    }

    /**
     * Writes five times into a table that retains at most three snapshots
     * ({@code 'snapshot.num-retained.min'} and {@code 'snapshot.num-retained.max'} both 3) and
     * asserts that {@code T$snapshots} still lists all five.
     */
    @Test
    public void testPostponeWriteNotExpireSnapshots() throws Exception {
        final int writeCount = 5;

        String warehouse = this.getTempDirPath();
        TableEnvironment tEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  pt INT,\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'bucket' = '-2',\n"
                        + "  'snapshot.num-retained.min' = '3',\n"
                        + "  'snapshot.num-retained.max' = '3'\n"
                        + ")");

        // Each insert targets its own partition and is awaited before the next one starts.
        for (int pt = 0; pt < writeCount; pt++) {
            String insertSql = String.format("INSERT INTO T VALUES (%d, 0, 0)", pt);
            tEnv.executeSql(insertSql).await();
        }

        List<String> snapshotCount =
                this.collect(tEnv.executeSql("SELECT COUNT(*) FROM `T$snapshots`"));
        Assertions.assertThat(snapshotCount).containsExactlyInAnyOrder("+I[5]");
    }

    /**
     * Uses an unpartitioned table created with {@code 'bucket' = '-2'} as the dimension side of a
     * streaming lookup join and checks the join output after a compaction and again after a
     * {@code sys.rescale} to 5 buckets.
     *
     * <p>The streaming result iterator is held in a try-with-resources block so that it is closed
     * even when one of the assertions fails.
     */
    @Timeout(value=120L)
    @Test
    public void testLookupPostponeBucketTable() throws Exception {
        String warehouse = this.getTempDirPath();
        TableEnvironment bEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        String createCatalogSql =
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")";
        bEnv.executeSql(createCatalogSql);
        bEnv.executeSql("USE CATALOG mycat");
        bEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'bucket' = '-2'\n"
                        + ")");
        bEnv.executeSql("CREATE TABLE SRC (i INT, `proctime` AS PROCTIME())");

        TableEnvironment sEnv =
                this.tableEnvironmentBuilder()
                        .streamingMode()
                        .parallelism(1)
                        .checkpointIntervalMs(200)
                        .build();
        sEnv.executeSql(createCatalogSql);
        sEnv.executeSql("USE CATALOG mycat");

        TableResult streamingSelect =
                sEnv.executeSql(
                        "SELECT i, v FROM SRC LEFT JOIN T FOR SYSTEM_TIME AS OF SRC.proctime AS D ON SRC.i = D.k");
        JobClient client = streamingSelect.getJobClient().get();
        try (CloseableIterator<Row> it = streamingSelect.collect()) {
            // Populate and compact the dimension table before feeding the probe side.
            bEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30)").await();
            bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
            bEnv.executeSql("INSERT INTO SRC VALUES (1), (2), (3)").await();
            Assertions.assertThat(this.collect(client, it, 3))
                    .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]");

            // The same probes must resolve to the same values after a rescale.
            bEnv.executeSql("CALL sys.rescale(`table` => 'default.T', `bucket_num` => 5)").await();
            bEnv.executeSql("INSERT INTO SRC VALUES (1), (2), (3)").await();
            Assertions.assertThat(this.collect(client, it, 3))
                    .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]");
        }
    }

    /**
     * Uses a partitioned table created with {@code 'bucket' = '-2'} as the dimension side of a
     * streaming lookup join, after rescaling partition 1 to 5 buckets and partition 2 to 8
     * buckets, and checks the join output.
     *
     * <p>The streaming result iterator is held in a try-with-resources block so that it is closed
     * even when the assertion fails.
     */
    @Timeout(value=120L)
    @Test
    public void testLookupPostponeBucketPartitionedTable() throws Exception {
        String warehouse = this.getTempDirPath();
        TableEnvironment bEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        String createCatalogSql =
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")";
        bEnv.executeSql(createCatalogSql);
        bEnv.executeSql("USE CATALOG mycat");
        bEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  k INT,\n"
                        + "  pt INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (k, pt) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH (\n"
                        + "  'bucket' = '-2'\n"
                        + ")");
        bEnv.executeSql("CREATE TABLE SRC (i INT, pt INT, `proctime` AS PROCTIME())");

        TableEnvironment sEnv =
                this.tableEnvironmentBuilder()
                        .streamingMode()
                        .parallelism(1)
                        .checkpointIntervalMs(200)
                        .build();
        sEnv.executeSql(createCatalogSql);
        sEnv.executeSql("USE CATALOG mycat");

        TableResult streamingSelect =
                sEnv.executeSql(
                        "SELECT i, D.pt, v FROM SRC LEFT JOIN T FOR SYSTEM_TIME AS OF SRC.proctime AS D ON SRC.i = D.k AND SRC.pt = D.pt");
        JobClient client = streamingSelect.getJobClient().get();
        try (CloseableIterator<Row> it = streamingSelect.collect()) {
            bEnv.executeSql("INSERT INTO T VALUES (1, 1, 10), (2, 2, 20), (3, 2, 30)").await();
            bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
            // Give the two partitions different bucket counts before probing.
            bEnv.executeSql(
                            "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 5, `partition` => 'pt=1')")
                    .await();
            bEnv.executeSql(
                            "CALL sys.rescale(`table` => 'default.T', `bucket_num` => 8, `partition` => 'pt=2')")
                    .await();
            bEnv.executeSql("INSERT INTO SRC VALUES (1, 1), (2, 2), (3, 2)").await();
            Assertions.assertThat(this.collect(client, it, 3))
                    .containsExactlyInAnyOrder("+I[1, 1, 10]", "+I[2, 2, 20]", "+I[3, 2, 30]");
        }
    }

    /**
     * Runs three insert-then-compact rounds against a table with
     * {@code 'deletion-vectors.enabled' = 'true'} and checks after each round that the latest
     * value per key is returned.
     */
    @Test
    public void testDeletionVector() throws Exception {
        final String compactSql = "CALL sys.compact(`table` => 'default.T')";
        final String selectAllSql = "SELECT * FROM T";

        String warehouse = this.getTempDirPath();
        TableEnvironment tEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'bucket' = '-2',\n"
                        + "  'deletion-vectors.enabled' = 'true'\n"
                        + ")");

        // Round 1: four fresh keys.
        tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 30), (4, 40)").await();
        tEnv.executeSql(compactSql).await();
        List<String> afterFirstRound = this.collect(tEnv.executeSql(selectAllSql));
        Assertions.assertThat(afterFirstRound)
                .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 30]", "+I[4, 40]");

        // Round 2: update key 1, add key 5.
        tEnv.executeSql("INSERT INTO T VALUES (1, 11), (5, 51)").await();
        tEnv.executeSql(compactSql).await();
        List<String> afterSecondRound = this.collect(tEnv.executeSql(selectAllSql));
        Assertions.assertThat(afterSecondRound)
                .containsExactlyInAnyOrder(
                        "+I[1, 11]", "+I[2, 20]", "+I[3, 30]", "+I[4, 40]", "+I[5, 51]");

        // Round 3: update keys 2 and 3.
        tEnv.executeSql("INSERT INTO T VALUES (2, 52), (3, 32)").await();
        tEnv.executeSql(compactSql).await();
        List<String> afterThirdRound = this.collect(tEnv.executeSql(selectAllSql));
        Assertions.assertThat(afterThirdRound)
                .containsExactlyInAnyOrder(
                        "+I[1, 11]", "+I[2, 52]", "+I[3, 32]", "+I[4, 40]", "+I[5, 51]");
    }

    /**
     * Inserts four rows with the same primary key in a single statement using
     * {@code 'sink.parallelism' = '2'} and asserts that, after compaction, the row that came last
     * in the statement is the one kept.
     */
    @Test
    public void testSameKeyPreserveOrder() throws Exception {
        String warehouse = this.getTempDirPath();
        TableEnvironment tEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'bucket' = '-2'\n"
                        + ")");

        String insertSql =
                "INSERT INTO T /*+ OPTIONS('sink.parallelism' = '2') */ VALUES (1, 10), (1, 20), (1, 30), (1, 40)";
        tEnv.executeSql(insertSql).await();
        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();

        List<String> remainingRows = this.collect(tEnv.executeSql("SELECT * FROM T"));
        Assertions.assertThat(remainingRows).containsExactlyInAnyOrder("+I[1, 40]");
    }

    /**
     * Writes a nanosecond-precision {@code TIMESTAMP(9)} value and a {@code NULL} of the same type
     * into a table created with {@code 'bucket' = '-2'} and checks that both are read back
     * unchanged after compaction.
     *
     * <p>NOTE(review): judging by the method name, {@code TIMESTAMP(9)} is a type the Avro format
     * cannot represent directly - confirm against the writer implementation.
     */
    @Test
    public void testAvroUnsupportedTypes() throws Exception {
        String warehouse = this.getTempDirPath();
        TableEnvironment tEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        tEnv.executeSql(
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG mycat");
        tEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  k INT,\n"
                        + "  v TIMESTAMP(9),\n"
                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'bucket' = '-2'\n"
                        + ")");

        String insertSql =
                "INSERT INTO T VALUES (1, TIMESTAMP '2025-06-11 16:35:45.123456789'), (2, CAST(NULL AS TIMESTAMP(9)))";
        tEnv.executeSql(insertSql).await();
        tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();

        List<String> storedRows = this.collect(tEnv.executeSql("SELECT * FROM T"));
        Assertions.assertThat(storedRows)
                .containsExactlyInAnyOrder("+I[1, 2025-06-11T16:35:45.123456789]", "+I[2, null]");
    }

    /**
     * Writes two batches into a table with {@code 'changelog-producer' = 'none'} and
     * {@code 'scan.remove-normalize' = 'true'}, compacts it, checks the merged batch view, and
     * then checks that a streaming {@code SELECT} starting at snapshot 1 delivers all five
     * written rows as inserts.
     *
     * <p>The streaming result iterator is closed via try-with-resources so the streaming job does
     * not outlive the test.
     */
    @Timeout(value=120L)
    @Test
    public void testNoneChangelogProducer() throws Exception {
        String warehouse = this.getTempDirPath();
        TableEnvironment sEnv =
                this.tableEnvironmentBuilder()
                        .streamingMode()
                        .parallelism(1)
                        .checkpointIntervalMs(500)
                        .build();
        String createCatalog =
                "CREATE CATALOG mycat WITH (\n"
                        + "  'type' = 'paimon',\n"
                        + "  'warehouse' = '" + warehouse + "'\n"
                        + ")";
        sEnv.executeSql(createCatalog);
        sEnv.executeSql("USE CATALOG mycat");
        sEnv.executeSql(
                "CREATE TABLE T (\n"
                        + "  k INT,\n"
                        + "  v INT,\n"
                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'bucket' = '-2',\n"
                        + "  'changelog-producer' = 'none',\n"
                        + "  'scan.remove-normalize' = 'true',\n"
                        + "  'continuous.discovery-interval' = '1ms'\n"
                        + ")");

        TableEnvironment bEnv =
                this.tableEnvironmentBuilder()
                        .batchMode()
                        .parallelism(1)
                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
                        .build();
        bEnv.executeSql(createCatalog);
        bEnv.executeSql("USE CATALOG mycat");
        bEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (1, 100)").await();
        bEnv.executeSql("INSERT INTO T VALUES (1, 101), (3, 31)").await();
        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
        Assertions.assertThat(this.collect(bEnv.executeSql("SELECT * FROM T")))
                .containsExactlyInAnyOrder("+I[1, 101]", "+I[2, 20]", "+I[3, 31]");

        TableResult streamingSelect =
                sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id' = '1') */");
        JobClient client = streamingSelect.getJobClient().get();
        try (CloseableIterator<Row> it = streamingSelect.collect()) {
            // The streaming read is expected to deliver every written row, not the merged view.
            Assertions.assertThat(this.collect(client, it, 5))
                    .containsExactlyInAnyOrder(
                            "+I[1, 10]", "+I[2, 20]", "+I[1, 100]", "+I[1, 101]", "+I[3, 31]");
        }
    }

    /**
     * Drains a finite table result into a list of row strings.
     *
     * @param result the statement result to read; its iterator is closed before returning
     * @return {@code Row.toString()} of every row, in iteration order
     * @throws Exception if iterating or closing the result iterator fails
     */
    private List<String> collect(TableResult result) throws Exception {
        List<String> rows = new ArrayList<>();
        try (CloseableIterator<Row> rowIterator = result.collect()) {
            rowIterator.forEachRemaining(row -> rows.add(row.toString()));
        }
        return rows;
    }

    /**
     * Reads up to {@code limit} rows from a streaming result while a watchdog thread guards
     * against the read blocking forever.
     *
     * <p>The watchdog polls once per second; if collection has not finished after 120 polls it
     * cancels the job through {@code client}. Whether collection ends normally or with an
     * exception, the watchdog is signalled, interrupted and joined before this method returns, so
     * it can never outlive the call or cancel the job afterwards.
     *
     * @param client handle of the streaming job, used only to cancel it on timeout
     * @param it iterator over the streaming result; it is not closed by this method
     * @param limit maximum number of rows to read
     * @return {@code Row.toString()} of the rows read, in arrival order; fewer than {@code limit}
     *     entries if the iterator ran out first
     * @throws Exception if reading from the iterator fails or the calling thread is interrupted
     *     while waiting for the watchdog to finish
     */
    private List<String> collect(JobClient client, CloseableIterator<Row> it, int limit) throws Exception {
        AtomicBoolean shouldStop = new AtomicBoolean(false);
        Thread timerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 120; ++i) {
                    Thread.sleep(1000L);
                    if (shouldStop.get()) {
                        return;
                    }
                }
                client.cancel().get();
            } catch (InterruptedException e) {
                // The collecting thread woke us up: collection is over, nothing left to guard.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        timerThread.start();

        List<String> ret = new ArrayList<>();
        try {
            for (int i = 0; i < limit && it.hasNext(); ++i) {
                ret.add(it.next().toString());
            }
        } finally {
            // Stop the watchdog right away instead of waiting for its next one-second poll,
            // and do so even when hasNext()/next() threw.
            shouldStop.set(true);
            timerThread.interrupt();
            timerThread.join();
        }
        return ret;
    }
}

