/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class LookupJoinBucketShuffleITCase
extends CatalogITCaseBase {
    private static final int BUCKET_NUMBER = 5;
    private static final int ROW_NUMBER = 100;

    @Test
    public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase1() throws Exception {
        String nonPrimaryKeyDimTable = this.createNonPrimaryKeyDimTable("col1");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1".replace("DIM", nonPrimaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase2() throws Exception {
        String nonPrimaryKeyDimTable = this.createNonPrimaryKeyDimTable("col1,col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2".replace("DIM", nonPrimaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase3() throws Exception {
        String nonPrimaryKeyDimTable = this.createNonPrimaryKeyDimTable("col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2".replace("DIM", nonPrimaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase4() throws Exception {
        String nonPrimaryKeyDimTable = this.createNonPrimaryKeyDimTable("col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2 AND D.col4 = CAST('2024-06-09' AS DATE) AND D.col5 = 123.45 ".replace("DIM", nonPrimaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForPrimaryTableInFullCacheModeCase1() throws Exception {
        String primaryKeyDimTable = this.createPrimaryKeyDimTable(1, true, "col1");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1".replace("DIM", primaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForPrimaryTableInFullCacheModeCase2() throws Exception {
        String primaryKeyDimTable = this.createPrimaryKeyDimTable(2, true, "col1,col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2".replace("DIM", primaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForPrimaryTableInFullCacheModeCase3() throws Exception {
        String primaryKeyDimTable = this.createPrimaryKeyDimTable(2, true, "col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2".replace("DIM", primaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForPrimaryTableInFullCacheModeCase4() throws Exception {
        String primaryKeyDimTable = this.createPrimaryKeyDimTable(2, true, "col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2 AND D.col4 = CAST('2024-06-09' AS DATE) AND D.col5 = 123.45 ".replace("DIM", primaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase1() throws Exception {
        String primaryKeyDimTable = this.createPrimaryKeyDimTable(1, false, "col1");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1".replace("DIM", primaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase2() throws Exception {
        String primaryKeyDimTable = this.createPrimaryKeyDimTable(2, false, "col1,col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2".replace("DIM", primaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase3() throws Exception {
        String primaryKeyDimTable = this.createPrimaryKeyDimTable(2, false, "col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2".replace("DIM", primaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    @Test
    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase4() throws Exception {
        String primaryKeyDimTable = this.createPrimaryKeyDimTable(2, false, "col2");
        String query = "SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2 AND D.col4 = CAST('2024-06-09' AS DATE) AND D.col5 = 123.45".replace("DIM", primaryKeyDimTable);
        this.testBucketNumberCases(query);
    }

    private void testBucketNumberCases(String query) throws Exception {
        List<Row> groundTruthRows = this.getGroundTruthRows();
        this.sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, (Object)5);
        List result1 = this.streamSqlBlockIter(query, new Object[0]).collect(100);
        Assertions.assertThat((List)result1).containsExactlyInAnyOrderElementsOf(groundTruthRows);
        this.sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, (Object)3);
        List result2 = this.streamSqlBlockIter(query, new Object[0]).collect(100);
        Assertions.assertThat((List)result2).containsExactlyInAnyOrderElementsOf(groundTruthRows);
        this.sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, (Object)8);
        List result3 = this.streamSqlBlockIter(query, new Object[0]).collect(100);
        Assertions.assertThat((List)result3).containsExactlyInAnyOrderElementsOf(groundTruthRows);
    }

    private String createPrimaryKeyDimTable(int primaryKeyNumber, boolean fullCache, String bucketKey) {
        String ddl;
        this.createSourceTable();
        String tableName = "DIM_PRIMARY";
        tableName = fullCache ? tableName + "_full_cache" : tableName + "_auto";
        if (bucketKey != null) {
            tableName = tableName + "_" + bucketKey.split(",").length;
        }
        if (primaryKeyNumber == 1) {
            tableName = tableName + "_1";
            ddl = String.format("CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, col4 DATE, col5 DECIMAL(10, 2), PRIMARY KEY (col1) NOT ENFORCED) WITH ('continuous.discovery-interval'='1 ms', 'bucket'='%s'", tableName, 5);
        } else {
            tableName = tableName + "_2";
            ddl = String.format("CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, col4 DATE, col5 DECIMAL(10, 2), PRIMARY KEY (col1, col2) NOT ENFORCED) WITH ('continuous.discovery-interval'='1 ms', 'bucket'='%s'", tableName, 5);
        }
        if (bucketKey != null) {
            ddl = ddl + ", 'bucket-key' = '" + bucketKey + "'";
        }
        ddl = fullCache ? ddl + " ,'lookup.cache' = 'full')" : ddl + " )";
        this.batchSql(ddl, new Object[0]);
        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s VALUES ", tableName));
        for (int index = 1; index < 100; ++index) {
            dml.append(String.format("(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45), ", index, index * 10, index * 10));
        }
        dml.append(String.format("(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45)", 100, 1000, 1000));
        this.batchSql(dml.toString(), new Object[0]);
        return tableName;
    }

    private String createNonPrimaryKeyDimTable(String bucketKey) {
        this.createSourceTable();
        String tableName = "DIM";
        if (bucketKey != null) {
            tableName = tableName + "_" + bucketKey.split(",").length;
        }
        String ddl = String.format("CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, col4 DATE, col5 DECIMAL(10, 2)) WITH ('continuous.discovery-interval'='1 ms', 'bucket'='%s'", tableName, 5);
        if (bucketKey != null) {
            ddl = ddl + ", 'bucket-key' = '" + bucketKey + "')";
        }
        this.batchSql(ddl, new Object[0]);
        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s VALUES ", tableName));
        for (int index = 1; index < 100; ++index) {
            dml.append(String.format("(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45), ", index, index * 10, index * 10));
        }
        dml.append(String.format("(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45)", 100, 1000, 1000));
        this.batchSql(dml.toString(), new Object[0]);
        return tableName;
    }

    private void createSourceTable() {
        String ddl = "CREATE TABLE T (col1 INT, col2 STRING, col3 INT, `proc_time` AS PROCTIME())";
        this.batchSql(ddl, new Object[0]);
        StringBuilder dml = new StringBuilder("INSERT INTO T VALUES ");
        for (int index = 1; index < 100; ++index) {
            dml.append(String.format("(%s, '%s', %s), ", index, index * 10, index * 10));
        }
        dml.append(String.format("(%s, '%s', %s)", 100, 1000, 1000));
        this.batchSql(dml.toString(), new Object[0]);
    }

    private List<Row> getGroundTruthRows() {
        ArrayList<Row> results = new ArrayList<Row>();
        for (int index = 1; index <= 100; ++index) {
            results.add(Row.of((Object[])new Object[]{index, String.valueOf(index * 10)}));
        }
        return results;
    }
}

