/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class LookupJoinITCase
extends CatalogITCaseBase {
    @Override
    public List<String> ddl() {
        return Arrays.asList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())", "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms')", "CREATE TABLE PARTITIONED_DIM (i INT, j INT, k1 INT, k2 INT, PRIMARY KEY (i, j) NOT ENFORCED) PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms')");
    }

    @Override
    protected int defaultParallelism() {
        return 1;
    }

    @Test
    public void testLookupEmptyTable() throws Exception {
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null, null}), Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (4)", new Object[0]);
        result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @Test
    public void testLookup() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 44, 444, 4444}), Row.of((Object[])new Object[]{3, 33, 333, 3333}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupWithLatest() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('scan.mode'='latest') */ for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 44, 444, 4444}), Row.of((Object[])new Object[]{3, 33, 333, 3333}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupProjection() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupFilterPk() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.i > 2";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, null, null}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, null, null}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupFilterSelect() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k1 > 111";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupFilterUnSelect() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 > 1111";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupFilterUnSelectAndUpdate() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 < 4444";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, null, null}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @Test
    public void testNonPkLookup() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT D.i, T.i, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, 22, 333, 3333}), Row.of((Object[])new Object[]{null, 33, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{null, 22, null, null}), Row.of((Object[])new Object[]{3, 33, 333, 3333}), Row.of((Object[])new Object[]{2, 44, 444, 4444})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupProjection() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, 111}), Row.of((Object[])new Object[]{22, 222}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, 111}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, 444})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupFilterPk() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.i > 2";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, null})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupFilterSelect() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k1 > 111";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, 222}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, 444})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupFilterUnSelect() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 > 1111";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, 222}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, 444})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupFilterUnSelectAndUpdate() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 < 4444";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, 111}), Row.of((Object[])new Object[]{22, 222}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, 111}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, null})});
        iterator.close();
    }

    @Test
    public void testRepeatRefresh() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444)", new Object[0]);
        this.sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupPartialUpdateIllegal() {
        this.sql("CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
        Assertions.assertThatThrownBy(() -> this.sEnv.executeSql(query)).hasRootCauseMessage("Partial update streaming reading is not supported. You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading. ('input' changelog producer is also supported, but only returns input records.)");
    }

    @Test
    public void testLookupPartialUpdate() throws Exception {
        this.sql("CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('merge-engine'='partial-update', 'changelog-producer'='full-compaction', 'changelog-producer.compaction-interval'='1 s', 'continuous.discovery-interval'='10 ms')", new Object[0]);
        this.sql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS INT))", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, 111, null})});
        this.sql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111})});
        iterator.close();
    }

    @Test
    public void testRetryLookup() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s','max-attempts'='60') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, 33, 333, 3333})});
        iterator.close();
    }

    @Test
    public void testAsyncRetryLookup() throws Exception {
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='1s','max-attempts'='60') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (3)", new Object[0]);
        this.sql("INSERT INTO T VALUES (2)", new Object[0]);
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        Thread.sleep(2000L);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222})});
        this.sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{3, 33, 333, 3333})});
        iterator.close();
    }

    @Test
    public void testLookupPartitionedTable() throws Exception {
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null, null}), Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO PARTITIONED_DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (4)", new Object[0]);
        result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupNonPkAppendTable() throws Exception {
        this.sql("CREATE TABLE DIM_NO_PK (i INT, j INT, k1 INT, k2 INT) PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms')", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_NO_PK for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null, null}), Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM_NO_PK VALUES (1, 11, 111, 1111), (1, 12, 112, 1112), (1, 11, 111, 1111)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (4)", new Object[0]);
        result = iterator.collect(5);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{1, 12, 112, 1112}), Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }
}

