/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class LookupJoinITCase
extends CatalogITCaseBase {
    @Override
    public List<String> ddl() {
        return Collections.singletonList("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
    }

    @Override
    protected int defaultParallelism() {
        return 1;
    }

    private void initTable(FlinkConnectorOptions.LookupCacheMode cacheMode) {
        String dim = "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms' %s)";
        String partitioned = "CREATE TABLE PARTITIONED_DIM (i INT, j INT, k1 INT, k2 INT, PRIMARY KEY (i, j) NOT ENFORCED)PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms' %s)";
        String fullOption = ", 'lookup.cache' = 'full'";
        String lruOption = ", 'changelog-producer'='lookup'";
        switch (cacheMode) {
            case FULL: {
                this.tEnv.executeSql(String.format(dim, fullOption));
                this.tEnv.executeSql(String.format(partitioned, fullOption));
                break;
            }
            case AUTO: {
                this.tEnv.executeSql(String.format(dim, lruOption));
                this.tEnv.executeSql(String.format(partitioned, lruOption));
                break;
            }
            default: {
                throw new UnsupportedOperationException();
            }
        }
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupEmptyTable(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null, null}), Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (4)", new Object[0]);
        result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookup(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 44, 444, 4444}), Row.of((Object[])new Object[]{3, 33, 333, 3333}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupIgnoreScanOptions(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String scanOption = ThreadLocalRandom.current().nextBoolean() ? "'scan.mode'='latest'" : "'scan.snapshot-id'='2'";
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS(" + scanOption + ") */ for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 44, 444, 4444}), Row.of((Object[])new Object[]{3, 33, 333, 3333}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupProjection(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupFilterPk(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.i > 2";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, null, null}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, null, null}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupFilterSelect(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k1 > 111";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupFilterUnSelect(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 > 1111";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testLookupFilterUnSelectAndUpdate(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 < 4444";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, null, null}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @Test
    public void testNonPkLookup() throws Exception {
        this.initTable(FlinkConnectorOptions.LookupCacheMode.AUTO);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT D.i, T.i, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, 22, 333, 3333}), Row.of((Object[])new Object[]{null, 33, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{null, 22, null, null}), Row.of((Object[])new Object[]{3, 33, 333, 3333}), Row.of((Object[])new Object[]{2, 44, 444, 4444})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupProjection() throws Exception {
        this.initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, 111}), Row.of((Object[])new Object[]{22, 222}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, 111}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, 444})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupFilterPk() throws Exception {
        this.initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.i > 2";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, null})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupFilterSelect() throws Exception {
        this.initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k1 > 111";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, 222}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, 444})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupFilterUnSelect() throws Exception {
        this.initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 > 1111";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, 222}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, null}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, 444})});
        iterator.close();
    }

    @Test
    public void testNonPkLookupFilterUnSelectAndUpdate() throws Exception {
        this.initTable(FlinkConnectorOptions.LookupCacheMode.FULL);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)", new Object[0]);
        String query = "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 < 4444";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (11), (22), (33)", new Object[0]);
        List result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, 111}), Row.of((Object[])new Object[]{22, 222}), Row.of((Object[])new Object[]{22, 333}), Row.of((Object[])new Object[]{33, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (11), (22), (33), (44)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{11, 111}), Row.of((Object[])new Object[]{22, null}), Row.of((Object[])new Object[]{33, 333}), Row.of((Object[])new Object[]{44, null})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testRepeatRefresh(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 22, 222}), Row.of((Object[])new Object[]{3, null, null})});
        this.sql("INSERT INTO DIM VALUES (2, 44, 444, 4444)", new Object[0]);
        this.sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111}), Row.of((Object[])new Object[]{2, 44, 444}), Row.of((Object[])new Object[]{3, 33, 333}), Row.of((Object[])new Object[]{4, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupPartialUpdateIllegal() {
        this.sql("CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
        Assertions.assertThatThrownBy(() -> this.sEnv.executeSql(query)).hasRootCauseMessage("Partial update streaming reading is not supported. You can use 'lookup' or 'full-compaction' changelog producer to support streaming reading. ('input' changelog producer is also supported, but only returns input records.)");
    }

    @Test
    public void testLookupPartialUpdate() throws Exception {
        this.testLookupPartialUpdate("none");
        this.testLookupPartialUpdate("zstd");
    }

    private void testLookupPartialUpdate(String compression) throws Exception {
        this.sql("CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('merge-engine'='partial-update', 'changelog-producer'='full-compaction', 'changelog-producer.compaction-interval'='1 s'," + String.format(" 'lookup.cache-spill-compression'='%s',", compression) + " 'continuous.discovery-interval'='10 ms')", new Object[0]);
        this.sql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS INT))", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, 111, null})});
        this.sql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111})});
        iterator.close();
        this.sql("DROP TABLE DIM2", new Object[0]);
        this.sql("TRUNCATE TABLE T", new Object[0]);
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testRetryLookup(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s','max-attempts'='60') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, 33, 333, 3333})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testAsyncRetryLookup(FlinkConnectorOptions.LookupCacheMode cacheMode) throws Exception {
        this.initTable(cacheMode);
        this.sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='30') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (3)", new Object[0]);
        this.sql("INSERT INTO T VALUES (2)", new Object[0]);
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222})});
        this.sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1, 10L, TimeUnit.MINUTES)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{3, 33, 333, 3333})});
        iterator.close();
    }

    @Test
    public void testLookupPartitionedTable() throws Exception {
        this.initTable(FlinkConnectorOptions.LookupCacheMode.AUTO);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null, null}), Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO PARTITIONED_DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (4)", new Object[0]);
        result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @Test
    public void testLookupMaxPtPartitionedTablePartialCache() throws Exception {
        this.innerTestLookupMaxPtPartitionedTable(FlinkConnectorOptions.LookupCacheMode.AUTO);
    }

    @Test
    public void testLookupMaxPtPartitionedTableFullCache() throws Exception {
        this.innerTestLookupMaxPtPartitionedTable(FlinkConnectorOptions.LookupCacheMode.FULL);
    }

    private void innerTestLookupMaxPtPartitionedTable(FlinkConnectorOptions.LookupCacheMode mode) throws Exception {
        this.tEnv.executeSql("CREATE TABLE PARTITIONED_DIM (pt STRING, k INT, v INT, PRIMARY KEY (pt, k) NOT ENFORCED)PARTITIONED BY (`pt`) WITH ('bucket' = '1', 'lookup.dynamic-partition' = 'max_pt()', 'lookup.dynamic-partition.refresh-interval' = '1 ms', " + String.format("'lookup.cache' = '%s', ", mode) + "'continuous.discovery-interval'='1 ms')");
        String query = "SELECT T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for system_time as of T.proctime AS D ON T.i = D.k";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO PARTITIONED_DIM VALUES ('1', 1, 2)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        List result = iterator.collect(1);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 2})});
        this.sql("INSERT INTO PARTITIONED_DIM VALUES ('2', 1, 3)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        result = iterator.collect(1);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 3})});
        iterator.close();
    }

    @Test
    public void testLookupNonPkAppendTable() throws Exception {
        this.sql("CREATE TABLE DIM_NO_PK (i INT, j INT, k1 INT, k2 INT) PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms')", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_NO_PK for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, null, null, null}), Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM_NO_PK VALUES (1, 11, 111, 1111), (1, 12, 112, 1112), (1, 11, 111, 1111)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (4)", new Object[0]);
        result = iterator.collect(5);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{1, 12, 112, 1112}), Row.of((Object[])new Object[]{2, null, null, null}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @Test
    public void testWithSequenceFieldTable() throws Exception {
        this.sql("CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j')", new Object[0]);
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (1), (2), (3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (2, 11, 444, 4444), (3, 33, 333, 3333)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T VALUES (1), (2), (3), (4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, 33, 333, 3333}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }

    @Test
    public void testAsyncRetryLookupWithSequenceField() throws Exception {
        this.sql("CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j')", new Object[0]);
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)", new Object[0]);
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */ T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.i";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (3)", new Object[0]);
        this.sql("INSERT INTO T VALUES (2)", new Object[0]);
        this.sql("INSERT INTO T VALUES (1)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222})});
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (3, 33, 333, 3333)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(1)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{3, 33, 333, 3333})});
        iterator.close();
    }

    @Test
    public void testAsyncRetryLookupSecKeyWithSequenceField() throws Exception {
        this.sql("CREATE TABLE DIM_WITH_SEQUENCE (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH ('continuous.discovery-interval'='1 ms', 'sequence.field' = 'j')", new Object[0]);
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (1, 1, 111, 1111), (2, 2, 111, 2222)", new Object[0]);
        String query = "SELECT /*+ LOOKUP('table'='D', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'output-mode'='allow_unordered', 'fixed-delay'='3s','max-attempts'='60') */ T.i, D.i, D.j, D.k2 FROM T LEFT JOIN DIM_WITH_SEQUENCE /*+ OPTIONS('lookup.async'='true') */ for system_time as of T.proctime AS D ON T.i = D.k1";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T VALUES (111)", new Object[0]);
        this.sql("INSERT INTO T VALUES (333)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(2)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{111, 1, 1, 1111}), Row.of((Object[])new Object[]{111, 2, 2, 2222})});
        this.sql("INSERT INTO DIM_WITH_SEQUENCE VALUES (2, 1, 111, 3333), (3, 3, 333, 3333)", new Object[0]);
        this.sql("INSERT INTO T VALUES (111)", new Object[0]);
        Assertions.assertThat((List)iterator.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{111, 1, 1, 1111}), Row.of((Object[])new Object[]{111, 2, 2, 2222}), Row.of((Object[])new Object[]{333, 3, 3, 3333})});
        iterator.close();
    }

    @ParameterizedTest
    @EnumSource(value=FlinkConnectorOptions.LookupCacheMode.class)
    public void testPartialCacheBucketKeyOrder(FlinkConnectorOptions.LookupCacheMode mode) throws Exception {
        this.sql("CREATE TABLE DIM (k2 INT, k1 INT, j INT , i INT, PRIMARY KEY(i, j) NOT ENFORCED) WITH ('continuous.discovery-interval'='1 ms', 'lookup.cache'='%s', 'bucket' = '2', 'bucket-key' = 'j')", mode);
        this.sql("CREATE TABLE T2 (j INT, i INT, `proctime` AS PROCTIME())", new Object[0]);
        this.sql("INSERT INTO DIM VALUES (1111, 111, 11, 1), (2222, 222, 22, 2)", new Object[0]);
        String query = "SELECT T2.i, D.j, D.k1, D.k2 FROM T2 LEFT JOIN DIM for system_time as of T2.proctime AS D ON T2.i = D.i and T2.j = D.j";
        BlockingIterator iterator = BlockingIterator.of((Iterator)this.sEnv.executeSql(query).collect());
        this.sql("INSERT INTO T2 VALUES (11, 1), (22, 2), (33, 3)", new Object[0]);
        List result = iterator.collect(3);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 111, 1111}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, null, null, null})});
        this.sql("INSERT INTO DIM VALUES (2222, 222, 11, 1), (3333, 333, 33, 3)", new Object[0]);
        Thread.sleep(2000L);
        this.sql("INSERT INTO T2 VALUES (11, 1), (22, 2), (33, 3), (44, 4)", new Object[0]);
        result = iterator.collect(4);
        Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{1, 11, 222, 2222}), Row.of((Object[])new Object[]{2, 22, 222, 2222}), Row.of((Object[])new Object[]{3, 33, 333, 3333}), Row.of((Object[])new Object[]{4, null, null, null})});
        iterator.close();
    }
}

