/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

public class LookupJoinITCase
extends CatalogITCaseBase {
    /**
     * Supplies the DDL statements for this test class: a single statement creating table
     * {@code T} with one INT column and a computed processing-time column.
     *
     * @return a one-element list holding the {@code CREATE TABLE T} statement
     */
    @Override
    public List<String> ddl() {
        final String createFactTable = "CREATE TABLE T (i INT, `proctime` AS PROCTIME())";
        return Collections.singletonList(createFactTable);
    }

    /**
     * Fixes the parallelism reported by this test class to a single task.
     *
     * @return always {@code 1}
     */
    @Override
    protected int defaultParallelism() {
        final int singleTask = 1;
        return singleTask;
    }

    /**
     * Creates the dimension tables {@code DIM} and {@code PARTITIONED_DIM}, appending the table
     * option that corresponds to the requested cache mode.
     *
     * @param cacheMode {@code FULL} adds {@code 'lookup.cache' = 'full'}; {@code AUTO} adds
     *     {@code 'changelog-producer'='lookup'}
     * @throws UnsupportedOperationException for any other cache mode
     */
    private void initTable(FlinkConnectorOptions.LookupCacheMode cacheMode) {
        // Resolve the mode-specific option first; unsupported modes fail before any DDL runs.
        final String extraOption;
        switch (cacheMode) {
            case FULL:
                extraOption = ", 'lookup.cache' = 'full'";
                break;
            case AUTO:
                extraOption = ", 'changelog-producer'='lookup'";
                break;
            default:
                throw new UnsupportedOperationException();
        }

        final String dimTemplate = "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms' %s)";
        final String partitionedTemplate = "CREATE TABLE PARTITIONED_DIM (i INT, j INT, k1 INT, k2 INT, PRIMARY KEY (i, j) NOT ENFORCED)PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms' %s)";

        tEnv.executeSql(String.format(dimTemplate, extraOption));
        tEnv.executeSql(String.format(partitionedTemplate, extraOption));
    }

    /**
     * Joins {@code T} against an initially empty {@code DIM}, then checks that rows written to
     * {@code DIM} afterwards are returned by later lookups.
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupEmptyTable(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            // DIM is still empty, so every probe row joins to NULLs.
            sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null, null),
                            Row.of(2, null, null, null),
                            Row.of(3, null, null, null));

            sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1), (2), (4)");
            result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(4, null, null, null));
        }
    }

    /**
     * Verifies a lookup join on the primary key of {@code DIM}, before and after {@code DIM}
     * receives a new version of key 2 and a first row for key 3.
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookup(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, null, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1), (2), (3), (4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 44, 444, 4444),
                            Row.of(3, 33, 333, 3333),
                            Row.of(4, null, null, null));
        }
    }

    /**
     * Runs a lookup join against partitioned table {@code d} with the hints
     * {@code 'lookup.dynamic-partition'='max_pt()'} and {@code 'scan.snapshot-id'='2'}, and
     * expects only the row of partition 3 to be matched.
     *
     * <p>NOTE(review): judging by the test name, the scan option is expected to have no effect
     * on the lookup side; confirm against the connector implementation.
     *
     * @throws Exception if collecting results or closing the iterator fails
     */
    @Test
    public void testLookupIgnoreScanOptions() throws Exception {
        sql(
                "CREATE TABLE d (\n"
                        + "  pt INT,\n"
                        + "  id INT,\n"
                        + "  data STRING,\n"
                        + "  PRIMARY KEY (pt, id) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) WITH ( 'bucket' = '1', 'continuous.discovery-interval'='1 ms' )");
        sql(
                "CREATE TABLE t1 (\n"
                        + "  pt INT,\n"
                        + "  id INT,\n"
                        + "  data STRING,\n"
                        + "  `proctime` AS PROCTIME(),\n"
                        + "  PRIMARY KEY (pt, id) NOT ENFORCED\n"
                        + ") PARTITIONED BY (pt) with ( 'continuous.discovery-interval'='1 ms' )");

        sql("INSERT INTO d VALUES (1, 1, 'one'), (2, 2, 'two'), (3, 3, 'three')");
        sql("INSERT INTO t1 VALUES (1, 1, 'one'), (2, 2, 'two'), (3, 3, 'three')");

        String query = "SELECT T.pt, T.id, T.data, D.pt, D.id, D.data FROM t1 AS T LEFT JOIN d /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'scan.snapshot-id'='2') */ FOR SYSTEM_TIME AS OF T.proctime AS D ON T.id = D.id";

        // try-with-resources: the iterator is closed even when the assertion fails.
        try (BlockingIterator<Row, Row> streamIter = streamSqlBlockIter(query)) {
            Assertions.assertThat(streamIter.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 1, "one", null, null, null),
                            Row.of(2, 2, "two", null, null, null),
                            Row.of(3, 3, "three", 3, 3, "three"));
        }
    }

    /**
     * Verifies a primary-key lookup join that selects only a subset of the {@code DIM} columns
     * ({@code j} and {@code k1}), before and after {@code DIM} is updated.
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupProjection(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, 22, 222),
                            Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1), (2), (3), (4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, 44, 444),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /**
     * Verifies a primary-key lookup join with an additional filter on the primary-key column
     * ({@code D.i > 2}); only key 3 can ever match.
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupFilterPk(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.i > 2";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, null, null),
                            Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1), (2), (3), (4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, null, null),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /**
     * Verifies a primary-key lookup join with a filter on a column that is also selected
     * ({@code D.k1 > 111}).
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupFilterSelect(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k1 > 111";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, 22, 222),
                            Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1), (2), (3), (4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, 44, 444),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /**
     * Verifies a primary-key lookup join with a filter on a column that is not part of the
     * select list ({@code D.k2 > 1111}).
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupFilterUnSelect(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 > 1111";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, 22, 222),
                            Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1), (2), (3), (4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null),
                            Row.of(2, 44, 444),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /**
     * Verifies a primary-key lookup join with a filter on an unselected column
     * ({@code D.k2 < 4444}) when an update moves key 2 out of the filter range
     * ({@code k2} becomes 4444), so key 2 no longer matches.
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupFilterUnSelectAndUpdate(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 < 4444";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, 22, 222),
                            Row.of(3, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1), (2), (3), (4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, null, null),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /**
     * Joins {@code fact} to {@code dim} on the non-key column {@code k} with the filter
     * {@code b.v<'y'}, updates the dimension row to {@code v='y'} right after the first result,
     * and expects the next two fact rows to still see the old value {@code 'x'}.
     *
     * <p>NOTE(review): the expected output presumably relies on the 3s discovery interval of
     * the lookup side not having elapsed yet; confirm.
     *
     * @throws Exception if collecting results or closing the iterator fails
     */
    @Test
    public void testLookupUpdateAfterLeafPredicate0() throws Exception {
        sql(
                "CREATE TABLE fact (\n"
                        + "  name string,\n"
                        + "  k string,\n"
                        + "  proctime as PROCTIME()\n"
                        + ")\n"
                        + "WITH (\n"
                        + "    'bucket' = '1',\n"
                        + "    'bucket-key'='name'\n"
                        + ");");
        sql(
                "CREATE TABLE dim (\n"
                        + "  id bigint,\n"
                        + "  k string,\n"
                        + "  v string,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED \n"
                        + ")\n"
                        + "WITH (\n"
                        + "    'bucket' = '1'\n"
                        + ");");
        String query =
                "select \n"
                        + "a.name,\n"
                        + "a.k as ak,\n"
                        + "b.k as bk,\n"
                        + "b.v\n"
                        + "from fact  /*+ OPTIONS('scan.mode'='latest','continuous.discovery-interval'='1s') */ a\n"
                        + "left join dim /*+ OPTIONS('continuous.discovery-interval'='3s') */ FOR SYSTEM_TIME AS OF a.proctime AS b \n"
                        + "on a.k = b.k and b.v<'y'";

        // try-with-resources: the iterator is closed even when the assertion fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO dim VALUES (1,'k','x')");
            sql("INSERT INTO fact VALUES ('r1','k')");
            // Wait for (and discard) the first joined row before changing the dimension row.
            iterator.collect(1);

            sql("INSERT INTO dim VALUES (1,'k','y')");
            sql("INSERT INTO fact VALUES ('r2','k')");
            sql("INSERT INTO fact VALUES ('r3','k')");
            List<Row> result = iterator.collect(2);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of("r2", "k", "k", "x"),
                            Row.of("r3", "k", "k", "x"));
        }
    }

    /**
     * Joins {@code fact} to {@code dim} on the non-key column {@code k} with the filter
     * {@code b.v<'y'}, waits five seconds, updates the dimension row to {@code v='y'}, and
     * expects the later fact rows to find no match while the first one still joined to
     * {@code 'x'}.
     *
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @Test
    public void testLookupUpdateAfterLeafPredicate1() throws Exception {
        sql(
                "CREATE TABLE fact (\n"
                        + "  name string,\n"
                        + "  k string,\n"
                        + "  proctime as PROCTIME()\n"
                        + ")\n"
                        + "WITH (\n"
                        + "    'bucket' = '1',\n"
                        + "    'bucket-key'='name'\n"
                        + ");");
        sql(
                "CREATE TABLE dim (\n"
                        + "  id bigint,\n"
                        + "  k string,\n"
                        + "  v string,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED \n"
                        + ")\n"
                        + "WITH (\n"
                        + "    'bucket' = '1'\n"
                        + ");");
        String query =
                "select \n"
                        + "a.name,\n"
                        + "a.k as ak,\n"
                        + "b.k as bk,\n"
                        + "b.v\n"
                        + "from fact  /*+ OPTIONS('scan.mode'='latest','continuous.discovery-interval'='1s') */ a\n"
                        + "left join dim /*+ OPTIONS('continuous.discovery-interval'='3s') */ FOR SYSTEM_TIME AS OF a.proctime AS b \n"
                        + "on a.k = b.k and b.v<'y'";

        // try-with-resources: the iterator is closed even when the assertion fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO dim VALUES (1,'k','x')");
            sql("INSERT INTO fact VALUES ('r1','k')");
            // NOTE(review): 5s exceeds both discovery intervals set in the query hints (1s/3s);
            // presumably this lets r1 be joined before the dimension row changes.
            Thread.sleep(5000L);

            sql("INSERT INTO dim VALUES (1,'k','y')");
            sql("INSERT INTO fact VALUES ('r2','k')");
            sql("INSERT INTO fact VALUES ('r3','k')");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of("r1", "k", "k", "x"),
                            Row.of("r2", "k", null, null),
                            Row.of("r3", "k", null, null));
        }
    }

    /**
     * Joins {@code fact} to {@code dim} on the non-key column {@code j} with the filter
     * {@code D.k1 > 100}; after the dimension row is rewritten with {@code k1 = 100} the next
     * fact row must no longer find a match.
     *
     * @throws Exception if collecting results or closing the iterator fails
     */
    @Test
    public void testLookupUpdateAfterLeafPredicate2() throws Exception {
        sql("CREATE TABLE fact (name STRING, i INT, `proctime` AS PROCTIME())");
        sql("CREATE TABLE dim (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms')");
        String query = "SELECT fact.name, fact.i, D.k1 FROM fact LEFT JOIN dim for system_time as of fact.proctime AS D ON fact.i = D.j AND D.k1 > 100";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO dim VALUES (1, 11, 111, 1111)");
            sql("INSERT INTO fact VALUES ('a',11)");
            List<Row> result = iterator.collect(1);
            Assertions.assertThat(result).containsExactlyInAnyOrder(Row.of("a", 11, 111));

            // Same primary key, but k1 now fails the filter.
            sql("INSERT INTO dim VALUES (1,11,100,1111)");
            sql("INSERT INTO fact VALUES ('b',11)");
            result = iterator.collect(1);
            Assertions.assertThat(result).containsExactlyInAnyOrder(Row.of("b", 11, null));
        }
    }

    /**
     * Joins {@code fact} to {@code dim} on the non-key column {@code j} with the compound
     * filter {@code D.k1 > 100 AND D.k2>1000}; after the only matching dimension row is
     * rewritten with {@code k1 = 100} the next fact row must find no match.
     *
     * @throws Exception if collecting results or closing the iterator fails
     */
    @Test
    public void testLookupUpdateAfterCompoundPredicate() throws Exception {
        sql("CREATE TABLE fact (name STRING, i INT, `proctime` AS PROCTIME())");
        sql("CREATE TABLE dim (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms')");
        String query = "SELECT fact.name, fact.i, D.k1, D.k2 FROM fact LEFT JOIN dim for system_time as of fact.proctime AS D ON fact.i = D.j AND D.k1 > 100 AND D.k2>1000";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO dim VALUES (1, 11, 111, 1111)");
            // Key 2 shares j = 11 but fails the k2 filter from the start.
            sql("INSERT INTO dim VALUES (2, 11, 111, 1000)");
            sql("INSERT INTO fact VALUES ('a',11)");
            List<Row> result = iterator.collect(1);
            Assertions.assertThat(result).containsExactlyInAnyOrder(Row.of("a", 11, 111, 1111));

            // Key 1 now fails the k1 filter, leaving no matching dimension row.
            sql("INSERT INTO dim VALUES (1,11,100,1111)");
            sql("INSERT INTO fact VALUES ('b',11)");
            result = iterator.collect(1);
            Assertions.assertThat(result).containsExactlyInAnyOrder(Row.of("b", 11, null, null));
        }
    }

    /**
     * Verifies a lookup join on the non-primary-key column {@code D.j} (tables created with
     * cache mode {@code AUTO}), including a probe value that matches two dimension rows and the
     * results after {@code DIM} is updated.
     *
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @Test
    public void testNonPkLookup() throws Exception {
        initTable(FlinkConnectorOptions.LookupCacheMode.AUTO);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
        String query = "SELECT D.i, T.i, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            // Three probe rows yield four results because j = 22 matches two DIM rows.
            sql("INSERT INTO T VALUES (11), (22), (33)");
            List<Row> result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, 22, 333, 3333),
                            Row.of(null, 33, null, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (11), (22), (33), (44)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(null, 22, null, null),
                            Row.of(3, 33, 333, 3333),
                            Row.of(2, 44, 444, 4444));
        }
    }

    /**
     * Verifies a lookup join on the non-primary-key column {@code D.j} that selects only
     * {@code D.k1} (tables created with cache mode {@code FULL}).
     *
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @Test
    public void testNonPkLookupProjection() throws Exception {
        initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            // Three probe rows yield four results because j = 22 matches two DIM rows.
            sql("INSERT INTO T VALUES (11), (22), (33)");
            List<Row> result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, 111),
                            Row.of(22, 222),
                            Row.of(22, 333),
                            Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (11), (22), (33), (44)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, 111),
                            Row.of(22, null),
                            Row.of(33, 333),
                            Row.of(44, 444));
        }
    }

    /**
     * Verifies a lookup join on the non-primary-key column {@code D.j} combined with a filter
     * on the primary-key column ({@code D.i > 2}), with tables created in cache mode
     * {@code FULL}.
     *
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @Test
    public void testNonPkLookupFilterPk() throws Exception {
        initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.i > 2";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (11), (22), (33)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, null),
                            Row.of(22, 333),
                            Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (11), (22), (33), (44)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, null),
                            Row.of(22, null),
                            Row.of(33, 333),
                            Row.of(44, null));
        }
    }

    /**
     * Verifies a lookup join on the non-primary-key column {@code D.j} combined with a filter
     * on the selected column {@code D.k1} ({@code D.k1 > 111}), with tables created in cache
     * mode {@code FULL}.
     *
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @Test
    public void testNonPkLookupFilterSelect() throws Exception {
        initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k1 > 111";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            // Three probe rows yield four results because j = 22 matches two DIM rows.
            sql("INSERT INTO T VALUES (11), (22), (33)");
            List<Row> result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, null),
                            Row.of(22, 222),
                            Row.of(22, 333),
                            Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (11), (22), (33), (44)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, null),
                            Row.of(22, null),
                            Row.of(33, 333),
                            Row.of(44, 444));
        }
    }

    /**
     * Verifies a lookup join on the non-primary-key column {@code D.j} combined with a filter
     * on the unselected column {@code D.k2} ({@code D.k2 > 1111}), with tables created in cache
     * mode {@code FULL}.
     *
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @Test
    public void testNonPkLookupFilterUnSelect() throws Exception {
        initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 > 1111";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            // Three probe rows yield four results because j = 22 matches two DIM rows.
            sql("INSERT INTO T VALUES (11), (22), (33)");
            List<Row> result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, null),
                            Row.of(22, 222),
                            Row.of(22, 333),
                            Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (11), (22), (33), (44)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, null),
                            Row.of(22, null),
                            Row.of(33, 333),
                            Row.of(44, 444));
        }
    }

    /**
     * Verifies a lookup join on the non-primary-key column {@code D.j} combined with a filter
     * on the unselected column {@code D.k2} ({@code D.k2 < 4444}) when an update moves a row
     * out of the filter range ({@code k2} becomes 4444 for key 2). Tables are created in cache
     * mode {@code FULL}.
     *
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @Test
    public void testNonPkLookupFilterUnSelectAndUpdate() throws Exception {
        initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 < 4444";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            // Three probe rows yield four results because j = 22 matches two DIM rows.
            sql("INSERT INTO T VALUES (11), (22), (33)");
            List<Row> result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, 111),
                            Row.of(22, 222),
                            Row.of(22, 333),
                            Row.of(33, null));

            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (11), (22), (33), (44)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(11, 111),
                            Row.of(22, null),
                            Row.of(33, 333),
                            Row.of(44, null));
        }
    }

    /**
     * Verifies that a lookup join sees the combined effect of two separate writes to
     * {@code DIM} issued between two batches of probe rows.
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testRepeatRefresh(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, 22, 222),
                            Row.of(3, null, null));

            // Two independent INSERT statements, so the lookup side has two writes to pick up.
            sql("INSERT INTO DIM VALUES (2, 44, 444, 4444)");
            sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
            // Presumably gives the lookup side time to discover the new DIM data.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1), (2), (3), (4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111),
                            Row.of(2, 44, 444),
                            Row.of(3, 33, 333),
                            Row.of(4, null, null));
        }
    }

    /**
     * Expects a lookup join against a {@code partial-update} table that declares no changelog
     * producer to be rejected, with the root cause carrying the "streaming reading is not
     * supported" message.
     */
    @Test
    public void testLookupPartialUpdateIllegal() {
        sql("CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')");

        final String joinQuery = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
        final String expectedRootCause =
                "Partial update streaming reading is not supported. "
                        + "You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading. "
                        + "('input' changelog producer is also supported, but only returns input records.)";

        Assertions.assertThatThrownBy(() -> sEnv.executeSql(joinQuery))
                .hasRootCauseMessage(expectedRootCause);
    }

    /**
     * Runs the partial-update lookup scenario once per cache-spill compression setting:
     * first {@code "none"}, then {@code "zstd"}.
     *
     * @throws Exception if either scenario run fails
     */
    @Test
    public void testLookupPartialUpdate() throws Exception {
        final String[] compressions = {"none", "zstd"};
        for (String compression : compressions) {
            testLookupPartialUpdate(compression);
        }
    }

    /**
     * Exercises a lookup join against a {@code partial-update} table that uses the
     * {@code full-compaction} changelog producer: a second partial write must be merged with
     * the first one in the joined result. Drops {@code DIM2} and truncates {@code T} afterwards
     * so the scenario can be repeated.
     *
     * @param compression value for the {@code 'lookup.cache-spill-compression'} table option
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    private void testLookupPartialUpdate(String compression) throws Exception {
        sql("CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('merge-engine'='partial-update', 'changelog-producer'='full-compaction', 'changelog-producer.compaction-interval'='1 s'," + String.format(" 'lookup.cache-spill-compression'='%s',", compression) + " 'continuous.discovery-interval'='10 ms')");
        sql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS INT))");
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";

        // try-with-resources: the iterator is closed even when an assertion below fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1)");
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of(1, null, 111, null));

            // Second partial write fills j and k2; k1 is left NULL here.
            sql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)");
            // Presumably gives compaction and the lookup side time to surface the merged row.
            Thread.sleep(2000L);

            sql("INSERT INTO T VALUES (1)");
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111));
        }

        // Reset state so the caller can rerun the scenario with another compression setting.
        sql("DROP TABLE DIM2");
        sql("TRUNCATE TABLE T");
    }

    /**
     * Verifies the {@code LOOKUP} retry hint ({@code retry-predicate = lookup_miss}, fixed
     * delay of 1s, up to 60 attempts): the dimension row for key 3 is written about two seconds
     * after the probe rows, yet all three probe rows are expected to join successfully.
     *
     * @param cacheMode cache mode used when creating the dimension tables
     * @throws Exception if collecting results, sleeping, or closing the iterator fails
     */
    @ParameterizedTest
    @EnumSource(FlinkConnectorOptions.LookupCacheMode.class)
    public void testRetryLookup(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        initTable(cacheMode);
        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s','max-attempts'='60') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";

        // try-with-resources: the iterator is closed even when the assertion fails.
        try (BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect())) {
            sql("INSERT INTO T VALUES (1), (2), (3)");
            // Key 3 is missing from DIM at this point; write it only after a delay.
            Thread.sleep(2000L);
            sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");

            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, 33, 333, 3333));
        }
    }

    /**
     * Verifies the async variant ({@code 'lookup.async'='true'}) of the retrying lookup join with
     * {@code allow_unordered} output: rows for keys 1 and 2 are expected first even though key 3
     * was probed before them, and the row for key 3 is expected once the matching {@code DIM} row
     * has been inserted.
     *
     * @param cacheMode lookup cache mode used to create the dimension tables
     * @throws Exception if a SQL statement or the result collection fails
     */
    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testAsyncRetryLookup(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='30') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(this.sEnv.executeSql(query).collect());
        try {
            // Key 3 is probed first and has no match yet; keys 1 and 2 already exist in DIM.
            this.sql("INSERT INTO T VALUES (3)");
            this.sql("INSERT INTO T VALUES (2)");
            this.sql("INSERT INTO T VALUES (1)");
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111), Row.of(2, 22, 222, 2222));

            this.sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
            // Explicit, generous timeout while waiting for the retried row.
            Assertions.assertThat(iterator.collect(1, 10L, TimeUnit.MINUTES))
                    .containsExactlyInAnyOrder(Row.of(3, 33, 333, 3333));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }

    /**
     * Verifies a lookup join against {@code PARTITIONED_DIM} (created with cache mode
     * {@code AUTO}): probes return null-padded rows while the table is empty, and return the
     * inserted values for keys 1 and 2 after data is written, with key 4 still unmatched.
     *
     * @throws Exception if a SQL statement, the result collection, or the sleep fails
     */
    @Test
    public void testLookupPartitionedTable() throws Exception {
        this.initTable(FlinkConnectorOptions.LookupCacheMode.AUTO);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(this.sEnv.executeSql(query).collect());
        try {
            this.sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null, null),
                            Row.of(2, null, null, null),
                            Row.of(3, null, null, null));

            this.sql("INSERT INTO PARTITIONED_DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
            // Wait before probing again so the running join can observe the new rows
            // (presumably via its periodic refresh - not visible from this method).
            Thread.sleep(2000L);
            this.sql("INSERT INTO T VALUES (1), (2), (4)");
            result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(4, null, null, null));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }

    /**
     * Verifies a lookup join against a table created with
     * {@code 'lookup.dynamic-partition' = 'max_pt()'}: after a row is written to partition
     * {@code '1'} the join is expected to return its value, and after a row with the same key is
     * written to partition {@code '2'} the join is expected to return the value from that
     * partition instead.
     *
     * <p>Each run randomly picks one of two table layouts: bucket {@code -1} with primary key
     * {@code k}, or bucket {@code 1} with primary key {@code pt, k}.
     *
     * @param mode lookup cache mode substituted into the {@code lookup.cache} table option
     * @throws Exception if a SQL statement, the result collection, or the sleep fails
     */
    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupMaxPtPartitionedTable(FlinkConnectorOptions.LookupCacheMode mode) throws Exception {
        boolean testDynamicBucket = ThreadLocalRandom.current().nextBoolean();
        String primaryKeys = testDynamicBucket ? "k" : "pt, k";
        String bucket = testDynamicBucket ? "-1" : "1";
        this.sql("CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (%s) NOT ENFORCED)PARTITIONED BY (`pt`) WITH ('bucket' = '%s', 'lookup.dynamic-partition' = 'max_pt()', 'lookup.dynamic-partition.refresh-interval' = '1 ms', 'lookup.cache' = '%s', 'continuous.discovery-interval'='1 ms')", primaryKeys, bucket, mode);
        String query = "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.k";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(this.sEnv.executeSql(query).collect());
        try {
            this.sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 2)");
            // Wait before probing so the running join can observe the new partition
            // (presumably via the 1 ms refresh interval configured above).
            Thread.sleep(2000L);
            this.sql("INSERT INTO T VALUES (1)");
            List<Row> result = iterator.collect(1);
            Assertions.assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2));

            this.sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 3)");
            Thread.sleep(2000L);
            this.sql("INSERT INTO T VALUES (1)");
            result = iterator.collect(1);
            Assertions.assertThat(result).containsExactlyInAnyOrder(Row.of(1, 3));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }

    /**
     * Verifies a lookup join against {@code DIM_NO_PK}, a partitioned table declared without a
     * primary key: probes return null-padded rows while the table is empty, and after three rows
     * (two of them identical) are written for key 1, a probe for key 1 is expected to produce one
     * output row per stored row, duplicates included.
     *
     * @throws Exception if a SQL statement, the result collection, or the sleep fails
     */
    @Test
    public void testLookupNonPkAppendTable() throws Exception {
        this.sql("CREATE TABLE DIM_NO_PK (i INT, j INT, k1 INT, k2 INT) PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms')");
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_NO_PK for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(this.sEnv.executeSql(query).collect());
        try {
            this.sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, null, null, null),
                            Row.of(2, null, null, null),
                            Row.of(3, null, null, null));

            this.sql("INSERT INTO DIM_NO_PK VALUES (1, 11, 111, 1111), (1, 12, 112, 1112), (1, 11, 111, 1111)");
            // Wait before probing again so the running join can observe the new rows
            // (presumably via its periodic refresh - not visible from this method).
            Thread.sleep(2000L);
            this.sql("INSERT INTO T VALUES (1), (2), (4)");
            // Three matches for key 1 plus one null-padded row each for keys 2 and 4.
            result = iterator.collect(5);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(1, 11, 111, 1111),
                            Row.of(1, 12, 112, 1112),
                            Row.of(2, null, null, null),
                            Row.of(4, null, null, null));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }

    /**
     * Verifies a lookup join against a table declared with {@code 'sequence.field' = 'j'}: a later
     * write for key 2 that carries a smaller {@code j} (11 versus the stored 22) is expected not
     * to replace the stored row, while a newly written key 3 becomes visible.
     *
     * @throws Exception if a SQL statement, the result collection, or the sleep fails
     */
    @Test
    public void testWithSequenceFieldTable() throws Exception {
        this.sql("CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j')");
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(this.sEnv.executeSql(query).collect());
        try {
            this.sql("INSERT INTO T VALUES (1), (2), (3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, null, null, null));

            this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (2, 11, 444, 4444), (3, 33, 333, 3333)");
            // Wait before probing again so the running join can observe the new rows
            // (presumably via its periodic refresh - not visible from this method).
            Thread.sleep(2000L);
            this.sql("INSERT INTO T VALUES (1), (2), (3), (4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, 33, 333, 3333),
                            Row.of(4, null, null, null));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }

    /**
     * Verifies the async, retrying lookup join ({@code lookup_miss} predicate, unordered output)
     * against a table declared with {@code 'sequence.field' = 'j'}: rows for keys 1 and 2 are
     * expected first even though key 3 was probed before them, and the row for key 3 is expected
     * once the matching dimension row has been inserted.
     *
     * @throws Exception if a SQL statement or the result collection fails
     */
    @Test
    public void testAsyncRetryLookupWithSequenceField() throws Exception {
        this.sql("CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j')");
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(this.sEnv.executeSql(query).collect());
        try {
            // Key 3 is probed first and has no match yet; keys 1 and 2 already exist.
            this.sql("INSERT INTO T VALUES (3)");
            this.sql("INSERT INTO T VALUES (2)");
            this.sql("INSERT INTO T VALUES (1)");
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111), Row.of(2, 22, 222, 2222));

            this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (3, 33, 333, 3333)");
            Assertions.assertThat(iterator.collect(1))
                    .containsExactlyInAnyOrder(Row.of(3, 33, 333, 3333));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }

    /**
     * Verifies the async, retrying lookup join when the join condition uses the non-key column
     * {@code k1} of a table declared with {@code 'sequence.field' = 'j'}.
     *
     * <p>The probe for 111 matches two stored rows, while the probe for 333 has no match until a
     * row with {@code k1 = 333} is written. The later write for key 2 carries a smaller {@code j}
     * (1 versus the stored 2), and the second probe for 111 is expected to still see the
     * originally stored rows.
     *
     * @throws Exception if a SQL statement or the result collection fails
     */
    @Test
    public void testAsyncRetryLookupSecKeyWithSequenceField() throws Exception {
        this.sql("CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j')");
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 1, 111, 1111), (2, 2, 111, 2222)");
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */ T.i, D.i, D.j, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.k1";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(this.sEnv.executeSql(query).collect());
        try {
            this.sql("INSERT INTO T VALUES (111)");
            this.sql("INSERT INTO T VALUES (333)");
            // Only the two matches for 111 are expected here; 333 has no match yet.
            Assertions.assertThat(iterator.collect(2))
                    .containsExactlyInAnyOrder(Row.of(111, 1, 1, 1111), Row.of(111, 2, 2, 2222));

            this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (2, 1, 111, 3333), (3, 3, 333, 3333)");
            this.sql("INSERT INTO T VALUES (111)");
            // Two rows for the new 111 probe plus the row for the earlier 333 probe.
            Assertions.assertThat(iterator.collect(3))
                    .containsExactlyInAnyOrder(
                            Row.of(111, 1, 1, 1111),
                            Row.of(111, 2, 2, 2222),
                            Row.of(333, 3, 3, 3333));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }

    /**
     * Verifies a lookup join on the composite primary key {@code (i, j)} of a {@code DIM} table
     * whose columns are declared as {@code (k2, k1, j, i)}, with two buckets and
     * {@code 'bucket-key' = 'j'}. The probe table {@code T2} also lists {@code j} before
     * {@code i}.
     *
     * <p>After the first probe, the row for key {@code (1, 11)} is overwritten and a row for
     * {@code (3, 33)} is added; the second probe is expected to see both changes.
     *
     * @param mode lookup cache mode substituted into the {@code lookup.cache} table option
     * @throws Exception if a SQL statement, the result collection, or the sleep fails
     */
    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testPartialCacheBucketKeyOrder(FlinkConnectorOptions.LookupCacheMode mode) throws Exception {
        this.sql("CREATE TABLE DIM (k2 INT, k1 INT, j INT , i INT, PRIMARY KEY(i, j) NOT ENFORCED) WITH ('continuous.discovery-interval'='1 ms', 'lookup.cache'='%s', 'bucket' = '2', 'bucket-key' = 'j')", mode);
        this.sql("CREATE TABLE T2 (j INT, i INT, `proctime` AS PROCTIME())");
        this.sql("INSERT INTO DIM VALUES (1111, 111, 11, 1), (2222, 222, 22, 2)");
        String query = "SELECT T2.i, D.j, D.k1, D.k2 FROM T2 LEFT JOIN DIM for system_time as of T2.proctime AS D ON T2.i = D.i and T2.j = D.j";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(this.sEnv.executeSql(query).collect());
        try {
            this.sql("INSERT INTO T2 VALUES (11, 1), (22, 2), (33, 3)");
            List<Row> result = iterator.collect(3);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 111, 1111),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, null, null, null));

            this.sql("INSERT INTO DIM VALUES (2222, 222, 11, 1), (3333, 333, 33, 3)");
            // Wait before probing again so the running join can observe the new rows
            // (presumably via its periodic refresh - not visible from this method).
            Thread.sleep(2000L);
            this.sql("INSERT INTO T2 VALUES (11, 1), (22, 2), (33, 3), (44, 4)");
            result = iterator.collect(4);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(
                            Row.of(1, 11, 222, 2222),
                            Row.of(2, 22, 222, 2222),
                            Row.of(3, 33, 333, 3333),
                            Row.of(4, null, null, null));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }

    /**
     * Verifies that a running lookup join sees the result of an {@code INSERT OVERWRITE} on one
     * partition of the dimension table: after partition {@code pt='B'} is overwritten with row
     * {@code (3, 33)}, a probe for key 3 is expected to return {@code (3, 33, 'B')}.
     *
     * @param isPkTable whether column {@code i} of {@code DIM} is declared as primary key
     * @throws Exception if a SQL statement, the result collection, or the sleep fails
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testOverwriteDimTable(boolean isPkTable) throws Exception {
        this.sql("CREATE TABLE DIM (i INT %s, v int, pt STRING) PARTITIONED BY (pt) WITH ('continuous.discovery-interval'='1 ms')", isPkTable ? "PRIMARY KEY NOT ENFORCED" : "");
        BlockingIterator<Row, Row> iterator = this.streamSqlBlockIter("SELECT T.i, D.v, D.pt FROM T LEFT JOIN DIM FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i");
        try {
            this.sql("INSERT INTO DIM VALUES (1, 11, 'A'), (2, 22, 'B')");
            this.sql("INSERT INTO T VALUES (1), (2)");
            List<Row> result = iterator.collect(2);
            Assertions.assertThat(result)
                    .containsExactlyInAnyOrder(Row.of(1, 11, "A"), Row.of(2, 22, "B"));

            this.sql("INSERT OVERWRITE DIM PARTITION (pt='B') VALUES (3, 33)");
            // Wait before probing again so the running join can observe the overwrite
            // (presumably via its periodic refresh - not visible from this method).
            Thread.sleep(2000L);
            this.sql("INSERT INTO T VALUES (3)");
            result = iterator.collect(1);
            Assertions.assertThat(result).containsExactlyInAnyOrder(Row.of(3, 33, "B"));
        } finally {
            // Close even when an assertion fails so the result iterator is not leaked.
            iterator.close();
        }
    }
}

