/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class BatchFileStoreITCase
extends CatalogITCaseBase {
    @Override
    protected List<String> ddl() {
        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)");
    }

    @Test
    public void testAQEWithWriteManifest() {
        this.batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)", new Object[0]);
        this.batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c", new Object[0]);
        Assertions.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222})});
    }

    @Test
    public void testAQEWithDynamicBucket() {
        this.batchSql("CREATE TABLE IF NOT EXISTS D_T (a INT PRIMARY KEY NOT ENFORCED, b INT, c INT)", new Object[0]);
        this.batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)", new Object[0]);
        this.batchSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a,b,c", new Object[0]);
        Assertions.assertThat(this.batchSql("SELECT * FROM D_T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222})});
    }

    @Test
    public void testOverwriteEmpty() {
        this.batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)", new Object[0]);
        Assertions.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222})});
        this.batchSql("INSERT OVERWRITE T SELECT * FROM T WHERE 1 <> 1", new Object[0]);
        Assertions.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).isEmpty();
    }

    @Test
    public void testTimeTravelRead() throws Exception {
        this.batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)", new Object[0]);
        long time1 = System.currentTimeMillis();
        Thread.sleep(10L);
        this.batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)", new Object[0]);
        long time2 = System.currentTimeMillis();
        Thread.sleep(10L);
        this.batchSql("INSERT INTO T VALUES (5, 55, 555), (6, 66, 666)", new Object[0]);
        long time3 = System.currentTimeMillis();
        Thread.sleep(10L);
        this.batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)", new Object[0]);
        this.paimonTable("T").createTag("tag2", 2L);
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222})});
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='1') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222})});
        Assertions.assertThatThrownBy(() -> this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='0') */", new Object[0])).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(SnapshotNotExistException.class, (String)"Specified parameter scan.snapshot-id = 0 is not exist, you can set it in range from 1 to 4.")});
        Assertions.assertThatThrownBy(() -> this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='0') */", new Object[0])).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(SnapshotNotExistException.class, (String)"Specified parameter scan.snapshot-id = 0 is not exist, you can set it in range from 1 to 4.")});
        Assertions.assertThat(this.batchSql(String.format("SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */", time1), new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222})});
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.file-creation-time-millis'='%s') */", time1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, 44, 444}), Row.of((Object[])new Object[]{5, 55, 555}), Row.of((Object[])new Object[]{6, 66, 666}), Row.of((Object[])new Object[]{7, 77, 777}), Row.of((Object[])new Object[]{8, 88, 888})});
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='2') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, 44, 444})});
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='2') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, 44, 444})});
        Assertions.assertThat(this.batchSql(String.format("SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */", time2), new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, 44, 444})});
        Assertions.assertThat(this.batchSql(String.format("SELECT * FROM T /*+ OPTIONS('scan.file-creation-time-millis'='%s') */", time2), new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{5, 55, 555}), Row.of((Object[])new Object[]{6, 66, 666}), Row.of((Object[])new Object[]{7, 77, 777}), Row.of((Object[])new Object[]{8, 88, 888})});
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, 44, 444}), Row.of((Object[])new Object[]{5, 55, 555}), Row.of((Object[])new Object[]{6, 66, 666})});
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='3') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, 44, 444}), Row.of((Object[])new Object[]{5, 55, 555}), Row.of((Object[])new Object[]{6, 66, 666})});
        Assertions.assertThat(this.batchSql(String.format("SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s') */", time3), new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, 44, 444}), Row.of((Object[])new Object[]{5, 55, 555}), Row.of((Object[])new Object[]{6, 66, 666})});
        Assertions.assertThat(this.batchSql(String.format("SELECT * FROM T /*+ OPTIONS('scan.file-creation-time-millis'='%s') */", time3), new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{7, 77, 777}), Row.of((Object[])new Object[]{8, 88, 888})});
        Assertions.assertThatThrownBy(() -> this.batchSql(String.format("SELECT * FROM T /*+ OPTIONS('scan.timestamp-millis'='%s', 'scan.snapshot-id'='1') */", time3), new Object[0])).hasRootCauseInstanceOf(IllegalArgumentException.class).hasRootCauseMessage("[scan.snapshot-id] must be null when you set [scan.timestamp-millis,scan.timestamp]");
        Assertions.assertThatThrownBy(() -> this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.mode'='full', 'scan.snapshot-id'='1') */", new Object[0])).hasRootCauseInstanceOf(IllegalArgumentException.class).hasRootCauseMessage("%s must be null when you use latest-full for scan.mode", new Object[]{CoreOptions.SCAN_SNAPSHOT_ID.key()});
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, 44, 444})});
        Assertions.assertThatThrownBy(() -> this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */", new Object[0])).hasRootCauseInstanceOf(IllegalArgumentException.class).hasRootCauseMessage("Tag 'unknown' doesn't exist.");
    }

    @Test
    @Timeout(value=120L)
    public void testTimeTravelReadWithWatermark() throws Exception {
        List watermarks;
        this.streamSqlIter("CREATE TEMPORARY TABLE gen (a STRING, b STRING, c STRING, dt AS NOW(), WATERMARK FOR dt AS dt) WITH ('connector'='datagen')", new Object[0]);
        this.sql("CREATE TABLE WT (a STRING, b STRING, c STRING, dt TIMESTAMP, PRIMARY KEY (a) NOT ENFORCED)", new Object[0]);
        CloseableIterator<Row> insert = this.streamSqlIter("INSERT INTO WT SELECT * FROM gen ", new Object[0]);
        while (true) {
            if ((watermarks = this.sql("SELECT `watermark` FROM WT$snapshots", new Object[0]).stream().map(r -> (Long)r.getField("watermark")).collect(Collectors.toList())).size() > 1) break;
            Thread.sleep(1000L);
        }
        insert.close();
        Long maxWatermark = (Long)watermarks.get(watermarks.size() - 1);
        Assertions.assertThat(this.batchSql(String.format("SELECT * FROM WT /*+ OPTIONS('scan.watermark'='%d') */", maxWatermark), new Object[0])).isNotEmpty();
    }

    @Test
    public void testTimeTravelReadWithSnapshotExpiration() throws Exception {
        this.batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)", new Object[0]);
        this.paimonTable("T").createTag("tag1", 1L);
        this.batchSql("INSERT INTO T VALUES (3, 33, 333), (4, 44, 444)", new Object[0]);
        HashMap<String, String> expireOptions = new HashMap<String, String>();
        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
        FileStoreTable table = this.paimonTable("T");
        table.copy(expireOptions).newCommit("").expireSnapshots();
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(1L);
        Assertions.assertThat(this.batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag1') */", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222})});
    }

    @Test
    public void testIncrementBetweenReadWithSnapshotExpiration() throws Exception {
        String tableName = "T";
        this.batchSql(String.format("INSERT INTO %s VALUES (1, 11, 111)", tableName), new Object[0]);
        this.paimonTable(tableName).createTag("tag1", 1L);
        this.batchSql(String.format("INSERT INTO %s VALUES (2, 22, 222)", tableName), new Object[0]);
        this.paimonTable(tableName).createTag("tag2", 2L);
        this.batchSql(String.format("INSERT INTO %s VALUES (3, 33, 333)", tableName), new Object[0]);
        this.paimonTable(tableName).createTag("tag3", 3L);
        HashMap<String, String> expireOptions = new HashMap<String, String>();
        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
        FileStoreTable table = this.paimonTable(tableName);
        table.copy(expireOptions).newCommit("").expireSnapshots();
        Assertions.assertThat((long)table.snapshotManager().snapshotCount()).isEqualTo(1L);
        Assertions.assertThat(this.batchSql(String.format("SELECT * FROM %s /*+ OPTIONS('incremental-between' = 'tag1,tag2', 'deletion-vectors.enabled' = 'true') */", tableName), new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{2, 22, 222})});
    }

    @Test
    public void testSortSpillMerge() {
        this.sql("CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('sort-spill-threshold'='2')", new Object[0]);
        this.sql("INSERT INTO KT VALUES (1, '1')", new Object[0]);
        this.sql("INSERT INTO KT VALUES (1, '2')", new Object[0]);
        this.sql("INSERT INTO KT VALUES (1, '3')", new Object[0]);
        this.sql("INSERT INTO KT VALUES (1, '4')", new Object[0]);
        this.sql("INSERT INTO KT VALUES (1, '5')", new Object[0]);
        this.sql("INSERT INTO KT VALUES (1, '6')", new Object[0]);
        this.sql("INSERT INTO KT VALUES (1, '7')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM KT", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "7"})});
        Assertions.assertThat(this.sql("SELECT b FROM KT", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"7"})});
    }

    @Test
    public void testTruncateTable() {
        this.batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)", new Object[0]);
        Assertions.assertThat(this.batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222})});
        List<Row> truncateResult = this.batchSql("TRUNCATE TABLE T", new Object[0]);
        Assertions.assertThat((int)truncateResult.size()).isEqualTo(1);
        Assertions.assertThat((Object)truncateResult.get(0)).isEqualTo((Object)Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{"OK"}));
        Assertions.assertThat((boolean)this.batchSql("SELECT * FROM T", new Object[0]).isEmpty()).isTrue();
    }

    @Test
    public void testDynamicPartitionPruning() {
        this.sql("CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT)", new Object[0]);
        this.sql("INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2)", new Object[0]);
        this.sql("CREATE TABLE fact (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)", new Object[0]);
        this.sql("INSERT INTO fact PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc')", new Object[0]);
        this.sql("INSERT INTO fact PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc')", new Object[0]);
        this.sql("INSERT INTO fact PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc')", new Object[0]);
        String joinSql = "SELECT a, b, c, p, x, y FROM fact INNER JOIN dim ON x = p and z = 1 ORDER BY a";
        String joinSqlSwapFactDim = "SELECT a, b, c, p, x, y FROM dim INNER JOIN fact ON x = p and z = 1 ORDER BY a";
        String expectedResult = "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], +I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b]]";
        Assertions.assertThat((String)this.tEnv.explainSql(joinSql, new ExplainDetail[0])).contains(new CharSequence[]{"DynamicFilteringDataCollector"});
        Assertions.assertThat((String)this.tEnv.explainSql(joinSqlSwapFactDim, new ExplainDetail[0])).contains(new CharSequence[]{"DynamicFilteringDataCollector"});
        Assertions.assertThat((String)this.sql(joinSql, new Object[0]).toString()).isEqualTo(expectedResult);
        Assertions.assertThat((String)this.sql(joinSqlSwapFactDim, new Object[0]).toString()).isEqualTo(expectedResult);
    }

    @Test
    public void testDynamicPartitionPruningOnTwoFactTables() {
        this.sql("CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT)", new Object[0]);
        this.sql("INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2)", new Object[0]);
        this.sql("CREATE TABLE fact1 (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)", new Object[0]);
        this.sql("INSERT INTO fact1 PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc')", new Object[0]);
        this.sql("INSERT INTO fact1 PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc')", new Object[0]);
        this.sql("INSERT INTO fact1 PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc')", new Object[0]);
        this.sql("CREATE TABLE fact2 (a INT, b BIGINT, c STRING, p INT, PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)", new Object[0]);
        this.sql("INSERT INTO fact2 PARTITION (p = 1) VALUES (40, 100, 'aaa'), (41, 101, 'bbb'), (42, 102, 'ccc')", new Object[0]);
        this.sql("INSERT INTO fact2 PARTITION (p = 2) VALUES (50, 200, 'aaa'), (51, 201, 'bbb'), (52, 202, 'ccc')", new Object[0]);
        this.sql("INSERT INTO fact2 PARTITION (p = 3) VALUES (60, 300, 'aaa'), (61, 301, 'bbb'), (62, 302, 'ccc')", new Object[0]);
        String joinSql = "SELECT * FROM (\nSELECT a, b, c, p, x, y FROM fact1 INNER JOIN dim ON x = p AND z = 1\nUNION ALL\nSELECT a, b, c, p, x, y FROM fact2 INNER JOIN dim ON x = p AND z = 1)\nORDER BY a";
        Assertions.assertThat((String)this.tEnv.explainSql(joinSql, new ExplainDetail[0])).containsOnlyOnce((CharSequence)"DynamicFilteringDataCollector(fields=[x])(reuse_id=");
        Assertions.assertThat((String)this.sql(joinSql, new Object[0]).toString()).isEqualTo("[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], +I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], +I[40, 100, aaa, 1, 1, a], +I[41, 101, bbb, 1, 1, a], +I[42, 102, ccc, 1, 1, a], +I[50, 200, aaa, 2, 2, b], +I[51, 201, bbb, 2, 2, b], +I[52, 202, ccc, 2, 2, b]]");
        joinSql = "SELECT * FROM (\nSELECT a, b, c, p, x, y FROM fact1 INNER JOIN dim ON x = p AND z = 1\nUNION ALL\nSELECT a, b, c, p, x, y FROM fact2 INNER JOIN dim ON x = p AND z = 2)\nORDER BY a";
        String expected2 = "[+I[10, 100, aaa, 1, 1, a], +I[11, 101, bbb, 1, 1, a], +I[12, 102, ccc, 1, 1, a], +I[20, 200, aaa, 2, 2, b], +I[21, 201, bbb, 2, 2, b], +I[22, 202, ccc, 2, 2, b], +I[60, 300, aaa, 3, 3, c], +I[61, 301, bbb, 3, 3, c], +I[62, 302, ccc, 3, 3, c]]";
        Assertions.assertThat((String)this.tEnv.explainSql(joinSql, new ExplainDetail[0])).contains(new CharSequence[]{"DynamicFilteringDataCollector"});
        Assertions.assertThat((String)this.sql(joinSql, new Object[0]).toString()).isEqualTo(expected2);
    }

    @Test
    public void testRowKindField() {
        this.sql("CREATE TABLE R_T (pk INT PRIMARY KEY NOT ENFORCED, v INT, rf STRING) WITH ('rowkind.field'='rf')", new Object[0]);
        this.sql("INSERT INTO R_T VALUES (1, 1, '+I')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM R_T", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, 1, "+I"})});
        this.sql("INSERT INTO R_T VALUES (1, 2, '-D')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM R_T", new Object[0])).isEmpty();
    }

    @Test
    public void testIgnoreDelete() {
        this.sql("CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1')", new Object[0]);
        this.sql("INSERT INTO ignore_delete VALUES (1, 'A')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM ignore_delete", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, "A"})});
        this.sql("DELETE FROM ignore_delete WHERE pk = 1", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM ignore_delete", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, "A"})});
        this.sql("INSERT INTO ignore_delete VALUES (1, 'B')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM ignore_delete", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, "B"})});
    }

    @Test
    public void testIgnoreDeleteWithRowKindField() {
        this.sql("CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')", new Object[0]);
        this.sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM ignore_delete", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, "A", "+I"})});
        this.sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM ignore_delete", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, "A", "+I"})});
        this.sql("INSERT INTO ignore_delete VALUES (1, 'B', '+I')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM ignore_delete", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{1, "B", "+I"})});
    }

    @Test
    public void testDeleteWithPkLookup() throws Exception {
        this.sql("CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) WITH ('changelog-producer' = 'lookup', 'bucket' = '1')", new Object[0]);
        BlockingIterator<Row, Row> iterator = this.streamSqlBlockIter("SELECT * FROM ignore_delete", new Object[0]);
        this.sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')", new Object[0]);
        this.sql("DELETE FROM ignore_delete WHERE pk = 1", new Object[0]);
        this.sql("INSERT INTO ignore_delete VALUES (1, 'B')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "B"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "B"})});
        iterator.close();
    }

    @ParameterizedTest
    @ValueSource(strings={"none", "lookup", "input"})
    public void testDeletePartitionWithChangelog(String producer) throws Exception {
        this.sql("CREATE TABLE delete_table (pt INT, pk INT, v STRING, PRIMARY KEY(pt, pk) NOT ENFORCED) PARTITIONED BY (pt)   WITH ('changelog-producer' = '" + producer + "', 'delete.force-produce-changelog'='true', 'bucket'='1')", new Object[0]);
        BlockingIterator<Row, Row> iterator = this.streamSqlBlockIter("SELECT * FROM delete_table", new Object[0]);
        this.sql("INSERT INTO delete_table VALUES (1, 1, 'A'), (2, 2, 'B')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 1, "A"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, 2, "B"})});
        this.sql("DELETE FROM delete_table WHERE pt = 1", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{1, 1, "A"})});
        this.sql("INSERT INTO delete_table VALUES (1, 1, 'B')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, 1, "B"})});
        iterator.close();
    }

    @Test
    public void testScanFromOldSchema() throws InterruptedException {
        this.sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)", new Object[0]);
        this.sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')", new Object[0]);
        Thread.sleep(1000L);
        long timestamp = System.currentTimeMillis();
        this.sql("ALTER TABLE select_old ADD f2 STRING", new Object[0]);
        this.sql("INSERT INTO select_old VALUES (3, 'c', 'C')", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM select_old /*+ OPTIONS('scan.timestamp-millis'='%s') */", timestamp)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a", null}), Row.of((Object[])new Object[]{2, "b", null})});
        Assertions.assertThat(this.sql("SELECT * FROM select_old FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", DateTimeUtils.formatTimestamp((Timestamp)DateTimeUtils.toInternal((long)timestamp, (int)0), (int)0))).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, "a"}), Row.of((Object[])new Object[]{2, "b"})});
    }

    @Test
    public void testCountStarAppend() {
        this.sql("CREATE TABLE count_append (f0 INT, f1 STRING)", new Object[0]);
        this.sql("INSERT INTO count_append VALUES (1, 'a'), (2, 'b')", new Object[0]);
        String sql = "SELECT COUNT(*) FROM count_append";
        Assertions.assertThat(this.sql(sql, new Object[0])).containsOnly((Object[])new Row[]{Row.of((Object[])new Object[]{2L})});
        this.validateCount1PushDown(sql);
    }

    @Test
    public void testCountStarPartAppend() {
        this.sql("CREATE TABLE count_part_append (f0 INT, f1 STRING, dt STRING) PARTITIONED BY (dt)", new Object[0]);
        this.sql("INSERT INTO count_part_append VALUES (1, 'a', '1'), (1, 'a', '1'), (2, 'b', '2')", new Object[0]);
        String sql = "SELECT COUNT(*) FROM count_part_append WHERE dt = '1'";
        Assertions.assertThat(this.sql(sql, new Object[0])).containsOnly((Object[])new Row[]{Row.of((Object[])new Object[]{2L})});
        this.validateCount1PushDown(sql);
    }

    @Test
    public void testCountStarAppendWithDv() {
        this.sql("CREATE TABLE count_append_dv (f0 INT, f1 STRING) WITH ('deletion-vectors.enabled' = 'true')", new Object[0]);
        this.sql("INSERT INTO count_append_dv VALUES (1, 'a'), (2, 'b')", new Object[0]);
        String sql = "SELECT COUNT(*) FROM count_append_dv";
        Assertions.assertThat(this.sql(sql, new Object[0])).containsOnly((Object[])new Row[]{Row.of((Object[])new Object[]{2L})});
        this.validateCount1PushDown(sql);
    }

    @Test
    public void testCountStarPK() {
        this.sql("CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ('file.format' = 'avro')", new Object[0]);
        this.sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", new Object[0]);
        this.sql("INSERT INTO count_pk VALUES (1, 'e')", new Object[0]);
        String sql = "SELECT COUNT(*) FROM count_pk";
        Assertions.assertThat(this.sql(sql, new Object[0])).containsOnly((Object[])new Row[]{Row.of((Object[])new Object[]{4L})});
        this.validateCount1NotPushDown(sql);
    }

    @Test
    public void testCountStarPKDv() {
        this.sql("CREATE TABLE count_pk_dv (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ('file.format' = 'avro', 'deletion-vectors.enabled' = 'true')", new Object[0]);
        this.sql("INSERT INTO count_pk_dv VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", new Object[0]);
        this.sql("INSERT INTO count_pk_dv VALUES (1, 'e')", new Object[0]);
        String sql = "SELECT COUNT(*) FROM count_pk_dv";
        Assertions.assertThat(this.sql(sql, new Object[0])).containsOnly((Object[])new Row[]{Row.of((Object[])new Object[]{4L})});
        this.validateCount1PushDown(sql);
    }

    @Test
    public void testParquetRowDecimalAndTimestamp() {
        this.sql("CREATE TABLE parquet_row_decimal(`row` ROW<f0 DECIMAL(2,1)>) WITH ('file.format' = 'parquet')", new Object[0]);
        this.sql("INSERT INTO parquet_row_decimal VALUES ( (ROW(1.2)) )", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM parquet_row_decimal", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{Row.of((Object[])new Object[]{new BigDecimal("1.2")})})});
        this.sql("CREATE TABLE parquet_row_timestamp(`row` ROW<f0 TIMESTAMP(0)>) WITH ('file.format' = 'parquet')", new Object[0]);
        this.sql("INSERT INTO parquet_row_timestamp VALUES ( (ROW(TIMESTAMP'2024-11-13 18:00:00')) )", new Object[0]);
        Assertions.assertThat(this.sql("SELECT * FROM parquet_row_timestamp", new Object[0])).containsExactly((Object[])new Row[]{Row.of((Object[])new Object[]{Row.of((Object[])new Object[]{DateTimeUtils.toLocalDateTime((String)"2024-11-13 18:00:00", (int)0)})})});
    }

    private void validateCount1PushDown(String sql) {
        Transformation transformation = AbstractTestBase.translate(this.tEnv, sql);
        while (!transformation.getInputs().isEmpty()) {
            transformation = (Transformation)transformation.getInputs().get(0);
        }
        Assertions.assertThat((String)transformation.getDescription()).contains(new CharSequence[]{"Count1AggFunction"});
    }

    private void validateCount1NotPushDown(String sql) {
        Transformation transformation = AbstractTestBase.translate(this.tEnv, sql);
        while (!transformation.getInputs().isEmpty()) {
            transformation = (Transformation)transformation.getInputs().get(0);
        }
        Assertions.assertThat((String)transformation.getDescription()).doesNotContain(new CharSequence[]{"Count1AggFunction"});
    }
}

