/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.sql;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests that run the same query twice, once with the storage-partitioned join (SPJ)
 * SQL confs enabled and once with them disabled, and verify both the number of {@code Exchange}
 * occurrences in the plan and that the query output is identical in both runs.
 *
 * <p>The test parameter is the {@link PlanningMode}; it is applied to the tables created by
 * {@link #checkJoin(String, String, String)}.
 */
@RunWith(Parameterized.class)
public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {

    private static final String OTHER_TABLE_NAME = "other_table";

    // table properties applied to the tables created through tablePropsAsString(...)
    private static final Map<String, String> TABLE_PROPERTIES =
        ImmutableMap.of(
            "read.split.target-size", "16777216",
            "read.split.open-file-cost", "16777216");

    // SQL confs used for the "SPJ enabled" run of every query
    private static final Map<String, String> ENABLED_SPJ_SQL_CONF =
        ImmutableMap.of(
            SQLConf.V2_BUCKETING_ENABLED().key(),
            "true",
            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED().key(),
            "true",
            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(),
            "false",
            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
            "false",
            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
            "-1",
            "spark.sql.iceberg.planning.preserve-data-grouping",
            "true");

    // SQL confs used for the "SPJ disabled" run of every query (v2 bucketing is switched off)
    private static final Map<String, String> DISABLED_SPJ_SQL_CONF =
        ImmutableMap.of(
            SQLConf.V2_BUCKETING_ENABLED().key(),
            "false",
            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(),
            "false",
            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
            "false",
            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
            "-1",
            "spark.sql.iceberg.planning.preserve-data-grouping",
            "true");

    // DDL template: (table name, table properties) for a table partitioned by the dep column
    private static final String CREATE_TABLE_PARTITIONED_BY_DEP =
        "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
            + "USING iceberg "
            + "PARTITIONED BY (dep)"
            + "TBLPROPERTIES (%s)";

    // query template: (left table, right table) joined on id, int_col and dep
    private static final String JOIN_ON_ID_INT_COL_DEP_QUERY =
        "SELECT * "
            + "FROM %s t1 "
            + "INNER JOIN %s t2 "
            + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep = t2.dep "
            + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col, t2.dep";

    private final PlanningMode planningMode;

    /** Returns the planning modes each test is executed with. */
    @Parameterized.Parameters(name = "planningMode = {0}")
    public static Object[] parameters() {
        return new Object[] {PlanningMode.LOCAL, PlanningMode.DISTRIBUTED};
    }

    public TestStoragePartitionedJoins(PlanningMode planningMode) {
        this.planningMode = planningMode;
    }

    @BeforeClass
    public static void setupSparkConf() {
        spark.conf().set("spark.sql.shuffle.partitions", "4");
    }

    /** Drops both tables a test may have created. */
    @After
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s", tableName);
        sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME));
    }

    @Test
    public void testJoinsWithBucketingOnByteColumn() throws NoSuchTableException {
        checkJoin("byte_col", "TINYINT", "bucket(4, byte_col)");
    }

    @Test
    public void testJoinsWithBucketingOnShortColumn() throws NoSuchTableException {
        checkJoin("short_col", "SMALLINT", "bucket(4, short_col)");
    }

    @Test
    public void testJoinsWithBucketingOnIntColumn() throws NoSuchTableException {
        checkJoin("int_col", "INT", "bucket(16, int_col)");
    }

    @Test
    public void testJoinsWithBucketingOnLongColumn() throws NoSuchTableException {
        checkJoin("long_col", "BIGINT", "bucket(16, long_col)");
    }

    @Test
    public void testJoinsWithBucketingOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "bucket(16, timestamp_col)");
    }

    @Test
    public void testJoinsWithBucketingOnTimestampNtzColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP_NTZ", "bucket(16, timestamp_col)");
    }

    @Test
    public void testJoinsWithBucketingOnDateColumn() throws NoSuchTableException {
        checkJoin("date_col", "DATE", "bucket(8, date_col)");
    }

    @Test
    public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException {
        checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
    }

    @Test
    public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
        checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
    }

    @Test
    public void testJoinsWithYearsOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "years(timestamp_col)");
    }

    @Test
    public void testJoinsWithYearsOnTimestampNtzColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP_NTZ", "years(timestamp_col)");
    }

    @Test
    public void testJoinsWithYearsOnDateColumn() throws NoSuchTableException {
        checkJoin("date_col", "DATE", "years(date_col)");
    }

    @Test
    public void testJoinsWithMonthsOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "months(timestamp_col)");
    }

    @Test
    public void testJoinsWithMonthsOnTimestampNtzColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP_NTZ", "months(timestamp_col)");
    }

    @Test
    public void testJoinsWithMonthsOnDateColumn() throws NoSuchTableException {
        checkJoin("date_col", "DATE", "months(date_col)");
    }

    @Test
    public void testJoinsWithDaysOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "days(timestamp_col)");
    }

    @Test
    public void testJoinsWithDaysOnTimestampNtzColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP_NTZ", "days(timestamp_col)");
    }

    @Test
    public void testJoinsWithDaysOnDateColumn() throws NoSuchTableException {
        checkJoin("date_col", "DATE", "days(date_col)");
    }

    @Test
    public void testJoinsWithHoursOnTimestampColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP", "hours(timestamp_col)");
    }

    @Test
    public void testJoinsWithHoursOnTimestampNtzColumn() throws NoSuchTableException {
        checkJoin("timestamp_col", "TIMESTAMP_NTZ", "hours(timestamp_col)");
    }

    /** Joins two tables partitioned by years, months, days, hours, bucket and identity at once. */
    @Test
    public void testJoinsWithMultipleTransformTypes() throws NoSuchTableException {
        String createTableStmt =
            "CREATE TABLE %s ("
                + "  id BIGINT, int_col INT, date_col1 DATE, date_col2 DATE, date_col3 DATE,"
                + "  timestamp_col TIMESTAMP, string_col STRING, dep STRING)"
                + "USING iceberg "
                + "PARTITIONED BY ("
                + "  years(date_col1), months(date_col2), days(date_col3), hours(timestamp_col), "
                + "  bucket(8, int_col), dep)"
                + "TBLPROPERTIES (%s)";

        sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
        sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

        Table table = validationCatalog.loadTable(tableIdent);
        Dataset<Row> dataDF = randomDataDF(table.schema(), 16);

        // the first table receives the data once, the second table receives it twice
        append(tableName, dataDF);
        append(tableName(OTHER_TABLE_NAME), dataDF);
        append(tableName(OTHER_TABLE_NAME), dataDF);

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT t1.id "
                + "FROM %s t1 "
                + "INNER JOIN %s t2 "
                + "ON t1.id = t2.id AND t1.dep = t2.dep "
                + "ORDER BY t1.id",
            tableName,
            tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT t1.id, t1.int_col, t1.date_col1 "
                + "FROM %s t1 "
                + "INNER JOIN %s t2 "
                + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.date_col1 = t2.date_col1 "
                + "ORDER BY t1.id, t1.int_col, t1.date_col1",
            tableName,
            tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT t1.id, t1.timestamp_col, t1.string_col "
                + "FROM %s t1 "
                + "INNER JOIN %s t2 "
                + "ON t1.id = t2.id AND t1.timestamp_col = t2.timestamp_col AND t1.string_col = t2.string_col "
                + "ORDER BY t1.id, t1.timestamp_col, t1.string_col",
            tableName,
            tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT t1.id, t1.date_col1, t1.date_col2, t1.date_col3 "
                + "FROM %s t1 "
                + "INNER JOIN %s t2 "
                + "ON t1.id = t2.id AND t1.date_col1 = t2.date_col1 AND t1.date_col2 = t2.date_col2 AND t1.date_col3 = t2.date_col3 "
                + "ORDER BY t1.id, t1.date_col1, t1.date_col2, t1.date_col3",
            tableName,
            tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT t1.id, t1.int_col, t1.timestamp_col, t1.dep "
                + "FROM %s t1 "
                + "INNER JOIN %s t2 "
                + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.timestamp_col = t2.timestamp_col AND t1.dep = t2.dep "
                + "ORDER BY t1.id, t1.int_col, t1.timestamp_col, t1.dep",
            tableName,
            tableName(OTHER_TABLE_NAME));
    }

    /**
     * Evolves the spec of the first table (unpartitioned, then dep, then dep plus bucket(8,
     * int_col)) with an insert after each evolution, and joins it with a table partitioned by
     * other_dep.
     */
    @Test
    public void testJoinsWithCompatibleSpecEvolution() {
        sql(
            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                + "USING iceberg "
                + "TBLPROPERTIES (%s)",
            tableName, tablePropsAsString(TABLE_PROPERTIES));

        Table table = validationCatalog.loadTable(tableIdent);

        // first evolution: partition by dep, then write one row
        table.updateSpec().addField("dep").commit();
        sql("REFRESH TABLE %s", tableName);
        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);

        // second evolution: additionally bucket by int_col, then write another row
        table.updateSpec().addField(Expressions.bucket("int_col", 8)).commit();
        sql("REFRESH TABLE %s", tableName);
        sql("INSERT INTO %s VALUES (2L, 200, 'hr')", tableName);

        sql(
            "CREATE TABLE %s (other_id BIGINT, other_int_col INT, other_dep STRING)"
                + "USING iceberg "
                + "PARTITIONED BY (other_dep)"
                + "TBLPROPERTIES (%s)",
            tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (2L, 200, 'hr')", tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT * "
                + "FROM %s "
                + "INNER JOIN %s "
                + "ON id = other_id AND int_col = other_int_col AND dep = other_dep "
                + "ORDER BY id, int_col, dep",
            tableName,
            tableName(OTHER_TABLE_NAME));
    }

    /** Joins a table partitioned by dep with a table partitioned by bucket(8, int_col). */
    @Test
    public void testJoinsWithIncompatibleSpecs() {
        sql(CREATE_TABLE_PARTITIONED_BY_DEP, tableName, tablePropsAsString(TABLE_PROPERTIES));

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
        sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName);
        sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName);

        sql(
            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                + "USING iceberg "
                + "PARTITIONED BY (bucket(8, int_col))"
                + "TBLPROPERTIES (%s)",
            tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName(OTHER_TABLE_NAME));

        // the same number of shuffles is expected with and without SPJ
        assertPartitioningAwarePlan(
            3, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            JOIN_ON_ID_INT_COL_DEP_QUERY,
            tableName,
            tableName(OTHER_TABLE_NAME));
    }

    /** Joins two tables that are created without a PARTITIONED BY clause. */
    @Test
    public void testJoinsWithUnpartitionedTables() {
        String createTableStmt =
            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                + "USING iceberg "
                + "TBLPROPERTIES ("
                + "  'read.split.target-size' = 16777216,"
                + "  'read.split.open-file-cost' = 16777216)";

        sql(createTableStmt, tableName);

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
        sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName);
        sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName);

        sql(createTableStmt, tableName(OTHER_TABLE_NAME));

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName(OTHER_TABLE_NAME));

        // the same number of shuffles is expected with and without SPJ
        assertPartitioningAwarePlan(
            3, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            JOIN_ON_ID_INT_COL_DEP_QUERY,
            tableName,
            tableName(OTHER_TABLE_NAME));
    }

    /** Joins a table that never receives any rows with a populated table of the same layout. */
    @Test
    public void testJoinsWithEmptyTable() {
        sql(CREATE_TABLE_PARTITIONED_BY_DEP, tableName, tablePropsAsString(TABLE_PROPERTIES));

        sql(
            CREATE_TABLE_PARTITIONED_BY_DEP,
            tableName(OTHER_TABLE_NAME),
            tablePropsAsString(TABLE_PROPERTIES));

        // only the second table is populated
        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
            3, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            JOIN_ON_ID_INT_COL_DEP_QUERY,
            tableName,
            tableName(OTHER_TABLE_NAME));
    }

    /** Joins two dep-partitioned tables that each contain a single inserted row. */
    @Test
    public void testJoinsWithOneSplitTables() {
        sql(CREATE_TABLE_PARTITIONED_BY_DEP, tableName, tablePropsAsString(TABLE_PROPERTIES));

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);

        sql(
            CREATE_TABLE_PARTITIONED_BY_DEP,
            tableName(OTHER_TABLE_NAME),
            tablePropsAsString(TABLE_PROPERTIES));

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName(OTHER_TABLE_NAME));

        // no Exchange is expected in either plan
        assertPartitioningAwarePlan(
            0, /* expected num of shuffles with SPJ */
            0, /* expected num of shuffles without SPJ */
            JOIN_ON_ID_INT_COL_DEP_QUERY,
            tableName,
            tableName(OTHER_TABLE_NAME));
    }

    /** Joins two dep-partitioned tables whose dep values only partially overlap. */
    @Test
    public void testJoinsWithMismatchingPartitionKeys() {
        sql(CREATE_TABLE_PARTITIONED_BY_DEP, tableName, tablePropsAsString(TABLE_PROPERTIES));

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
        sql("INSERT INTO %s VALUES (2L, 100, 'hr')", tableName);

        sql(
            CREATE_TABLE_PARTITIONED_BY_DEP,
            tableName(OTHER_TABLE_NAME),
            tablePropsAsString(TABLE_PROPERTIES));

        sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName(OTHER_TABLE_NAME));
        sql("INSERT INTO %s VALUES (3L, 300, 'hardware')", tableName(OTHER_TABLE_NAME));

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT * "
                + "FROM %s t1 "
                + "INNER JOIN %s t2 "
                + "ON t1.id = t2.id AND t1.dep = t2.dep "
                + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col, t2.dep",
            tableName,
            tableName(OTHER_TABLE_NAME));
    }

    /** Runs distinct-count aggregations grouped by the partition source columns of one table. */
    @Test
    public void testAggregates() throws NoSuchTableException {
        sql(
            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
                + "USING iceberg "
                + "PARTITIONED BY (dep, bucket(8, int_col))"
                + "TBLPROPERTIES (%s)",
            tableName, tablePropsAsString(TABLE_PROPERTIES));

        Table table = validationCatalog.loadTable(tableIdent);
        Dataset<Row> dataDF = randomDataDF(table.schema(), 100);
        append(tableName, dataDF);

        // both queries contain a single %s placeholder, so only this table's name is supplied
        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT COUNT (DISTINCT id) AS count "
                + "FROM %s "
                + "GROUP BY dep, int_col "
                + "ORDER BY count",
            tableName);

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT COUNT (DISTINCT id) AS count "
                + "FROM %s "
                + "GROUP BY dep "
                + "ORDER BY count",
            tableName);
    }

    /**
     * Creates two identically defined tables partitioned by {@code transform}, configures the
     * planning mode of this test run on both, appends the same 200 random rows to each, and checks
     * a join on {@code id} and the source column.
     *
     * @param sourceColumnName name of the column the partition transform is applied to
     * @param sourceColumnType SQL type of that column
     * @param transform partition transform expression used in PARTITIONED BY
     * @throws NoSuchTableException declared by the DataFrame append
     */
    private void checkJoin(String sourceColumnName, String sourceColumnType, String transform)
        throws NoSuchTableException {
        String createTableStmt =
            "CREATE TABLE %s (id BIGINT, salary INT, %s %s)"
                + "USING iceberg "
                + "PARTITIONED BY (%s)"
                + "TBLPROPERTIES (%s)";

        sql(
            createTableStmt,
            tableName,
            sourceColumnName,
            sourceColumnType,
            transform,
            tablePropsAsString(TABLE_PROPERTIES));
        configurePlanningMode(tableName, planningMode);

        sql(
            createTableStmt,
            tableName(OTHER_TABLE_NAME),
            sourceColumnName,
            sourceColumnType,
            transform,
            tablePropsAsString(TABLE_PROPERTIES));
        configurePlanningMode(tableName(OTHER_TABLE_NAME), planningMode);

        Table table = validationCatalog.loadTable(tableIdent);
        Dataset<Row> dataDF = randomDataDF(table.schema(), 200);
        append(tableName, dataDF);
        append(tableName(OTHER_TABLE_NAME), dataDF);

        assertPartitioningAwarePlan(
            1, /* expected num of shuffles with SPJ */
            3, /* expected num of shuffles without SPJ */
            "SELECT t1.id, t1.salary, t1.%s "
                + "FROM %s t1 "
                + "INNER JOIN %s t2 "
                + "ON t1.id = t2.id AND t1.%s = t2.%s "
                + "ORDER BY t1.id, t1.%s",
            sourceColumnName,
            tableName,
            tableName(OTHER_TABLE_NAME),
            sourceColumnName,
            sourceColumnName,
            sourceColumnName);
    }

    /**
     * Executes {@code query} first with SPJ enabled and then with SPJ disabled, asserting the
     * number of {@code Exchange} occurrences in each plan and that both runs return equal rows.
     *
     * @param expectedNumShufflesWithSPJ expected {@code Exchange} count under the enabled confs
     * @param expectedNumShufflesWithoutSPJ expected {@code Exchange} count under the disabled confs
     * @param query query template
     * @param args format arguments for {@code query}
     */
    private void assertPartitioningAwarePlan(
        int expectedNumShufflesWithSPJ,
        int expectedNumShufflesWithoutSPJ,
        String query,
        Object... args) {
        List<Object[]> rowsWithSPJ =
            runAndCheckShuffles(
                ENABLED_SPJ_SQL_CONF,
                "Number of shuffles with enabled SPJ must match",
                expectedNumShufflesWithSPJ,
                query,
                args);

        List<Object[]> rowsWithoutSPJ =
            runAndCheckShuffles(
                DISABLED_SPJ_SQL_CONF,
                "Number of shuffles with disabled SPJ must match",
                expectedNumShufflesWithoutSPJ,
                query,
                args);

        assertEquals("SPJ should not change query output", rowsWithoutSPJ, rowsWithSPJ);
    }

    /**
     * Runs {@code query} under the given SQL confs, asserts how many times {@code Exchange} appears
     * in the kept plan, and returns the rows produced by the query.
     */
    private List<Object[]> runAndCheckShuffles(
        Map<String, String> sqlConf,
        String failureMessage,
        int expectedNumShuffles,
        String query,
        Object... args) {
        // the lambda cannot return a value, so the rows are handed out through a reference
        AtomicReference<List<Object[]>> rows = new AtomicReference<>();

        withSQLConf(
            sqlConf,
            () -> {
                String plan = executeAndKeepPlan(query, args).toString();
                int actualNumShuffles = StringUtils.countMatches(plan, "Exchange");
                Assert.assertEquals(failureMessage, expectedNumShuffles, actualNumShuffles);

                rows.set(sql(query, args));
            });

        return rows.get();
    }

    /**
     * Builds a DataFrame of {@code numRows} rows generated by {@code RandomData.generateSpark}
     * with a fixed seed of 0 for the given schema.
     */
    private Dataset<Row> randomDataDF(Schema schema, int numRows) {
        Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0L);
        JavaRDD<InternalRow> rowRDD = sparkContext.parallelize(Lists.newArrayList(rows));
        StructType rowSparkType = SparkSchemaUtil.convert(schema);
        return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
    }

    /** Appends {@code df} to {@code table} after coalescing it to one partition, with fanout on. */
    private void append(String table, Dataset<Row> df) throws NoSuchTableException {
        df.coalesce(1).writeTo(table).option("fanout-enabled", "true").append();
    }
}

