/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.spark.extensions;

import java.util.List;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.extensions.ExtensionsTestBase;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;

public class TestPublishChangesProcedure
extends ExtensionsTestBase {
    @AfterEach
    public void removeTables() {
        this.sql("DROP TABLE IF EXISTS %s", new Object[]{this.tableName});
    }

    @TestTemplate
    public void testApplyWapChangesUsingPositionalArgs() {
        String wapId = "wap_id_1";
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        spark.conf().set("spark.wap.id", wapId);
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.assertEquals("Should not see rows from staged snapshot", (List)ImmutableList.of(), this.sql("SELECT * FROM %s", new Object[]{this.tableName}));
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot wapSnapshot = (Snapshot)Iterables.getOnlyElement((Iterable)table.snapshots());
        List output = this.sql("CALL %s.system.publish_changes('%s', '%s')", new Object[]{this.catalogName, this.tableIdent, wapId});
        table.refresh();
        Snapshot currentSnapshot = table.currentSnapshot();
        this.assertEquals("Procedure output must match", (List)ImmutableList.of((Object)this.row(new Object[]{wapSnapshot.snapshotId(), currentSnapshot.snapshotId()})), output);
        this.assertEquals("Apply of WAP changes must be successful", (List)ImmutableList.of((Object)this.row(new Object[]{1L, "a"})), this.sql("SELECT * FROM %s", new Object[]{this.tableName}));
    }

    @TestTemplate
    public void testApplyWapChangesUsingNamedArgs() {
        String wapId = "wap_id_1";
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        spark.conf().set("spark.wap.id", wapId);
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.assertEquals("Should not see rows from staged snapshot", (List)ImmutableList.of(), this.sql("SELECT * FROM %s", new Object[]{this.tableName}));
        Table table = this.validationCatalog.loadTable(this.tableIdent);
        Snapshot wapSnapshot = (Snapshot)Iterables.getOnlyElement((Iterable)table.snapshots());
        List output = this.sql("CALL %s.system.publish_changes(wap_id => '%s', table => '%s')", new Object[]{this.catalogName, wapId, this.tableIdent});
        table.refresh();
        Snapshot currentSnapshot = table.currentSnapshot();
        this.assertEquals("Procedure output must match", (List)ImmutableList.of((Object)this.row(new Object[]{wapSnapshot.snapshotId(), currentSnapshot.snapshotId()})), output);
        this.assertEquals("Apply of WAP changes must be successful", (List)ImmutableList.of((Object)this.row(new Object[]{1L, "a"})), this.sql("SELECT * FROM %s", new Object[]{this.tableName}));
    }

    @TestTemplate
    public void testApplyWapChangesRefreshesRelationCache() {
        String wapId = "wap_id_1";
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        this.sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", new Object[]{this.tableName, "write.wap.enabled"});
        Dataset query = spark.sql("SELECT * FROM " + this.tableName + " WHERE id = 1");
        query.createOrReplaceTempView("tmp");
        spark.sql("CACHE TABLE tmp");
        this.assertEquals("View should not produce rows", (List)ImmutableList.of(), this.sql("SELECT * FROM tmp", new Object[0]));
        spark.conf().set("spark.wap.id", wapId);
        this.sql("INSERT INTO TABLE %s VALUES (1, 'a')", new Object[]{this.tableName});
        this.assertEquals("Should not see rows from staged snapshot", (List)ImmutableList.of(), this.sql("SELECT * FROM %s", new Object[]{this.tableName}));
        this.sql("CALL %s.system.publish_changes('%s', '%s')", new Object[]{this.catalogName, this.tableIdent, wapId});
        this.assertEquals("Apply of WAP changes should be visible", (List)ImmutableList.of((Object)this.row(new Object[]{1L, "a"})), this.sql("SELECT * FROM tmp", new Object[0]));
        this.sql("UNCACHE TABLE tmp", new Object[0]);
    }

    @TestTemplate
    public void testApplyInvalidWapId() {
        this.sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", new Object[]{this.tableName});
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.sql("CALL %s.system.publish_changes('%s', 'not_valid')", new Object[]{this.catalogName, this.tableIdent})).isInstanceOf(ValidationException.class)).hasMessage("Cannot apply unknown WAP ID 'not_valid'");
    }

    @TestTemplate
    public void testInvalidApplyWapChangesCases() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.sql("CALL %s.system.publish_changes('n', table => 't', 'not_valid')", new Object[]{this.catalogName})).isInstanceOf(AnalysisException.class)).hasMessage("Named and positional arguments cannot be mixed");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", new Object[]{this.catalogName})).isInstanceOf(ParseException.class)).satisfies(new ThrowingConsumer[]{exception -> {
            ParseException parseException = (ParseException)((Object)exception);
            AssertionsForClassTypes.assertThat((String)parseException.getErrorClass()).isEqualTo("PARSE_SYNTAX_ERROR");
            AssertionsForClassTypes.assertThat((String)((String)parseException.getMessageParameters().get("error"))).isEqualTo("'CALL'");
        }});
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.sql("CALL %s.system.publish_changes('t')", new Object[]{this.catalogName})).isInstanceOf(AnalysisException.class)).hasMessage("Missing required parameters: [wap_id]");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.sql("CALL %s.system.publish_changes('', 'not_valid')", new Object[]{this.catalogName})).isInstanceOf(IllegalArgumentException.class)).hasMessage("Cannot handle an empty identifier for argument table");
    }
}

