/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.util.List;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.extensions.ExtensionsTestBase;
import org.apache.iceberg.spark.extensions.SparkPlanUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.expressions.ApplyFunctionExpression;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
import org.apache.spark.sql.execution.CommandResultExec;
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Exercises DELETE, UPDATE and MERGE commands whose conditions call Iceberg system functions
 * ({@code system.bucket}, {@code system.years}, {@code system.months}, {@code system.days},
 * {@code system.hours}, {@code system.truncate}) on tables partitioned by the matching transform.
 *
 * <p>Every check follows the same recipe:
 *
 * <ol>
 *   <li>collect the data files that hold rows NOT matching the condition and hand them to
 *       {@code withUnavailableLocations} (NOTE(review): presumably this makes the files
 *       unreadable for the duration of the action, so the command fails if it touches them;
 *       confirm in the base class),
 *   <li>run the row-level command and count the {@link StaticInvoke} /
 *       {@link ApplyFunctionExpression} nodes left in the query feeding the write,
 *   <li>verify the table contents afterwards.
 * </ol>
 */
@ExtendWith(ParameterizedTestExtension.class)
public class TestSystemFunctionPushDownInRowLevelOperations extends ExtensionsTestBase {
    /** Short name of the auxiliary table holding the ids targeted by each command. */
    private static final String CHANGES_TABLE_NAME = "changes";

    /** Supplies a single parameter set: the Hive catalog name, implementation and properties. */
    @Parameters(name="catalogName = {0}, implementation = {1}, config = {2}")
    public static Object[][] parameters() {
        SparkCatalogConfig hive = SparkCatalogConfig.HIVE;
        return new Object[][]{{hive.catalogName(), hive.implementation(), hive.properties()}};
    }

    /** Switches the session to the catalog under test. */
    @BeforeEach
    public void beforeEach() {
        sql("USE %s", new Object[]{catalogName});
    }

    /** Drops the target table and the changes table created by a test. */
    @AfterEach
    public void removeTables() {
        sql("DROP TABLE IF EXISTS %s PURGE", new Object[]{tableName});
        sql("DROP TABLE IF EXISTS %s PURGE", new Object[]{tableName(CHANGES_TABLE_NAME)});
    }

    // ---------------------------------------------------------------- DELETE

