/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.TableActionBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink action that merges the rows of a source table into a primary-keyed target table.
 *
 * <p>Each configured merge clause (matched-upsert, not-matched-by-source-upsert, matched-delete,
 * not-matched-by-source-delete, not-matched-insert) is translated into one batch SQL query. The
 * rows produced by each query are tagged with the {@link RowKind} of the clause, all resulting
 * streams are unioned, and the union is handed to {@code batchSink}.
 *
 * <p>Instances are configured through the fluent {@code with...} methods and executed with
 * {@link #run()}.
 */
public class MergeIntoAction
extends TableActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(MergeIntoAction.class);

    /** Primary keys of the target table; never empty (checked in the constructor). */
    private final List<String> primaryKeys;
    /** One external-to-internal converter per target field, in field order. */
    private final List<DataStructureConverter<Object, Object>> converters;
    /** Names of all target table fields, in field order. */
    private final List<String> targetFieldNames;

    /** Optional alias under which the target table is referenced in the generated queries. */
    @Nullable
    private String targetAlias;
    /** Name of the source table, possibly qualified with dots. */
    private String sourceTable;
    /** Optional SQL statements executed before the merge queries are built. */
    @Nullable
    private String[] sourceSqls;
    /** Condition relating source and target rows; used as the join / NOT EXISTS predicate. */
    private String mergeCondition;

    // Flags recording which merge clauses were configured through the with... methods.
    boolean matchedUpsert;
    boolean notMatchedUpsert;
    boolean matchedDelete;
    boolean notMatchedDelete;
    boolean insert;

    @Nullable
    String matchedUpsertCondition;
    @Nullable
    private String matchedUpsertSet;
    @Nullable
    String notMatchedBySourceUpsertCondition;
    @Nullable
    String notMatchedBySourceUpsertSet;
    @Nullable
    String matchedDeleteCondition;
    @Nullable
    String notMatchedBySourceDeleteCondition;
    @Nullable
    private String notMatchedInsertCondition;
    @Nullable
    private String notMatchedInsertValues;

    /**
     * Creates the action with an empty catalog configuration.
     *
     * @param warehouse warehouse passed to the base action
     * @param database database of the target table
     * @param tableName name of the target table
     */
    public MergeIntoAction(String warehouse, String database, String tableName) {
        this(warehouse, database, tableName, Collections.emptyMap());
    }

    /**
     * Creates the action and prepares the per-field converters of the target table.
     *
     * @param warehouse warehouse passed to the base action
     * @param database database of the target table
     * @param tableName name of the target table
     * @param catalogConfig catalog options passed to the base action
     * @throws UnsupportedOperationException if the target is not a {@link FileStoreTable} or has
     *     no primary keys
     */
    public MergeIntoAction(String warehouse, String database, String tableName, Map<String, String> catalogConfig) {
        super(warehouse, database, tableName, catalogConfig);
        if (!(this.table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException(String.format("Only FileStoreTable supports merge-into action. The table type is '%s'.", this.table.getClass().getName()));
        }
        // Inherited helper; must run before the schema of this.table is read below.
        this.changeIgnoreMergeEngine();
        this.primaryKeys = ((FileStoreTable) this.table).schema().primaryKeys();
        if (this.primaryKeys.isEmpty()) {
            throw new UnsupportedOperationException("merge-into action doesn't support table with no primary keys defined.");
        }
        this.converters = this.table.rowType().getFieldTypes().stream()
                .map(LogicalTypeConversion::toLogicalType)
                .map(TypeConversions::fromLogicalToDataType)
                .map(DataStructureConverters::getConverter)
                .collect(Collectors.toList());
        this.targetFieldNames = this.table.rowType().getFields().stream()
                .map(DataField::name)
                .collect(Collectors.toList());
    }

    /** Sets the alias used for the target table in the generated queries. */
    public MergeIntoAction withTargetAlias(String targetAlias) {
        this.targetAlias = targetAlias;
        return this;
    }

    /** Sets the (optionally dot-qualified) name of the source table. */
    public MergeIntoAction withSourceTable(String sourceTable) {
        this.sourceTable = sourceTable;
        return this;
    }

    /** Sets SQL statements that are executed, in order, before the merge queries are built. */
    public MergeIntoAction withSourceSqls(String ... sourceSqls) {
        this.sourceSqls = sourceSqls;
        return this;
    }

    /** Sets the condition used to match source rows against target rows. */
    public MergeIntoAction withMergeCondition(String mergeCondition) {
        this.mergeCondition = mergeCondition;
        return this;
    }

    /**
     * Enables the matched-upsert clause.
     *
     * @param matchedUpsertCondition optional extra filter, appended as a WHERE clause
     * @param matchedUpsertSet either {@code "*"} or comma separated key-value assignments
     */
    public MergeIntoAction withMatchedUpsert(@Nullable String matchedUpsertCondition, String matchedUpsertSet) {
        this.matchedUpsert = true;
        this.matchedUpsertCondition = matchedUpsertCondition;
        this.matchedUpsertSet = matchedUpsertSet;
        return this;
    }

    /**
     * Enables the not-matched-by-source-upsert clause.
     *
     * @param notMatchedBySourceUpsertCondition optional extra filter, AND-ed to the query
     * @param notMatchedBySourceUpsertSet comma separated key-value assignments
     */
    public MergeIntoAction withNotMatchedBySourceUpsert(@Nullable String notMatchedBySourceUpsertCondition, String notMatchedBySourceUpsertSet) {
        this.notMatchedUpsert = true;
        this.notMatchedBySourceUpsertCondition = notMatchedBySourceUpsertCondition;
        this.notMatchedBySourceUpsertSet = notMatchedBySourceUpsertSet;
        return this;
    }

    /**
     * Enables the matched-delete clause.
     *
     * @param matchedDeleteCondition optional extra filter, appended as a WHERE clause
     */
    public MergeIntoAction withMatchedDelete(@Nullable String matchedDeleteCondition) {
        this.matchedDelete = true;
        this.matchedDeleteCondition = matchedDeleteCondition;
        return this;
    }

    /**
     * Enables the not-matched-by-source-delete clause.
     *
     * @param notMatchedBySourceDeleteCondition optional extra filter, AND-ed to the query
     */
    public MergeIntoAction withNotMatchedBySourceDelete(@Nullable String notMatchedBySourceDeleteCondition) {
        this.notMatchedDelete = true;
        this.notMatchedBySourceDeleteCondition = notMatchedBySourceDeleteCondition;
        return this;
    }

    /**
     * Enables the not-matched-insert clause.
     *
     * @param notMatchedInsertCondition optional extra filter, AND-ed to the query
     * @param notMatchedInsertValues select list evaluated against the source table
     */
    public MergeIntoAction withNotMatchedInsert(@Nullable String notMatchedInsertCondition, String notMatchedInsertValues) {
        this.insert = true;
        this.notMatchedInsertCondition = notMatchedInsertCondition;
        this.notMatchedInsertValues = notMatchedInsertValues;
        return this;
    }

    /**
     * Builds one stream per configured merge clause, unions them and sinks the result.
     *
     * @throws IllegalStateException if the source table, the merge condition or all merge
     *     clauses are missing
     */
    @Override
    public void run() throws Exception {
        // Fail fast, before any view is registered or any source SQL is executed.
        this.checkMergeConfiguration();
        this.handleTargetAlias();
        this.handleSqls();
        List<DataStream<RowData>> dataStreams = Stream.of(
                        this.getMatchedUpsertDataStream(),
                        this.getNotMatchedUpsertDataStream(),
                        this.getMatchedDeleteDataStream(),
                        this.getNotMatchedDeleteDataStream(),
                        this.getInsertDataStream())
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        DataStream<RowData> firstDs = dataStreams.get(0);
        // Generic arrays cannot be created directly; every element is a DataStream<RowData>.
        @SuppressWarnings("unchecked")
        DataStream<RowData>[] remaining = dataStreams.stream().skip(1L).toArray(DataStream[]::new);
        this.batchSink(firstDs.union(remaining));
    }

    /** Verifies that everything the generated queries rely on has been configured. */
    private void checkMergeConfiguration() {
        if (this.sourceTable == null) {
            throw new IllegalStateException("Source table must be specified for merge-into action.");
        }
        if (this.mergeCondition == null) {
            throw new IllegalStateException("Merge condition must be specified for merge-into action.");
        }
        if (!(this.matchedUpsert || this.notMatchedUpsert || this.matchedDelete || this.notMatchedDelete || this.insert)) {
            throw new IllegalStateException("Must specify at least one merge action.");
        }
    }

    /** Registers the target table as a temporary view under its alias, if an alias is set. */
    private void handleTargetAlias() {
        if (this.targetAlias != null) {
            this.batchTEnv.createTemporaryView(this.escapedTargetName(), this.batchTEnv.from(this.identifier.getFullName()));
        }
    }

    /**
     * Executes the configured source SQL statements in order, waiting for each to finish.
     *
     * @throws RuntimeException wrapping any failure, with the offending statement in the message
     */
    private void handleSqls() {
        if (this.sourceSqls == null) {
            return;
        }
        for (String sql : this.sourceSqls) {
            try {
                this.batchTEnv.executeSql(sql).await();
            } catch (Throwable t) {
                if (t instanceof InterruptedException) {
                    // Keep the interrupt visible to callers even though it is wrapped below.
                    Thread.currentThread().interrupt();
                }
                String errMsg = "Error occurs when executing sql:\n%s";
                LOG.error(String.format(errMsg, sql), t);
                throw new RuntimeException(String.format(errMsg, sql), t);
            }
        }
    }

    /**
     * Builds the stream of {@code UPDATE_AFTER} rows for target rows that match a source row.
     *
     * @return the stream, or empty if matched-upsert is not configured
     * @throws RuntimeException if the set clause references a column missing from the target
     */
    private Optional<DataStream<RowData>> getMatchedUpsertDataStream() {
        if (!this.matchedUpsert) {
            return Optional.empty();
        }
        List<String> project;
        if (this.matchedUpsertSet.equals("*")) {
            // For a qualified source such as 'db.S' only the last segment is used: S.*
            String[] splits = this.sourceTable.split("\\.");
            project = Collections.singletonList(splits[splits.length - 1] + ".*");
        } else {
            Map<String, String> changes = ActionFactory.parseCommaSeparatedKeyValues(this.matchedUpsertSet);
            for (String targetField : changes.keySet()) {
                if (!this.targetFieldNames.contains(targetField)) {
                    throw new RuntimeException(String.format("Invalid column reference '%s' of table '%s' at matched-upsert action.", targetField, this.identifier.getFullName()));
                }
            }
            // Unchanged columns are read from the target table, hence the qualification.
            project = this.targetFieldNames.stream()
                    .map(name -> changes.getOrDefault(name, this.targetTableName() + "." + name))
                    .collect(Collectors.toList());
        }
        String query = String.format(
                "SELECT %s FROM %s INNER JOIN %s ON %s %s",
                String.join(",", project),
                this.escapedTargetName(),
                this.escapedSourceName(),
                this.mergeCondition,
                this.matchedUpsertCondition == null ? "" : "WHERE " + this.matchedUpsertCondition);
        LOG.info("Query used for matched-update:\n{}", query);
        Table source = this.batchTEnv.sqlQuery(query);
        this.checkSchema("matched-upsert", source);
        return Optional.of(this.toDataStream(source, RowKind.UPDATE_AFTER, this.converters));
    }

    /**
     * Builds the stream of {@code UPDATE_AFTER} rows for target rows without a matching source
     * row.
     *
     * @return the stream, or empty if not-matched-by-source-upsert is not configured
     * @throws RuntimeException if the set clause references an unknown column or a primary key
     */
    private Optional<DataStream<RowData>> getNotMatchedUpsertDataStream() {
        if (!this.notMatchedUpsert) {
            return Optional.empty();
        }
        Map<String, String> changes = ActionFactory.parseCommaSeparatedKeyValues(this.notMatchedBySourceUpsertSet);
        for (String targetField : changes.keySet()) {
            if (!this.targetFieldNames.contains(targetField)) {
                throw new RuntimeException(String.format("Invalid column reference '%s' of table '%s' at not-matched-by-source-upsert action.\nRun <action> --help for help.", targetField, this.identifier.getFullName()));
            }
            if (this.primaryKeys.contains(targetField)) {
                throw new RuntimeException("Not allowed to change primary key in not-matched-by-source-upsert-set.\nRun <action> --help for help.");
            }
        }
        List<String> project = this.targetFieldNames.stream()
                .map(name -> changes.getOrDefault(name, name))
                .collect(Collectors.toList());
        String query = String.format(
                "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
                String.join(",", project),
                this.escapedTargetName(),
                this.escapedSourceName(),
                this.mergeCondition,
                this.notMatchedBySourceUpsertCondition == null ? "" : String.format("AND (%s)", this.notMatchedBySourceUpsertCondition));
        LOG.info("Query used for not-matched-by-source-upsert:\n{}", query);
        Table source = this.batchTEnv.sqlQuery(query);
        this.checkSchema("not-matched-by-source-upsert", source);
        return Optional.of(this.toDataStream(source, RowKind.UPDATE_AFTER, this.converters));
    }

    /**
     * Builds the stream of {@code DELETE} rows for target rows that match a source row.
     *
     * @return the stream, or empty if matched-delete is not configured
     */
    private Optional<DataStream<RowData>> getMatchedDeleteDataStream() {
        if (!this.matchedDelete) {
            return Optional.empty();
        }
        // Qualify every column with the target name so the join only projects target columns.
        List<String> project = this.targetFieldNames.stream()
                .map(name -> this.targetTableName() + "." + name)
                .collect(Collectors.toList());
        String query = String.format(
                "SELECT %s FROM %s INNER JOIN %s ON %s %s",
                String.join(",", project),
                this.escapedTargetName(),
                this.escapedSourceName(),
                this.mergeCondition,
                this.matchedDeleteCondition == null ? "" : "WHERE " + this.matchedDeleteCondition);
        LOG.info("Query used by matched-delete:\n{}", query);
        Table source = this.batchTEnv.sqlQuery(query);
        this.checkSchema("matched-delete", source);
        return Optional.of(this.toDataStream(source, RowKind.DELETE, this.converters));
    }

    /**
     * Builds the stream of {@code DELETE} rows for target rows without a matching source row.
     *
     * @return the stream, or empty if not-matched-by-source-delete is not configured
     */
    private Optional<DataStream<RowData>> getNotMatchedDeleteDataStream() {
        if (!this.notMatchedDelete) {
            return Optional.empty();
        }
        String query = String.format(
                "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
                String.join(",", this.targetFieldNames),
                this.escapedTargetName(),
                this.escapedSourceName(),
                this.mergeCondition,
                this.notMatchedBySourceDeleteCondition == null ? "" : String.format("AND (%s)", this.notMatchedBySourceDeleteCondition));
        LOG.info("Query used by not-matched-by-source-delete:\n{}", query);
        Table source = this.batchTEnv.sqlQuery(query);
        this.checkSchema("not-matched-by-source-delete", source);
        return Optional.of(this.toDataStream(source, RowKind.DELETE, this.converters));
    }

    /**
     * Builds the stream of {@code INSERT} rows for source rows without a matching target row.
     *
     * @return the stream, or empty if not-matched-insert is not configured
     */
    private Optional<DataStream<RowData>> getInsertDataStream() {
        if (!this.insert) {
            return Optional.empty();
        }
        // Source and target swap roles here: rows are selected from the source table.
        String query = String.format(
                "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
                this.notMatchedInsertValues,
                this.escapedSourceName(),
                this.escapedTargetName(),
                this.mergeCondition,
                this.notMatchedInsertCondition == null ? "" : String.format("AND (%s)", this.notMatchedInsertCondition));
        LOG.info("Query used by not-matched-insert:\n{}", query);
        Table source = this.batchTEnv.sqlQuery(query);
        this.checkSchema("not-matched-insert", source);
        return Optional.of(this.toDataStream(source, RowKind.INSERT, this.converters));
    }

    /**
     * Checks that the column types of a query result are compatible with the target row type.
     *
     * @param action name of the merge clause, used in the error message
     * @param source result table of the generated query
     * @throws IllegalStateException if the inherited compatibility check rejects the result types
     */
    private void checkSchema(String action, Table source) {
        List<DataType> actualTypes = this.toPaimonTypes(source.getResolvedSchema().getColumnDataTypes());
        List<DataType> expectedTypes = this.table.rowType().getFieldTypes();
        if (!this.compatibleCheck(actualTypes, expectedTypes)) {
            throw new IllegalStateException(String.format(
                    "The schema of result in action '%s' is invalid.\nResult schema:   [%s]\nExpected schema: [%s]",
                    action,
                    actualTypes.stream().map(DataType::asSQLString).collect(Collectors.joining(", ")),
                    expectedTypes.stream().map(DataType::asSQLString).collect(Collectors.joining(", "))));
        }
    }

    /**
     * Converts a query result into a stream of internal rows carrying the given row kind.
     *
     * <p>The lambda only references {@code kind} and {@code converters}, not any member of this
     * action.
     *
     * @param source result table of a generated query
     * @param kind row kind assigned to every produced row
     * @param converters one converter per field, applied positionally
     * @return stream of {@link GenericRowData} with converted field values
     */
    private DataStream<RowData> toDataStream(Table source, RowKind kind, List<DataStructureConverter<Object, Object>> converters) {
        return this.batchTEnv.toChangelogStream(source).map(row -> {
            int arity = row.getArity();
            GenericRowData rowData = new GenericRowData(kind, arity);
            for (int i = 0; i < arity; ++i) {
                rowData.setField(i, converters.get(i).toInternalOrNull(row.getField(i)));
            }
            return rowData;
        });
    }

    /** Returns the alias if one is set, otherwise the object name of the target table. */
    private String targetTableName() {
        return this.targetAlias == null ? this.identifier.getObjectName() : this.targetAlias;
    }

    /** Returns the backtick-quoted, fully qualified {@code catalog.database.table} target name. */
    private String escapedTargetName() {
        return String.format("`%s`.`%s`.`%s`", this.catalogName, this.identifier.getDatabaseName(), this.targetTableName());
    }

    /** Returns the source table name with every dot-separated segment quoted in backticks. */
    private String escapedSourceName() {
        return Arrays.stream(this.sourceTable.split("\\."))
                .map(s -> String.format("`%s`", s))
                .collect(Collectors.joining("."));
    }
}

