/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.MergeIntoAction;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class MergeIntoActionITCase
extends ActionITCaseBase {
    private static final List<Row> initialRecords = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "v_10", "creation", "02-28"}));

    @BeforeEach
    public void setUp() throws Exception {
        ReadWriteTableTestUtil.init(this.warehouse);
        this.prepareTargetTable(CoreOptions.ChangelogProducer.NONE);
        this.prepareSourceTable();
    }

    @ParameterizedTest(name="changelog-producer = {0}")
    @MethodSource(value={"producerTestData"})
    public void testVariousChangelogProducer(CoreOptions.ChangelogProducer producer, List<Row> expected) throws Exception {
        ReadWriteTableTestUtil.sEnv.executeSql("DROP TABLE T");
        this.prepareTargetTable(producer);
        MergeIntoActionBuilder action = new MergeIntoActionBuilder(this.warehouse, this.database, "T");
        action.withSourceTable("default.S").withMergeCondition("T.k = S.k AND T.dt = S.dt").withMatchedUpsert("T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'").withMatchedDelete("S.v IS NULL").withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt").withNotMatchedBySourceUpsert("dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'").withNotMatchedBySourceDelete("dt >= '02-28'");
        this.validateActionRunResult(action.build(), expected, Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{2, "v_2_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{3, "v_3_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{7, "Seven", "matched_upsert", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, "v_11", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{12, "v_12", "insert", "02-29"})));
        if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
            this.testWorkWithPartialUpdate();
        }
    }

    private void testWorkWithPartialUpdate() throws Exception {
        ReadWriteTableTestUtil.insertInto("T", "(12, CAST (NULL AS STRING), '$', '02-29')", "(12, 'Test', CAST (NULL AS STRING), '02-29')");
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery("T"), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{2, "v_2_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{3, "v_3_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{7, "Seven", "matched_upsert", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, "v_11", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{12, "Test", "$", "02-29"})));
    }

    @ParameterizedTest(name="in-default = {0}")
    @ValueSource(booleans={true, false})
    public void testTargetAlias(boolean inDefault) throws Exception {
        MergeIntoActionBuilder action;
        if (!inDefault) {
            ReadWriteTableTestUtil.sEnv.executeSql("DROP TABLE T");
            ReadWriteTableTestUtil.sEnv.executeSql("CREATE DATABASE test_db");
            ReadWriteTableTestUtil.sEnv.executeSql("USE test_db");
            ReadWriteTableTestUtil.bEnv.executeSql("USE test_db");
            this.prepareTargetTable(CoreOptions.ChangelogProducer.NONE);
            action = new MergeIntoActionBuilder(this.warehouse, "test_db", "T");
        } else {
            action = new MergeIntoActionBuilder(this.warehouse, this.database, "T");
        }
        action.withTargetAlias("TT").withSourceTable("S").withMergeCondition("TT.k = S.k AND TT.dt = S.dt").withMatchedDelete("S.v IS NULL");
        String procedureStatement = String.format("CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k = S.k AND TT.dt = S.dt', 'S.v IS NULL')", inDefault ? this.database : "test_db");
        List<Row> streamingExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{8, "v_8", "creation", "02-28"}));
        List<Row> batchExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "v_10", "creation", "02-28"}));
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.validateActionRunResult(action.build(), streamingExpected, batchExpected);
        } else {
            this.validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
        }
    }

    @ParameterizedTest(name="in-default = {0}")
    @ValueSource(booleans={true, false})
    public void testSourceName(boolean inDefault) throws Exception {
        MergeIntoActionBuilder action = new MergeIntoActionBuilder(this.warehouse, "default", "T");
        String sourceTableName = "S";
        if (!inDefault) {
            ReadWriteTableTestUtil.sEnv.executeSql("DROP TABLE S");
            ReadWriteTableTestUtil.sEnv.executeSql("CREATE DATABASE test_db");
            ReadWriteTableTestUtil.sEnv.executeSql("USE test_db");
            ReadWriteTableTestUtil.bEnv.executeSql("USE test_db");
            this.prepareSourceTable();
            sourceTableName = "test_db.S";
        }
        action.withSourceTable(sourceTableName).withMergeCondition("T.k = S.k AND T.dt = S.dt").withMatchedDelete("S.v IS NULL");
        String procedureStatement = String.format("CALL sys.merge_into('default.T', '', '', '%s', 'T.k = S.k AND T.dt = S.dt', 'S.v IS NULL')", sourceTableName);
        if (!inDefault) {
            ReadWriteTableTestUtil.sEnv.executeSql("USE `default`");
            ReadWriteTableTestUtil.bEnv.executeSql("USE `default`");
        }
        List<Row> streamingExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{8, "v_8", "creation", "02-28"}));
        List<Row> batchExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "v_10", "creation", "02-28"}));
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.validateActionRunResult(action.build(), streamingExpected, batchExpected);
        } else {
            this.validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
        }
    }

    @ParameterizedTest(name="useCatalog = {0}")
    @ValueSource(booleans={true, false})
    public void testSqls(boolean useCatalog) throws Exception {
        ReadWriteTableTestUtil.sEnv.executeSql("DROP TABLE S");
        String catalog = String.format("CREATE CATALOG test_cat WITH ('type' = 'paimon', 'warehouse' = '%s')", this.getTempDirPath());
        String escapeCatalog = catalog.replaceAll("'", "''");
        String id = TestValuesTableFactory.registerData(Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "v_1", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4, null, "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, null, "02-28"})));
        String ddl = String.format("CREATE TEMPORARY TABLE %s (k INT, v STRING, dt STRING)\nWITH ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s')", useCatalog ? "S" : "test_cat.`default`.S", id);
        String escapeDdl = ddl.replaceAll("'", "''");
        MergeIntoActionBuilder action = new MergeIntoActionBuilder(this.warehouse, this.database, "T");
        if (useCatalog) {
            action.withSourceSqls(catalog, "USE CATALOG test_cat", ddl);
            action.withSourceTable("S");
        } else {
            action.withSourceSqls(catalog, ddl);
            action.withSourceTable("test_cat.default.S");
        }
        action.withMergeCondition("T.k = S.k AND T.dt = S.dt").withMatchedDelete("S.v IS NULL");
        String procedureStatement = String.format("CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = S.k AND T.dt = S.dt', 'S.v IS NULL')", this.database, useCatalog ? String.format("%s;%s;%s", escapeCatalog, "USE CATALOG test_cat", escapeDdl) : String.format("%s;%s", escapeCatalog, escapeDdl), useCatalog ? "S" : "test_cat.default.S");
        List<Row> streamingExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{8, "v_8", "creation", "02-28"}));
        List<Row> batchExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "v_10", "creation", "02-28"}));
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.validateActionRunResult(action.build(), streamingExpected, batchExpected);
        } else {
            this.validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
        }
    }

    @ParameterizedTest(name="source-qualified = {0}")
    @ValueSource(booleans={true, false})
    public void testMatchedUpsertSetAll(boolean qualified) throws Exception {
        MergeIntoActionBuilder action = new MergeIntoActionBuilder(this.warehouse, this.database, "T");
        action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S").withSourceTable(qualified ? "default.SS" : "SS").withMergeCondition("T.k = SS.k AND T.dt = SS.dt").withMatchedUpsert(null, "*");
        String procedureStatement = String.format("CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '*')", this.database, "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", qualified ? "default.SS" : "SS");
        List<Row> streamingExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{1, "v_1", "unknown", "02-27"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{4, null, "unknown", "02-27"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{7, "Seven", "unknown", "02-28"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{8, "v_8", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{8, null, "unknown", "02-28"}));
        List<Row> batchExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{1, "v_1", "unknown", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{4, null, "unknown", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{7, "Seven", "unknown", "02-28"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{8, null, "unknown", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "v_10", "creation", "02-28"}));
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.validateActionRunResult(action.build(), streamingExpected, batchExpected);
        } else {
            this.validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
        }
    }

    @ParameterizedTest(name="source-qualified = {0}")
    @ValueSource(booleans={true, false})
    public void testNotMatchedInsertAll(boolean qualified) throws Exception {
        MergeIntoActionBuilder action = new MergeIntoActionBuilder(this.warehouse, this.database, "T");
        action.withSourceSqls("CREATE TEMPORARY VIEW SS AS SELECT k, v, 'unknown', dt FROM S").withSourceTable(qualified ? "default.SS" : "SS").withMergeCondition("T.k = SS.k AND T.dt = SS.dt").withNotMatchedInsert("SS.k < 12", "*");
        String procedureStatement = String.format("CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k AND T.dt = SS.dt', '', '', 'SS.k < 12', '*', '')", this.database, "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'', dt FROM S", qualified ? "default.SS" : "SS");
        List<Row> streamingExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "unknown", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, "v_11", "unknown", "02-29"}));
        List<Row> batchExpected = Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "v_10", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "unknown", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, "v_11", "unknown", "02-29"}));
        if (ThreadLocalRandom.current().nextBoolean()) {
            this.validateActionRunResult(action.build(), streamingExpected, batchExpected);
        } else {
            this.validateProcedureResult(procedureStatement, streamingExpected, batchExpected);
        }
    }

    @Test
    public void testProcedureWithDeleteConditionTrue() throws Exception {
        String procedureStatement = String.format("CALL sys.merge_into('%s.T', '', '', 'S', 'T.k = S.k AND T.dt = S.dt', 'TRUE')", this.database);
        this.validateProcedureResult(procedureStatement, Arrays.asList(TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{1, "v_1", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{8, "v_8", "creation", "02-28"})), Arrays.asList(TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{10, "v_10", "creation", "02-28"})));
    }

    @Test
    public void testNonPkTable() {
        String nonPkTable = ReadWriteTableTestUtil.createTable(Collections.singletonList("k int"), Collections.emptyList(), Collections.emptyList());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> new MergeIntoActionBuilder(this.warehouse, this.database, nonPkTable).build()).isInstanceOf(UnsupportedOperationException.class)).hasMessage("merge-into action doesn't support table with no primary keys defined.");
    }

    @Test
    public void testIncompatibleSchema() {
        MergeIntoActionBuilder action = new MergeIntoActionBuilder(this.warehouse, this.database, "T");
        action.withSourceTable("S").withMergeCondition("T.k = S.k AND T.dt = S.dt").withNotMatchedInsert(null, "S.k, S.v, 0, S.dt");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> action.build().run()).isInstanceOf(IllegalStateException.class)).hasMessage("The schema of result in action 'not-matched-insert' is invalid.\nResult schema:   [INT NOT NULL, STRING, INT NOT NULL, STRING NOT NULL]\nExpected schema: [INT NOT NULL, STRING, STRING, STRING NOT NULL]");
    }

    @Test
    public void testIllegalSourceName() throws Exception {
        ReadWriteTableTestUtil.sEnv.executeSql("DROP TABLE S");
        ReadWriteTableTestUtil.sEnv.executeSql("CREATE DATABASE test_db");
        ReadWriteTableTestUtil.sEnv.executeSql("USE test_db");
        this.prepareSourceTable();
        MergeIntoActionBuilder action = new MergeIntoActionBuilder(this.warehouse, "default", "T");
        action.withSourceTable("S").withMergeCondition("T.k = S.k AND T.dt = S.dt").withMatchedDelete("S.v IS NULL");
        Assertions.assertThatThrownBy(() -> action.build().run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(ValidationException.class, (String)"Object 'S' not found")});
    }

    @Test
    public void testIllegalSourceNameSqlCase() {
        ReadWriteTableTestUtil.sEnv.executeSql("DROP TABLE S");
        MergeIntoActionBuilder action = new MergeIntoActionBuilder(this.warehouse, "default", "T");
        action.withSourceSqls("CREATE DATABASE test_db", "CREATE TEMPORARY TABLE test_db.S (k INT, v STRING, dt STRING) WITH ('connector' = 'values', 'bounded' = 'true')").withSourceTable("S").withMergeCondition("T.k = S.k AND T.dt = S.dt").withMatchedDelete("S.v IS NULL");
        Assertions.assertThatThrownBy(() -> action.build().run()).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches(ValidationException.class, (String)"Object 'S' not found")});
    }

    private void validateActionRunResult(MergeIntoAction action, List<Row> streamingExpected, List<Row> batchExpected) throws Exception {
        BlockingIterator<Row, Row> iterator = ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildSimpleQuery("T"), initialRecords);
        action.run();
        ReadWriteTableTestUtil.validateStreamingReadResult(iterator, streamingExpected);
        iterator.close();
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery("T"), batchExpected);
    }

    private void validateProcedureResult(String procedureStatement, List<Row> streamingExpected, List<Row> batchExpected) throws Exception {
        BlockingIterator<Row, Row> iterator = ReadWriteTableTestUtil.testStreamingRead(ReadWriteTableTestUtil.buildSimpleQuery("T"), initialRecords);
        this.callProcedure(procedureStatement, true, true);
        ReadWriteTableTestUtil.testBatchRead(ReadWriteTableTestUtil.buildSimpleQuery("T"), batchExpected);
        ReadWriteTableTestUtil.validateStreamingReadResult(iterator, streamingExpected);
        iterator.close();
    }

    private void prepareTargetTable(final CoreOptions.ChangelogProducer producer) throws Exception {
        ReadWriteTableTestUtil.sEnv.executeSql(ReadWriteTableTestUtil.buildDdl("T", Arrays.asList("k INT", "v STRING", "last_action STRING", "dt STRING"), Arrays.asList("k", "dt"), Collections.singletonList("dt"), (Map<String, String>)new HashMap<String, String>(){
            {
                this.put(CoreOptions.CHANGELOG_PRODUCER.key(), producer.toString());
                if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
                    this.put(CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
                    this.put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), "true");
                }
            }
        }));
        ReadWriteTableTestUtil.insertInto("T", "(1, 'v_1', 'creation', '02-27')", "(2, 'v_2', 'creation', '02-27')", "(3, 'v_3', 'creation', '02-27')", "(4, 'v_4', 'creation', '02-27')", "(5, 'v_5', 'creation', '02-28')", "(6, 'v_6', 'creation', '02-28')", "(7, 'v_7', 'creation', '02-28')", "(8, 'v_8', 'creation', '02-28')", "(9, 'v_9', 'creation', '02-28')", "(10, 'v_10', 'creation', '02-28')");
    }

    private void prepareSourceTable() throws Exception {
        ReadWriteTableTestUtil.sEnv.executeSql(ReadWriteTableTestUtil.buildDdl("S", Arrays.asList("k INT", "v STRING", "dt STRING"), Arrays.asList("k", "dt"), Collections.singletonList("dt"), new HashMap<String, String>()));
        ReadWriteTableTestUtil.insertInto("S", "(1, 'v_1', '02-27')", "(4, CAST (NULL AS STRING), '02-27')", "(7, 'Seven', '02-28')", "(8, CAST (NULL AS STRING), '02-28')", "(8, 'v_8', '02-29')", "(11, 'v_11', '02-29')", "(12, 'v_12', '02-29')");
    }

    private static List<Arguments> producerTestData() {
        return Arrays.asList(Arguments.arguments((Object[])new Object[]{CoreOptions.ChangelogProducer.NONE, Arrays.asList(TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{7, "Seven", "matched_upsert", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{8, "v_8", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, "v_11", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{12, "v_12", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{2, "v_2_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{3, "v_3_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{10, "v_10", "creation", "02-28"}))}), Arguments.arguments((Object[])new Object[]{CoreOptions.ChangelogProducer.INPUT, Arrays.asList(TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{7, "Seven", "matched_upsert", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{8, "v_8", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, "v_11", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{12, "v_12", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{2, "v_2_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{3, "v_3_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{10, "v_10", "creation", "02-28"}))}), Arguments.arguments((Object[])new Object[]{CoreOptions.ChangelogProducer.FULL_COMPACTION, Arrays.asList(TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{7, "v_7", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{7, "Seven", "matched_upsert", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{4, "v_4", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{8, "v_8", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{8, "v_8", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{11, "v_11", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"+I", (Object[])new Object[]{12, "v_12", "insert", "02-29"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{2, "v_2", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{2, "v_2_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"-U", (Object[])new Object[]{3, "v_3", "creation", "02-27"}), TestValuesTableFactory.changelogRow((String)"+U", (Object[])new Object[]{3, "v_3_nmu", "not_matched_upsert", "02-27"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{5, "v_5", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{6, "v_6", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{9, "v_9", "creation", "02-28"}), TestValuesTableFactory.changelogRow((String)"-D", (Object[])new Object[]{10, "v_10", "creation", "02-28"}))}));
    }

    private class MergeIntoActionBuilder {
        private final List<String> args;
        private final List<String> mergeActions;

        public MergeIntoActionBuilder(String warehouse, String database, String table) {
            this.args = new ArrayList<String>(Arrays.asList("merge_into", "--warehouse", warehouse, "--database", database, "--table", table));
            this.mergeActions = new ArrayList<String>();
        }

        public MergeIntoActionBuilder withTargetAlias(String targetAlias) {
            if (targetAlias != null) {
                this.args.add("--target_as");
                this.args.add(targetAlias);
            }
            return this;
        }

        public MergeIntoActionBuilder withSourceTable(String sourceTable) {
            this.args.add("--source_table");
            this.args.add(sourceTable);
            return this;
        }

        public MergeIntoActionBuilder withSourceSqls(String ... sourceSqls) {
            if (sourceSqls != null) {
                for (String sql : sourceSqls) {
                    this.args.add("--source_sql");
                    this.args.add(sql);
                }
            }
            return this;
        }

        public MergeIntoActionBuilder withMergeCondition(String mergeCondition) {
            this.args.add("--on");
            this.args.add(mergeCondition);
            return this;
        }

        public MergeIntoActionBuilder withMatchedUpsert(@Nullable String matchedUpsertCondition, String matchedUpsertSet) {
            this.mergeActions.add("matched-upsert");
            this.args.add("--matched_upsert_set");
            this.args.add(matchedUpsertSet);
            if (matchedUpsertCondition != null) {
                this.args.add("--matched_upsert_condition");
                this.args.add(matchedUpsertCondition);
            }
            return this;
        }

        public MergeIntoActionBuilder withNotMatchedBySourceUpsert(@Nullable String notMatchedBySourceUpsertCondition, String notMatchedBySourceUpsertSet) {
            this.mergeActions.add("not-matched-by-source-upsert");
            this.args.add("--not_matched_by_source_upsert_set");
            this.args.add(notMatchedBySourceUpsertSet);
            if (notMatchedBySourceUpsertCondition != null) {
                this.args.add("--not_matched_by_source_upsert_condition");
                this.args.add(notMatchedBySourceUpsertCondition);
            }
            return this;
        }

        public MergeIntoActionBuilder withMatchedDelete(@Nullable String matchedDeleteCondition) {
            this.mergeActions.add("matched-delete");
            if (matchedDeleteCondition != null) {
                this.args.add("--matched_delete_condition");
                this.args.add(matchedDeleteCondition);
            }
            return this;
        }

        public MergeIntoActionBuilder withNotMatchedBySourceDelete(@Nullable String notMatchedBySourceDeleteCondition) {
            this.mergeActions.add("not-matched-by-source-delete");
            if (notMatchedBySourceDeleteCondition != null) {
                this.args.add("--not_matched_by_source_delete_condition");
                this.args.add(notMatchedBySourceDeleteCondition);
            }
            return this;
        }

        public MergeIntoActionBuilder withNotMatchedInsert(@Nullable String notMatchedInsertCondition, String notMatchedInsertValues) {
            this.mergeActions.add("not-matched-insert");
            this.args.add("--not_matched_insert_values");
            this.args.add(notMatchedInsertValues);
            if (notMatchedInsertCondition != null) {
                this.args.add("--not_matched_insert_condition");
                this.args.add(notMatchedInsertCondition);
            }
            return this;
        }

        MergeIntoAction build() {
            this.args.add("--merge_actions");
            this.args.add(String.join((CharSequence)",", this.mergeActions));
            return MergeIntoActionITCase.this.createAction(MergeIntoAction.class, this.args);
        }
    }
}

