/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.GroupXaOperationResult;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSinkAggregatedCommitter
implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> {
    private static final Logger log = LoggerFactory.getLogger(JdbcSinkAggregatedCommitter.class);
    private final XaFacade xaFacade;
    private final XaGroupOps xaGroupOps;
    private final JdbcSinkConfig jdbcSinkConfig;

    public JdbcSinkAggregatedCommitter(JdbcSinkConfig jdbcSinkConfig) {
        this.xaFacade = XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
        this.xaGroupOps = new XaGroupOpsImpl(this.xaFacade);
        this.jdbcSinkConfig = jdbcSinkConfig;
    }

    private void tryOpen() throws IOException {
        if (!this.xaFacade.isOpen()) {
            try {
                this.xaFacade.open();
            }
            catch (Exception e) {
                throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCode.WRITER_OPERATION_FAILED, "unable to open JDBC sink aggregated committer", e);
            }
        }
    }

    public List<JdbcAggregatedCommitInfo> commit(List<JdbcAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
        this.tryOpen();
        return aggregatedCommitInfos.stream().map(aggregatedCommitInfo -> {
            log.info("commit xid: " + aggregatedCommitInfo.getXidInfoList());
            GroupXaOperationResult<XidInfo> result = this.xaGroupOps.commit(new ArrayList<XidInfo>(aggregatedCommitInfo.getXidInfoList()), false, this.jdbcSinkConfig.getJdbcConnectionConfig().getMaxCommitAttempts());
            return new JdbcAggregatedCommitInfo(result.getForRetry());
        }).filter(ainfo -> !ainfo.getXidInfoList().isEmpty()).collect(Collectors.toList());
    }

    public JdbcAggregatedCommitInfo combine(List<XidInfo> commitInfos) {
        return new JdbcAggregatedCommitInfo(commitInfos);
    }

    public void abort(List<JdbcAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
        this.tryOpen();
        for (JdbcAggregatedCommitInfo commitInfos : aggregatedCommitInfo) {
            this.xaGroupOps.rollback(commitInfos.getXidInfoList());
        }
    }

    public void close() throws IOException {
        try {
            if (this.xaFacade.isOpen()) {
                this.xaFacade.close();
            }
        }
        catch (Exception e) {
            throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCode.WRITER_OPERATION_FAILED, "unable to close JDBC sink aggregated committer", e);
        }
    }
}

