/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.Snapshot;
import org.apache.paimon.SnapshotTest;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class ContinuousFileStoreITCase
extends CatalogITCaseBase {
    @Override
    protected List<String> ddl() {
        return Arrays.asList("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'a')", "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED) WITH ('changelog-producer'='input', 'bucket' = '1')");
    }

    @Test
    public void testSourceReuseWithScanPushDown() {
        this.sEnv.executeSql("CREATE TEMPORARY TABLE print1 (a STRING) WITH ('connector'='print')");
        this.sEnv.executeSql("CREATE TEMPORARY TABLE print2 (b STRING) WITH ('connector'='print')");
        StatementSet statementSet = this.sEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO print1 SELECT a FROM T1");
        statementSet.addInsertSql("INSERT INTO print2 SELECT b FROM T1");
        Assertions.assertThat((String)statementSet.compilePlan().explain(new ExplainDetail[0])).contains(new CharSequence[]{"Reused"});
        this.sEnv.executeSql("CREATE TEMPORARY TABLE new_print1 (a STRING, b STRING, c STRING) WITH ('connector'='print')");
        this.sEnv.executeSql("CREATE TEMPORARY TABLE new_print2 (a STRING, b STRING, c STRING) WITH ('connector'='print')");
        statementSet = this.sEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1 WHERE a = 'Apache'");
        statementSet.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
        Assertions.assertThat((String)statementSet.compilePlan().explain(new ExplainDetail[0])).doesNotContain(new CharSequence[]{"Reused"});
        statementSet = this.sEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO new_print1 SELECT * FROM T1 LIMIT 5");
        statementSet.addInsertSql("INSERT INTO new_print2 SELECT * FROM T1");
        Assertions.assertThat((String)statementSet.compilePlan().explain(new ExplainDetail[0])).doesNotContain(new CharSequence[]{"Reused"});
    }

    @Test
    public void testWithoutPrimaryKey() throws Exception {
        this.testSimple("T1");
    }

    @Test
    public void testWithPrimaryKey() throws Exception {
        this.testSimple("T2");
    }

    @Test
    public void testProjectionWithoutPrimaryKey() throws Exception {
        this.testProjection("T1");
    }

    @Test
    public void testProjectionWithPrimaryKey() throws Exception {
        this.testProjection("T2");
    }

    @Test
    public void testConsumerId() throws Exception {
        List<Row> result;
        String table = "T2";
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", table));
        this.batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        while ((result = this.sql("SELECT * FROM %s$consumers", table)).isEmpty()) {
            Thread.sleep(1000L);
        }
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", table));
        this.batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"7", "8", "9"})});
        iterator.close();
    }

    @Test
    public void testConsumerIdInBatch() throws Exception {
        String table = "T2";
        this.batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", table));
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        Thread.sleep(1000L);
        iterator.close();
        this.batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
        Assertions.assertThat(this.sql("SELECT * FROM %s /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", table)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"}), Row.of((Object[])new Object[]{"7", "8", "9"})});
    }

    @Test
    @Timeout(value=120L)
    public void testSnapshotWatermark() throws Exception {
        this.streamSqlIter("CREATE TEMPORARY TABLE gen (a STRING, b STRING, c STRING, dt AS NOW(), WATERMARK FOR dt AS dt) WITH ('connector'='datagen')", new Object[0]);
        CloseableIterator<Row> insert1 = this.streamSqlIter("INSERT INTO T2 SELECT a, b, c FROM gen", new Object[0]);
        this.sql("CREATE TABLE WT (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)", new Object[0]);
        CloseableIterator<Row> insert2 = this.streamSqlIter("INSERT INTO WT SELECT * FROM T2 /*+ OPTIONS('consumer-id'='me','consumer.expiration-time'='3h') */", new Object[0]);
        while (true) {
            Set watermarks;
            if ((watermarks = this.sql("SELECT `watermark` FROM WT$snapshots", new Object[0]).stream().map(r -> (Long)r.getField(0)).collect(Collectors.toSet())).size() > 1) {
                insert1.close();
                insert2.close();
                return;
            }
            Thread.sleep(1000L);
        }
    }

    private void testSimple(String table) throws Exception {
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM %s", table));
        this.batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
        this.batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"7", "8", "9"})});
        iterator.close();
    }

    private void testProjection(String table) throws Exception {
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT b, c FROM %s", table));
        this.batchSql("INSERT INTO %s VALUES ('1', '2', '3'), ('4', '5', '6')", table);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"2", "3"}), Row.of((Object[])new Object[]{"5", "6"})});
        this.batchSql("INSERT INTO %s VALUES ('7', '8', '9')", table);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"8", "9"})});
        iterator.close();
    }

    @Test
    public void testContinuousLatest() throws Exception {
        this.batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')", new Object[0]);
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */", new Object[0]));
        this.batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"7", "8", "9"}), Row.of((Object[])new Object[]{"10", "11", "12"})});
        iterator.close();
    }

    @Test
    public void testContinuousLatestStartingFromEmpty() throws Exception {
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest') */", new Object[0]));
        this.sql("INSERT INTO T1 VALUES ('1', 'Hello', 'World')", new Object[0]);
        this.sql("INSERT INTO T1 VALUES ('2', 'Apache', 'Paimon')", new Object[0]);
        this.sql("INSERT INTO T1 VALUES ('3', 'C', 'c')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "Hello", "World"}), Row.of((Object[])new Object[]{"2", "Apache", "Paimon"}), Row.of((Object[])new Object[]{"3", "C", "c"})});
    }

    @Test
    public void testContinuousFromTimestamp() throws Exception {
        String sql = "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */";
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter(sql, 0));
        this.batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')", new Object[0]);
        this.batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        iterator.close();
        SnapshotManager snapshotManager = SnapshotTest.newSnapshotManager((FileIO)LocalFileIO.create(), (Path)this.getTableDirectory("T1"));
        ArrayList<Snapshot> snapshots = new ArrayList<Snapshot>((Collection<Snapshot>)ImmutableList.copyOf((Iterator)snapshotManager.snapshots()));
        snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
        Snapshot first = (Snapshot)snapshots.get(0);
        Snapshot second = (Snapshot)snapshots.get(1);
        iterator = BlockingIterator.of(this.streamSqlIter(sql, second.timeMillis() - 1L));
        this.batchSql("INSERT INTO T1 VALUES ('13', '14', '15')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"7", "8", "9"}), Row.of((Object[])new Object[]{"10", "11", "12"}), Row.of((Object[])new Object[]{"13", "14", "15"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter(sql, second.timeMillis()));
        Assertions.assertThat((List)iterator.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"7", "8", "9"}), Row.of((Object[])new Object[]{"10", "11", "12"}), Row.of((Object[])new Object[]{"13", "14", "15"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter(sql, second.timeMillis() + 1L));
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"13", "14", "15"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter(sql, first.timeMillis() - 1L));
        Assertions.assertThat((List)iterator.collect(5)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"}), Row.of((Object[])new Object[]{"7", "8", "9"}), Row.of((Object[])new Object[]{"10", "11", "12"}), Row.of((Object[])new Object[]{"13", "14", "15"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter(sql, snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue()).timeMillis() + 1L));
        this.batchSql("INSERT INTO T1 VALUES ('16', '17', '18')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"16", "17", "18"})});
        iterator.close();
    }

    @Test
    public void testLackStartupTimestamp() {
        Assertions.assertThatThrownBy(() -> this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */", new Object[0])).hasCauseInstanceOf(IllegalArgumentException.class).hasRootCauseMessage("must set only one key in [scan.timestamp-millis,scan.timestamp] when you use from-timestamp for scan.mode");
    }

    @Test
    public void testConfigureStartupTimestamp() throws Exception {
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan.timestamp-millis'='%s') */", 0));
        this.batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')", new Object[0]);
        this.batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        iterator.close();
        Assertions.assertThatThrownBy(() -> this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */", 0)).hasCauseInstanceOf(IllegalArgumentException.class).hasRootCauseMessage("scan.timestamp-millis must be null when you use latest for scan.mode");
    }

    @Test
    public void testConfigureStartupSnapshot() throws Exception {
        this.batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')", new Object[0]);
        this.batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')", new Object[0]);
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1));
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 0));
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1));
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 2));
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"7", "8", "9"}), Row.of((Object[])new Object[]{"10", "11", "12"})});
        iterator.close();
        Assertions.assertThatThrownBy(() -> this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */", 0)).hasCauseInstanceOf(IllegalArgumentException.class).hasRootCauseMessage("scan.snapshot-id must be null when you use latest for scan.mode");
    }

    @Test
    public void testConfigureStartupSnapshotFull() throws Exception {
        this.batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')", new Object[0]);
        this.batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')", new Object[0]);
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */", 1));
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        iterator.close();
        this.batchSql("INSERT INTO T1 VALUES ('13', '14', '15')", new Object[0]);
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */", 2));
        Assertions.assertThat((List)iterator.collect(4)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"}), Row.of((Object[])new Object[]{"7", "8", "9"}), Row.of((Object[])new Object[]{"10", "11", "12"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'='%s') */", 2));
        Assertions.assertThat((List)iterator.collect(5)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"}), Row.of((Object[])new Object[]{"7", "8", "9"}), Row.of((Object[])new Object[]{"10", "11", "12"}), Row.of((Object[])new Object[]{"13", "14", "15"})});
        iterator.close();
    }

    @Test
    public void testIgnoreOverwrite() throws Exception {
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T1", new Object[0]));
        this.batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        this.batchSql("INSERT OVERWRITE T1 VALUES ('7', '8', '9')", new Object[0]);
        this.batchSql("INSERT INTO T1 VALUES ('9', '10', '11')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"9", "10", "11"})});
        iterator.close();
    }

    @Test
    public void testUnsupportedUpsert() {
        Assertions.assertThatThrownBy(() -> this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */", new Object[0])).hasCauseInstanceOf(ValidationException.class).hasRootCauseMessage("File store continuous reading does not support upsert changelog mode.");
    }

    @Test
    public void testUnsupportedEventual() {
        Assertions.assertThatThrownBy(() -> this.streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */", new Object[0])).hasCauseInstanceOf(ValidationException.class).hasRootCauseMessage("File store continuous reading does not support eventual consistency mode.");
    }

    @Test
    public void testFlinkMemoryPool() {
        Assertions.assertThatThrownBy(() -> this.batchSql("INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ VALUES ('1', '2', '3'), ('4', '5', '6')", "T1")).hasCauseInstanceOf(IllegalArgumentException.class).hasRootCauseMessage("Weights for operator scope use cases must be greater than 0.");
        this.batchSql("INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ VALUES ('1', '2', '3'), ('4', '5', '6')", "T1");
        Assertions.assertThat((int)this.batchSql("SELECT * FROM T1", new Object[0]).size()).isEqualTo(2);
    }

    @Test
    public void testDynamicPartitionPruningNotWork() throws Exception {
        this.sql("CREATE TABLE dim (x INT PRIMARY KEY NOT ENFORCED, y STRING, z INT)", new Object[0]);
        this.sql("INSERT INTO dim VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2)", new Object[0]);
        this.sql("CREATE TABLE fact (a INT, b BIGINT, c STRING, p INT, `proctime` AS PROCTIME(), PRIMARY KEY (a, p) NOT ENFORCED) PARTITIONED BY (p)\n", new Object[0]);
        this.sql("INSERT INTO fact PARTITION (p = 1) VALUES (10, 100, 'aaa'), (11, 101, 'bbb'), (12, 102, 'ccc')", new Object[0]);
        this.sql("INSERT INTO fact PARTITION (p = 2) VALUES (20, 200, 'aaa'), (21, 201, 'bbb'), (22, 202, 'ccc')", new Object[0]);
        this.sql("INSERT INTO fact PARTITION (p = 3) VALUES (30, 300, 'aaa'), (31, 301, 'bbb'), (32, 302, 'ccc')", new Object[0]);
        String joinSql = "SELECT a, b, c, p, x, y FROM fact INNER JOIN dim ON x = p and z = 1";
        Assertions.assertThat((String)this.sEnv.explainSql(joinSql, new ExplainDetail[0])).doesNotContain(new CharSequence[]{"DynamicFilteringDataCollector"});
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter(joinSql, new Object[0]));
        Assertions.assertThat((List)iterator.collect(6)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{10, 100L, "aaa", 1, 1, "a"}), Row.of((Object[])new Object[]{11, 101L, "bbb", 1, 1, "a"}), Row.of((Object[])new Object[]{12, 102L, "ccc", 1, 1, "a"}), Row.of((Object[])new Object[]{20, 200L, "aaa", 2, 2, "b"}), Row.of((Object[])new Object[]{21, 201L, "bbb", 2, 2, "b"}), Row.of((Object[])new Object[]{22, 202L, "ccc", 2, 2, "b"})});
        iterator.close();
    }

    @Test
    public void testIgnoreDelete() throws Exception {
        this.sql("CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true')", new Object[0]);
        BlockingIterator<Row, Row> iterator = this.streamSqlBlockIter("SELECT * FROM ignore_delete /*+ OPTIONS('continuous.discovery-interval' = '1s', 'scan.snapshot-id' = '1') */", new Object[0]);
        this.sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')", new Object[0]);
        this.sql("DELETE FROM ignore_delete WHERE pk = 1", new Object[0]);
        this.sql("INSERT INTO ignore_delete VALUES (1, 'B')", new Object[0]);
        Assertions.assertThat((List)iterator.collect(4)).containsExactly((Object[])new Row[]{Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "A"}), Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{2, "B"}), Row.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{1, "A"}), Row.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{1, "B"})});
        iterator.close();
    }

    @Test
    public void testScanFromOldSchema() throws Exception {
        this.sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)", new Object[0]);
        this.sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')", new Object[0]);
        Thread.sleep(1000L);
        long timestamp = System.currentTimeMillis();
        this.sql("ALTER TABLE select_old ADD f2 STRING", new Object[0]);
        this.sql("INSERT INTO select_old VALUES (3, 'c', 'C')", new Object[0]);
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM select_old /*+ OPTIONS('scan.timestamp-millis'='%s') */", timestamp));
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{3, "c", "C"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM select_old FOR SYSTEM_TIME AS OF TIMESTAMP '%s'", DateTimeUtils.formatTimestamp((Timestamp)DateTimeUtils.toInternal((long)timestamp, (int)0), (int)0)));
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{3, "c"})});
    }

    @ParameterizedTest(name="changelog-producer = {0}")
    @ValueSource(strings={"none", "input"})
    public void testScanFromChangelog(String changelogProducer) throws Exception {
        this.batchSql("CREATE TABLE IF NOT EXISTS T3 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)\n WITH ('changelog-producer'='%s', 'bucket' = '1', \n 'snapshot.num-retained.max' = '2',\n 'snapshot.num-retained.min' = '1',\n 'changelog.num-retained.max' = '3',\n 'changelog.num-retained.min' = '1'\n)", changelogProducer);
        this.batchSql("INSERT INTO T3 VALUES ('1', '2', '3')", new Object[0]);
        this.batchSql("INSERT INTO T3 VALUES ('4', '5', '6')", new Object[0]);
        this.batchSql("INSERT INTO T3 VALUES ('7', '8', '9')", new Object[0]);
        BlockingIterator iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T3 /*+ OPTIONS('scan.snapshot-id'='%s') */", 0));
        Assertions.assertThat((List)iterator.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"}), Row.of((Object[])new Object[]{"7", "8", "9"})});
        iterator.close();
        this.batchSql("INSERT INTO T3 VALUES ('10', '11', '12')", new Object[0]);
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T3 /*+ OPTIONS('scan.snapshot-id'='%s') */", 0));
        Assertions.assertThat((List)iterator.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"4", "5", "6"}), Row.of((Object[])new Object[]{"7", "8", "9"}), Row.of((Object[])new Object[]{"10", "11", "12"})});
        iterator.close();
        iterator = BlockingIterator.of(this.streamSqlIter("SELECT * FROM T3 /*+ OPTIONS('scan.snapshot-id'='%s') */", 4));
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"10", "11", "12"})});
        iterator.close();
    }

    @Test
    public void testAvroRetractNotNullField() {
        List<Row> input = Arrays.asList(Row.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{1, "A"}), Row.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{1, "A"}));
        String id = TestValuesTableFactory.registerData(input);
        this.sEnv.executeSql(String.format("CREATE TEMPORARY TABLE source (pk INT PRIMARY KEY NOT ENFORCED, a STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', 'changelog-mode' = 'I,D,UA,UB')", id));
        this.sql("CREATE TABLE avro_sink (pk INT PRIMARY KEY NOT ENFORCED, a STRING NOT NULL)  WITH ('file.format' = 'avro', 'merge-engine' = 'aggregation')", new Object[0]);
        Assertions.assertThatThrownBy(() -> this.sEnv.executeSql("INSERT INTO avro_sink select * from source").await()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(RuntimeException.class, (String)"Caught NullPointerException, the possible reason is you have set following options together")});
    }
}

