/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.transaction.xa.Xid;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XidGenerator;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcExactlyOnceSinkWriter
implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class);
    private final SinkWriter.Context sinkcontext;
    private final JobContext context;
    private final List<JdbcSinkState> recoverStates;
    private final XaFacade xaFacade;
    private final XaGroupOps xaGroupOps;
    private final XidGenerator xidGenerator;
    private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
    private transient boolean isOpen;
    private transient Xid currentXid;
    private transient Xid prepareXid;

    public JdbcExactlyOnceSinkWriter(SinkWriter.Context sinkcontext, JobContext context, JdbcDialect dialect, JdbcSinkOptions jdbcSinkOptions, SeaTunnelRowType rowType, List<JdbcSinkState> states) {
        Preconditions.checkArgument(jdbcSinkOptions.getJdbcConnectionOptions().getMaxRetries() == 0, "JDBC XA sink requires maxRetries equal to 0, otherwise it could cause duplicates.");
        this.context = context;
        this.sinkcontext = sinkcontext;
        this.recoverStates = states;
        this.xidGenerator = XidGenerator.semanticXidGenerator();
        Preconditions.checkState(jdbcSinkOptions.isExactlyOnce(), "is_exactly_once config error");
        this.xaFacade = XaFacade.fromJdbcConnectionOptions(jdbcSinkOptions.getJdbcConnectionOptions());
        this.outputFormat = new JdbcOutputFormatBuilder(dialect, this.xaFacade, jdbcSinkOptions, rowType).build();
        this.xaGroupOps = new XaGroupOpsImpl(this.xaFacade);
    }

    private void tryOpen() {
        if (!this.isOpen) {
            this.isOpen = true;
            try {
                this.xidGenerator.open();
                this.xaFacade.open();
                this.outputFormat.open();
                if (!this.recoverStates.isEmpty()) {
                    Xid xid = this.recoverStates.get(0).getXid();
                    this.xaGroupOps.recoverAndRollback(this.context, this.sinkcontext, this.xidGenerator, xid);
                }
                this.beginTx();
            }
            catch (Exception e) {
                throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCode.WRITER_OPERATION_FAILED, "unable to open JDBC exactly one writer", e);
            }
        }
    }

    public List<JdbcSinkState> snapshotState(long checkpointId) {
        Preconditions.checkState(this.prepareXid != null, "prepare xid must not be null");
        return Collections.singletonList(new JdbcSinkState(this.prepareXid));
    }

    public void write(SeaTunnelRow element) {
        this.tryOpen();
        Preconditions.checkState(this.currentXid != null, "current xid must not be null");
        SeaTunnelRow copy = SerializationUtils.clone(element);
        this.outputFormat.writeRecord(copy);
    }

    public Optional<XidInfo> prepareCommit() throws IOException {
        this.tryOpen();
        this.prepareCurrentTx();
        this.currentXid = null;
        this.beginTx();
        Preconditions.checkState(this.prepareXid != null, "prepare xid must not be null");
        return Optional.of(new XidInfo(this.prepareXid, 0));
    }

    public void abortPrepare() {
    }

    public void close() throws IOException {
        if (this.currentXid != null && this.xaFacade.isOpen()) {
            try {
                LOG.debug("remove current transaction before closing, xid={}", (Object)this.currentXid);
                this.xaFacade.failAndRollback(this.currentXid);
            }
            catch (Exception e) {
                LOG.warn("unable to fail/rollback current transaction, xid={}", (Object)this.currentXid, (Object)e);
            }
        }
        try {
            this.xaFacade.close();
        }
        catch (Exception e) {
            throw new JdbcConnectorException((SeaTunnelErrorCode)CommonErrorCode.WRITER_OPERATION_FAILED, "unable to close JDBC exactly one writer", e);
        }
        this.xidGenerator.close();
        this.currentXid = null;
        this.prepareXid = null;
    }

    private void beginTx() throws IOException {
        Preconditions.checkState(this.currentXid == null, "currentXid not null");
        this.currentXid = this.xidGenerator.generateXid(this.context, this.sinkcontext, System.currentTimeMillis());
        try {
            this.xaFacade.start(this.currentXid);
        }
        catch (Exception e) {
            throw new JdbcConnectorException(JdbcConnectorErrorCode.XA_OPERATION_FAILED, "unable to start xa transaction", e);
        }
    }

    private void prepareCurrentTx() throws IOException {
        Preconditions.checkState(this.currentXid != null, "no current xid");
        this.outputFormat.flush();
        try {
            this.xaFacade.endAndPrepare(this.currentXid);
            this.prepareXid = this.currentXid;
        }
        catch (Exception e) {
            throw new JdbcConnectorException(JdbcConnectorErrorCode.XA_OPERATION_FAILED, "unable to prepare current xa transaction", e);
        }
    }
}

