/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import lombok.NonNull;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;

public class BufferReducedBatchStatementExecutor
implements JdbcBatchStatementExecutor<SeaTunnelRow> {
    @NonNull
    private final JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor;
    @NonNull
    private final JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor;
    @NonNull
    private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
    @NonNull
    private final Function<SeaTunnelRow, SeaTunnelRow> valueTransform;
    @NonNull
    private final LinkedHashMap<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> buffer = new LinkedHashMap();

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        this.upsertExecutor.prepareStatements(connection);
        this.deleteExecutor.prepareStatements(connection);
    }

    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        if (RowKind.UPDATE_BEFORE.equals((Object)record.getRowKind())) {
            return;
        }
        SeaTunnelRow key = this.keyExtractor.apply(record);
        boolean changeFlag = this.changeFlag(record.getRowKind());
        SeaTunnelRow value = this.valueTransform.apply(record);
        this.buffer.put(key, Pair.of(changeFlag, value));
    }

    @Override
    public void executeBatch() throws SQLException {
        Boolean preChangeFlag = null;
        Set<Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>>> entrySet = this.buffer.entrySet();
        for (Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> entry : entrySet) {
            Boolean currentChangeFlag = entry.getValue().getKey();
            if (currentChangeFlag.booleanValue()) {
                if (preChangeFlag != null && !preChangeFlag.booleanValue()) {
                    this.deleteExecutor.executeBatch();
                }
                this.upsertExecutor.addToBatch(entry.getValue().getValue());
            } else {
                if (preChangeFlag != null && preChangeFlag.booleanValue()) {
                    this.upsertExecutor.executeBatch();
                }
                this.deleteExecutor.addToBatch(entry.getKey());
            }
            preChangeFlag = currentChangeFlag;
        }
        if (preChangeFlag != null) {
            if (preChangeFlag.booleanValue()) {
                this.upsertExecutor.executeBatch();
            } else {
                this.deleteExecutor.executeBatch();
            }
        }
        this.buffer.clear();
    }

    @Override
    public void closeStatements() throws SQLException {
        if (!this.buffer.isEmpty()) {
            this.executeBatch();
        }
        this.upsertExecutor.closeStatements();
        this.deleteExecutor.closeStatements();
    }

    private boolean changeFlag(RowKind rowKind) {
        switch (rowKind) {
            case INSERT: 
            case UPDATE_AFTER: {
                return true;
            }
            case DELETE: 
            case UPDATE_BEFORE: {
                return false;
            }
        }
        throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, "Unsupported rowKind: " + rowKind);
    }

    public BufferReducedBatchStatementExecutor(@NonNull JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor, @NonNull JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor, @NonNull Function<SeaTunnelRow, SeaTunnelRow> keyExtractor, @NonNull Function<SeaTunnelRow, SeaTunnelRow> valueTransform) {
        if (upsertExecutor == null) {
            throw new NullPointerException("upsertExecutor is marked non-null but is null");
        }
        if (deleteExecutor == null) {
            throw new NullPointerException("deleteExecutor is marked non-null but is null");
        }
        if (keyExtractor == null) {
            throw new NullPointerException("keyExtractor is marked non-null but is null");
        }
        if (valueTransform == null) {
            throw new NullPointerException("valueTransform is marked non-null but is null");
        }
        this.upsertExecutor = upsertExecutor;
        this.deleteExecutor = deleteExecutor;
        this.keyExtractor = keyExtractor;
        this.valueTransform = valueTransform;
    }
}

