/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.extensions.SparkExtensionsTestBase;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runners.Parameterized;

public class TestStoragePartitionedJoinsInRowLevelOperations
extends SparkExtensionsTestBase {
    private static final String OTHER_TABLE_NAME = "other_table";
    private static final Map<String, String> COMMON_TABLE_PROPERTIES = ImmutableMap.of((Object)"format-version", (Object)"2", (Object)"read.split.target-size", (Object)"16777216", (Object)"read.split.open-file-cost", (Object)"16777216");
    private static final Map<String, String> ENABLED_SPJ_SQL_CONF = ImmutableMap.of((Object)SQLConf.V2_BUCKETING_ENABLED().key(), (Object)"true", (Object)SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED().key(), (Object)"true", (Object)SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(), (Object)"false", (Object)SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), (Object)"false", (Object)SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), (Object)"-1", (Object)"spark.sql.iceberg.planning.preserve-data-grouping", (Object)"true");

    @Parameterized.Parameters(name="catalogName = {0}, implementation = {1}, config = {2}")
    public static Object[][] parameters() {
        return new Object[][]{{SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), SparkCatalogConfig.HIVE.properties()}};
    }

    public TestStoragePartitionedJoinsInRowLevelOperations(String catalogName, String implementation, Map<String, String> config) {
        super(catalogName, implementation, config);
    }

    @After
    public void removeTables() {
        this.sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
        this.sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName(OTHER_TABLE_NAME)});
    }

    @Test
    public void testCopyOnWriteDeleteWithoutShuffles() {
        this.checkDelete(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadDeleteWithoutShuffles() {
        this.checkDelete(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkDelete(RowLevelOperationMode mode) {
        String createTableStmt = "CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)";
        this.sql(createTableStmt, new Object[]{this.tableName, this.tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        this.append(this.tableName, new String[]{"{ \"id\": 1, \"salary\": 100, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 2, \"salary\": 200, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 3, \"salary\": 300, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 4, \"salary\": 400, \"dep\": \"hardware\" }"});
        this.sql(createTableStmt, new Object[]{this.tableName(OTHER_TABLE_NAME), this.tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        this.append(this.tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 1, \"salary\": 110, \"dep\": \"hr\" }"});
        this.append(this.tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 5, \"salary\": 500, \"dep\": \"hr\" }"});
        ImmutableMap deleteTableProps = ImmutableMap.of((Object)"write.delete.mode", (Object)mode.modeName(), (Object)"write.delete.distribution-mode", (Object)"none");
        this.sql("ALTER TABLE %s SET TBLPROPERTIES(%s)", new Object[]{this.tableName, this.tablePropsAsString((Map)deleteTableProps)});
        this.withSQLConf(ENABLED_SPJ_SQL_CONF, () -> {
            SparkPlan plan = this.executeAndKeepPlan("DELETE FROM %s t WHERE EXISTS (SELECT 1 FROM %s s WHERE t.id = s.id AND t.dep = s.dep) AND dep = 'hr'", new Object[]{this.tableName, this.tableName(OTHER_TABLE_NAME)});
            String planAsString = plan.toString();
            Assertions.assertThat((String)planAsString).doesNotContain(new CharSequence[]{"Exchange"});
        });
        ImmutableList expectedRows = ImmutableList.of((Object)this.row(new Object[]{2, 200, "hr"}), (Object)this.row(new Object[]{3, 300, "hr"}), (Object)this.row(new Object[]{4, 400, "hardware"}));
        this.assertEquals("Should have expected rows", (List)expectedRows, this.sql("SELECT * FROM %s ORDER BY id, salary", new Object[]{this.tableName}));
    }

    @Test
    public void testCopyOnWriteUpdateWithoutShuffles() {
        this.checkUpdate(RowLevelOperationMode.COPY_ON_WRITE);
    }

    @Test
    public void testMergeOnReadUpdateWithoutShuffles() {
        this.checkUpdate(RowLevelOperationMode.MERGE_ON_READ);
    }

    private void checkUpdate(RowLevelOperationMode mode) {
        String createTableStmt = "CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)";
        this.sql(createTableStmt, new Object[]{this.tableName, this.tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        this.append(this.tableName, new String[]{"{ \"id\": 1, \"salary\": 100, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 2, \"salary\": 200, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 3, \"salary\": 300, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 4, \"salary\": 400, \"dep\": \"hardware\" }"});
        this.sql(createTableStmt, new Object[]{this.tableName(OTHER_TABLE_NAME), this.tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        this.append(this.tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 1, \"salary\": 110, \"dep\": \"hr\" }"});
        this.append(this.tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 5, \"salary\": 500, \"dep\": \"hr\" }"});
        ImmutableMap updateTableProps = ImmutableMap.of((Object)"write.update.mode", (Object)mode.modeName(), (Object)"write.update.distribution-mode", (Object)"none");
        this.sql("ALTER TABLE %s SET TBLPROPERTIES(%s)", new Object[]{this.tableName, this.tablePropsAsString((Map)updateTableProps)});
        this.withSQLConf(ENABLED_SPJ_SQL_CONF, () -> {
            SparkPlan plan = this.executeAndKeepPlan("UPDATE %s t SET salary = -1 WHERE EXISTS (SELECT 1 FROM %s s WHERE t.id = s.id AND t.dep = s.dep) AND dep = 'hr'", new Object[]{this.tableName, this.tableName(OTHER_TABLE_NAME)});
            String planAsString = plan.toString();
            Assertions.assertThat((String)planAsString).doesNotContain(new CharSequence[]{"Exchange"});
        });
        ImmutableList expectedRows = ImmutableList.of((Object)this.row(new Object[]{1, -1, "hr"}), (Object)this.row(new Object[]{2, 200, "hr"}), (Object)this.row(new Object[]{3, 300, "hr"}), (Object)this.row(new Object[]{4, 400, "hardware"}));
        this.assertEquals("Should have expected rows", (List)expectedRows, this.sql("SELECT * FROM %s ORDER BY id, salary", new Object[]{this.tableName}));
    }

    @Test
    public void testCopyOnWriteMergeWithoutShuffles() {
        this.checkMerge(RowLevelOperationMode.COPY_ON_WRITE, false);
    }

    @Test
    public void testCopyOnWriteMergeWithoutShufflesWithPredicate() {
        this.checkMerge(RowLevelOperationMode.COPY_ON_WRITE, true);
    }

    @Test
    public void testMergeOnReadMergeWithoutShuffles() {
        this.checkMerge(RowLevelOperationMode.MERGE_ON_READ, false);
    }

    @Test
    public void testMergeOnReadMergeWithoutShufflesWithPredicate() {
        this.checkMerge(RowLevelOperationMode.MERGE_ON_READ, true);
    }

    private void checkMerge(RowLevelOperationMode mode, boolean withPredicate) {
        String createTableStmt = "CREATE TABLE %s (id INT, salary INT, dep STRING)USING iceberg PARTITIONED BY (dep) TBLPROPERTIES (%s)";
        this.sql(createTableStmt, new Object[]{this.tableName, this.tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        this.append(this.tableName, new String[]{"{ \"id\": 1, \"salary\": 100, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 2, \"salary\": 200, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 3, \"salary\": 300, \"dep\": \"hr\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 4, \"salary\": 400, \"dep\": \"hardware\" }"});
        this.append(this.tableName, new String[]{"{ \"id\": 6, \"salary\": 600, \"dep\": \"software\" }"});
        this.sql(createTableStmt, new Object[]{this.tableName(OTHER_TABLE_NAME), this.tablePropsAsString(COMMON_TABLE_PROPERTIES)});
        this.append(this.tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 1, \"salary\": 110, \"dep\": \"hr\" }"});
        this.append(this.tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 5, \"salary\": 500, \"dep\": \"hr\" }"});
        this.append(this.tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 6, \"salary\": 300, \"dep\": \"software\" }"});
        this.append(this.tableName(OTHER_TABLE_NAME), new String[]{"{ \"id\": 10, \"salary\": 1000, \"dep\": \"ops\" }"});
        ImmutableMap mergeTableProps = ImmutableMap.of((Object)"write.merge.mode", (Object)mode.modeName(), (Object)"write.merge.distribution-mode", (Object)"none");
        this.sql("ALTER TABLE %s SET TBLPROPERTIES(%s)", new Object[]{this.tableName, this.tablePropsAsString((Map)mergeTableProps)});
        this.withSQLConf(ENABLED_SPJ_SQL_CONF, () -> {
            String predicate = withPredicate ? "AND t.dep IN ('hr', 'ops', 'software')" : "";
            SparkPlan plan = this.executeAndKeepPlan("MERGE INTO %s AS t USING %s AS s ON t.id = s.id AND t.dep = s.dep %s WHEN MATCHED THEN   UPDATE SET t.salary = s.salary WHEN NOT MATCHED THEN   INSERT *", new Object[]{this.tableName, this.tableName(OTHER_TABLE_NAME), predicate});
            String planAsString = plan.toString();
            if (mode == RowLevelOperationMode.COPY_ON_WRITE) {
                int actualNumShuffles = StringUtils.countMatches((CharSequence)planAsString, (CharSequence)"Exchange");
                Assert.assertEquals((String)"Should be 1 shuffle with SPJ", (long)1L, (long)actualNumShuffles);
                Assertions.assertThat((String)planAsString).contains(new CharSequence[]{"Exchange hashpartitioning(_file"});
            } else {
                Assertions.assertThat((String)planAsString).doesNotContain(new CharSequence[]{"Exchange"});
            }
        });
        ImmutableList expectedRows = ImmutableList.of((Object)this.row(new Object[]{1, 110, "hr"}), (Object)this.row(new Object[]{2, 200, "hr"}), (Object)this.row(new Object[]{3, 300, "hr"}), (Object)this.row(new Object[]{4, 400, "hardware"}), (Object)this.row(new Object[]{5, 500, "hr"}), (Object)this.row(new Object[]{6, 300, "software"}), (Object)this.row(new Object[]{10, 1000, "ops"}));
        this.assertEquals("Should have expected rows", (List)expectedRows, this.sql("SELECT * FROM %s ORDER BY id, salary", new Object[]{this.tableName}));
    }
}