    @TestTemplate
    public void testCopyOnWriteDeleteBucketTransformInPredicate() {
        initTable("bucket(4, dep)");
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE, "system.bucket(4, dep) IN (2, 3)");
    }

    @TestTemplate
    public void testMergeOnReadDeleteBucketTransformInPredicate() {
        initTable("bucket(4, dep)");
        checkDelete(RowLevelOperationMode.MERGE_ON_READ, "system.bucket(4, dep) IN (2, 3)");
    }

    @TestTemplate
    public void testCopyOnWriteDeleteBucketTransformEqPredicate() {
        initTable("bucket(4, dep)");
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE, "system.bucket(4, dep) = 2");
    }

    @TestTemplate
    public void testMergeOnReadDeleteBucketTransformEqPredicate() {
        initTable("bucket(4, dep)");
        checkDelete(RowLevelOperationMode.MERGE_ON_READ, "system.bucket(4, dep) = 2");
    }

    @TestTemplate
    public void testCopyOnWriteDeleteYearsTransform() {
        initTable("years(ts)");
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE, "system.years(ts) > 30");
    }

    @TestTemplate
    public void testMergeOnReadDeleteYearsTransform() {
        initTable("years(ts)");
        checkDelete(RowLevelOperationMode.MERGE_ON_READ, "system.years(ts) <= 30");
    }

    @TestTemplate
    public void testCopyOnWriteDeleteMonthsTransform() {
        initTable("months(ts)");
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE, "system.months(ts) <= 250");
    }

    @TestTemplate
    public void testMergeOnReadDeleteMonthsTransform() {
        initTable("months(ts)");
        checkDelete(RowLevelOperationMode.MERGE_ON_READ, "system.months(ts) > 250");
    }

    @TestTemplate
    public void testCopyOnWriteDeleteDaysTransform() {
        initTable("days(ts)");
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE, "system.days(ts) <= date('2000-01-03 00:00:00')");
    }

    @TestTemplate
    public void testMergeOnReadDeleteDaysTransform() {
        initTable("days(ts)");
        checkDelete(RowLevelOperationMode.MERGE_ON_READ, "system.days(ts) > date('2000-01-03 00:00:00')");
    }

    @TestTemplate
    public void testCopyOnWriteDeleteHoursTransform() {
        initTable("hours(ts)");
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE, "system.hours(ts) <= 100000");
    }

    @TestTemplate
    public void testMergeOnReadDeleteHoursTransform() {
        initTable("hours(ts)");
        checkDelete(RowLevelOperationMode.MERGE_ON_READ, "system.hours(ts) > 100000");
    }

    @TestTemplate
    public void testCopyOnWriteDeleteTruncateTransform() {
        initTable("truncate(1, dep)");
        checkDelete(RowLevelOperationMode.COPY_ON_WRITE, "system.truncate(1, dep) = 'i'");
    }

    @TestTemplate
    public void testMergeOnReadDeleteTruncateTransform() {
        initTable("truncate(1, dep)");
        checkDelete(RowLevelOperationMode.MERGE_ON_READ, "system.truncate(1, dep) = 'i'");
    }

    // ---------------------------------------------------------------- UPDATE

    @TestTemplate
    public void testCopyOnWriteUpdateBucketTransform() {
        initTable("bucket(4, dep)");
        checkUpdate(RowLevelOperationMode.COPY_ON_WRITE, "system.bucket(4, dep) IN (2, 3)");
    }

    @TestTemplate
    public void testMergeOnReadUpdateBucketTransform() {
        initTable("bucket(4, dep)");
        checkUpdate(RowLevelOperationMode.MERGE_ON_READ, "system.bucket(4, dep) = 2");
    }

    @TestTemplate
    public void testCopyOnWriteUpdateYearsTransform() {
        initTable("years(ts)");
        checkUpdate(RowLevelOperationMode.COPY_ON_WRITE, "system.years(ts) > 30");
    }

    @TestTemplate
    public void testMergeOnReadUpdateYearsTransform() {
        initTable("years(ts)");
        checkUpdate(RowLevelOperationMode.MERGE_ON_READ, "system.years(ts) <= 30");
    }

    // ----------------------------------------------------------------- MERGE

    @TestTemplate
    public void testCopyOnWriteMergeBucketTransform() {
        initTable("bucket(4, dep)");
        checkMerge(RowLevelOperationMode.COPY_ON_WRITE, "system.bucket(4, dep) IN (2, 3)");
    }

    @TestTemplate
    public void testMergeOnReadMergeBucketTransform() {
        initTable("bucket(4, dep)");
        checkMerge(RowLevelOperationMode.MERGE_ON_READ, "system.bucket(4, dep) = 2");
    }

    @TestTemplate
    public void testCopyOnWriteMergeYearsTransform() {
        initTable("years(ts)");
        checkMerge(RowLevelOperationMode.COPY_ON_WRITE, "system.years(ts) > 30");
    }

    @TestTemplate
    public void testMergeOnReadMergeYearsTransform() {
        initTable("years(ts)");
        checkMerge(RowLevelOperationMode.MERGE_ON_READ, "system.years(ts) <= 30");
    }

    @TestTemplate
    public void testCopyOnWriteMergeTruncateTransform() {
        initTable("truncate(1, dep)");
        checkMerge(RowLevelOperationMode.COPY_ON_WRITE, "system.truncate(1, dep) = 'i'");
    }

    @TestTemplate
    public void testMergeOnReadMergeTruncateTransform() {
        initTable("truncate(1, dep)");
        checkMerge(RowLevelOperationMode.MERGE_ON_READ, "system.truncate(1, dep) = 'i'");
    }

    // --------------------------------------------------------------- helpers

    /**
     * Runs a DELETE restricted by {@code cond} and verifies the plan and the remaining data.
     *
     * @param mode row-level operation mode stored in {@code write.delete.mode}
     * @param cond SQL predicate calling a system function
     */
    private void checkDelete(RowLevelOperationMode mode, String cond) {
        withUnavailableLocations(findIrrelevantFileLocations(cond), () -> {
            setWriteMode("write.delete.mode", "write.delete.distribution-mode", mode);

            // Stage up to two ids that satisfy cond; the command targets them via a subquery.
            Dataset<?> changes = spark.table(tableName).where(cond).limit(2).select("id", new String[0]);
            changes.coalesce(1).writeTo(tableName(CHANGES_TABLE_NAME)).create();

            List<Expression> calls = executeAndCollectFunctionCalls(
                    "DELETE FROM %s t WHERE %s AND t.id IN (SELECT id FROM %s)",
                    tableName, cond, tableName(CHANGES_TABLE_NAME));

            // Copy-on-write plans are expected to keep one function call, merge-on-read none.
            // NOTE(review): presumably CoW planning does not rewrite the post-scan filter - confirm.
            int expectedCallCount = mode == RowLevelOperationMode.COPY_ON_WRITE ? 1 : 0;
            Assertions.assertThat(calls).hasSize(expectedCallCount);

            assertEquals(
                    "Should have no matching rows",
                    ImmutableList.of(),
                    sql("SELECT * FROM %s WHERE %s AND id IN (SELECT * FROM %s)",
                            new Object[]{tableName, cond, tableName(CHANGES_TABLE_NAME)}));
        });
    }

    /**
     * Runs an UPDATE restricted by {@code cond} and verifies the plan and the updated rows.
     *
     * @param mode row-level operation mode stored in {@code write.update.mode}
     * @param cond SQL predicate calling a system function
     */
    private void checkUpdate(RowLevelOperationMode mode, String cond) {
        withUnavailableLocations(findIrrelevantFileLocations(cond), () -> {
            setWriteMode("write.update.mode", "write.update.distribution-mode", mode);

            // Stage up to two ids that satisfy cond; the command targets them via a subquery.
            Dataset<?> changes = spark.table(tableName).where(cond).limit(2).select("id", new String[0]);
            changes.coalesce(1).writeTo(tableName(CHANGES_TABLE_NAME)).create();

            List<Expression> calls = executeAndCollectFunctionCalls(
                    "UPDATE %s t SET t.salary = -1 WHERE %s AND t.id IN (SELECT id FROM %s)",
                    tableName, cond, tableName(CHANGES_TABLE_NAME));

            // Copy-on-write plans are expected to keep two function calls, merge-on-read none.
            // NOTE(review): presumably CoW planning does not rewrite the post-scan filter - confirm.
            int expectedCallCount = mode == RowLevelOperationMode.COPY_ON_WRITE ? 2 : 0;
            Assertions.assertThat(calls).hasSize(expectedCallCount);

            // Exactly the staged ids must now carry the sentinel salary.
            assertEquals(
                    "Should have correct updates",
                    sql("SELECT id FROM %s", new Object[]{tableName(CHANGES_TABLE_NAME)}),
                    sql("SELECT id FROM %s WHERE %s AND salary = -1", new Object[]{tableName, cond}));
        });
    }

    /**
     * Runs a MERGE whose ON clause includes {@code cond} and verifies the plan and the updated rows.
     *
     * @param mode row-level operation mode stored in {@code write.merge.mode}
     * @param cond SQL predicate calling a system function
     */
    private void checkMerge(RowLevelOperationMode mode, String cond) {
        withUnavailableLocations(findIrrelevantFileLocations(cond), () -> {
            setWriteMode("write.merge.mode", "write.merge.distribution-mode", mode);

            // Source ids are shifted by one relative to (up to two) target rows that satisfy cond.
            Dataset<?> changes = spark.table(tableName).where(cond).limit(2).selectExpr(new String[]{"id + 1 as id"});
            changes.coalesce(1).writeTo(tableName(CHANGES_TABLE_NAME)).create();

            List<Expression> calls = executeAndCollectFunctionCalls(
                    "MERGE INTO %s t USING %s s "
                            + "ON t.id == s.id AND %s "
                            + "WHEN MATCHED THEN "
                            + "  UPDATE SET salary = -1 "
                            + "WHEN NOT MATCHED AND s.id = 2 THEN "
                            + "  INSERT (id, salary, dep, ts) VALUES (100, -1, 'hr', null)",
                    tableName, tableName(CHANGES_TABLE_NAME), cond);

            // No function call may remain in the plan, whatever the mode.
            Assertions.assertThat(calls).isEmpty();

            assertEquals(
                    "Should have correct updates",
                    sql("SELECT id FROM %s", new Object[]{tableName(CHANGES_TABLE_NAME)}),
                    sql("SELECT id FROM %s WHERE %s AND salary = -1", new Object[]{tableName, cond}));
        });
    }

    /**
     * Sets the operation mode for one kind of row-level command on the target table and
     * sets that command's distribution mode to {@link DistributionMode#NONE}.
     *
     * @param modeProperty table property receiving {@code mode.modeName()}
     * @param distributionModeProperty table property receiving the NONE distribution mode name
     * @param mode row-level operation mode to configure
     */
    private void setWriteMode(String modeProperty, String distributionModeProperty, RowLevelOperationMode mode) {
        sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s', '%s' '%s')",
                new Object[]{tableName, modeProperty, mode.modeName(), distributionModeProperty, DistributionMode.NONE.modeName()});
    }

    /**
     * Executes the formatted statement and collects the function-call expressions found in the
     * query that feeds the table write.
     *
     * @param query statement template
     * @param args values substituted into {@code query}
     * @return every {@link StaticInvoke} or {@link ApplyFunctionExpression} in the write's query
     */
    private List<Expression> executeAndCollectFunctionCalls(String query, Object... args) {
        CommandResultExec command = (CommandResultExec) executeAndKeepPlan(query, args);
        V2TableWriteExec write = (V2TableWriteExec) command.commandPhysicalPlan();
        return SparkPlanUtil.collectExprs(
                write.query(),
                expr -> expr instanceof StaticInvoke || expr instanceof ApplyFunctionExpression);
    }

    /**
     * Lists the distinct data file paths holding at least one row that does not satisfy {@code cond}.
     *
     * @param cond SQL predicate; it is negated as a whole
     * @return distinct values of the file-path metadata column for the non-matching rows
     */
    private List<String> findIrrelevantFileLocations(String cond) {
        // Parenthesize so that a compound condition (a AND b / a OR b) is negated entirely,
        // not just its first operand.
        String negated = "NOT (" + cond + ")";
        return spark.table(tableName)
                .where(negated)
                .select(MetadataColumns.FILE_PATH.name(), new String[0])
                .distinct()
                .as(Encoders.STRING())
                .collectAsList();
    }

    /**
     * Creates the target table partitioned by {@code transform} and appends six rows: ids 1-3 in
     * department {@code hr} at 1975-01-01 06:00:00 and ids 4-6 in department {@code it} at
     * 2020-01-01 10:00:00.
     *
     * @param transform partition transform expression placed in the PARTITIONED BY clause
     */
    private void initTable(String transform) {
        // The statement text (including the missing space before USING) is kept as-is.
        sql("CREATE TABLE %s (id BIGINT, salary INT, dep STRING, ts TIMESTAMP)"
                        + "USING iceberg PARTITIONED BY (%s)",
                new Object[]{tableName, transform});

        append(tableName, new String[]{
                "{ \"id\": 1, \"salary\": 100, \"dep\": \"hr\", \"ts\": \"1975-01-01 06:00:00\" }",
                "{ \"id\": 2, \"salary\": 200, \"dep\": \"hr\", \"ts\": \"1975-01-01 06:00:00\" }",
                "{ \"id\": 3, \"salary\": 300, \"dep\": \"hr\", \"ts\": \"1975-01-01 06:00:00\" }",
                "{ \"id\": 4, \"salary\": 400, \"dep\": \"it\", \"ts\": \"2020-01-01 10:00:00\" }",
                "{ \"id\": 5, \"salary\": 500, \"dep\": \"it\", \"ts\": \"2020-01-01 10:00:00\" }",
                "{ \"id\": 6, \"salary\": 600, \"dep\": \"it\", \"ts\": \"2020-01-01 10:00:00\" }"});
    }
}

