/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.amqp.client;

import jakarta.jms.IllegalStateException;
import jakarta.jms.JMSException;
import jakarta.jms.TransactionRolledBackException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.BufferOverflowException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.transport.amqp.client.AmqpAbstractResource;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpTransactionId;
import org.apache.activemq.transport.amqp.client.AmqpTransferTagGenerator;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transaction.TxnCapability;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side resource wrapping the AMQP transaction coordinator link.
 *
 * <p>The wrapped endpoint is a proton {@link Sender} whose target is a {@link Coordinator}.
 * {@link Declare} and {@link Discharge} commands are sent over that link; every command is
 * tracked as a pending {@link Delivery} together with the {@link AsyncResult} to complete,
 * keyed by its {@link AmqpTransactionId}. Pending results are completed from
 * {@link #processDeliveryUpdates(AmqpConnection, Delivery)} once the peer has settled the
 * delivery, or failed from {@link #remotelyClosed(AmqpConnection)} when the peer closes the link.
 *
 * <p>This class performs no synchronization of its own.
 */
public class AmqpTransactionCoordinator
extends AmqpAbstractResource<Sender> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Scratch buffer tried first when encoding a command; larger temporary buffers are used on overflow. */
    private final byte[] outboundBuffer = new byte[64];
    private final AmqpSession session;
    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();

    /** Deliveries for commands that have been sent but not yet processed as settled by the peer. */
    private final List<Delivery> pendingDeliveries = new LinkedList<>();

    /** Results to complete once the matching pending delivery has been settled by the peer. */
    private final Map<AmqpTransactionId, AsyncResult> pendingRequests = new HashMap<>();

    /**
     * Creates a coordinator bound to the given session. The link itself is created in
     * {@link #doOpen()}.
     *
     * @param session the session on which the coordinator link will be created
     */
    public AmqpTransactionCoordinator(AmqpSession session) {
        this.session = session;
    }

    /**
     * Completes every pending request whose delivery has been remotely settled, then delegates
     * to the superclass.
     *
     * <p>A {@link Declared} outcome stores the remote transaction id on the
     * {@link AmqpTransactionId} and succeeds the request; a {@link Rejected} outcome fails it;
     * any other outcome succeeds it. Processed deliveries are settled locally and removed from
     * the pending collections.
     *
     * @param connection the connection the update arrived on (passed through to the superclass)
     * @param delivery the delivery that triggered the update (passed through to the superclass)
     * @throws IOException wrapping any exception raised while processing, including the
     *         {@link IllegalStateException} raised when a settled delivery has no pending request
     */
    @Override
    public void processDeliveryUpdates(AmqpConnection connection, Delivery delivery) throws IOException {
        try {
            Iterator<Delivery> deliveries = pendingDeliveries.iterator();
            while (deliveries.hasNext()) {
                Delivery pendingDelivery = deliveries.next();
                if (!pendingDelivery.remotelySettled()) {
                    continue;
                }

                DeliveryState state = pendingDelivery.getRemoteState();
                AmqpTransactionId txId = (AmqpTransactionId) pendingDelivery.getContext();
                AsyncResult pendingRequest = pendingRequests.get(txId);
                if (pendingRequest == null) {
                    throw new IllegalStateException("Pending tx operation with no pending request");
                }

                if (state instanceof Declared) {
                    logger.debug("New TX started: {}", (Object) txId.getTxId());
                    Declared declared = (Declared) state;
                    txId.setRemoteTxId(declared.getTxnId());
                    pendingRequest.onSuccess();
                } else if (state instanceof Rejected) {
                    logger.debug("Last TX request failed: {}", (Object) txId.getTxId());
                    pendingRequest.onFailure(createRejectionFailure(txId, (Rejected) state));
                } else {
                    logger.debug("Last TX request succeeded: {}", (Object) txId.getTxId());
                    pendingRequest.onSuccess();
                }

                pendingDelivery.settle();
                pendingRequests.remove(txId);
                // Remove through the iterator so the list is not modified behind its back.
                deliveries.remove();
            }
            super.processDeliveryUpdates(connection, delivery);
        } catch (Exception e) {
            throw IOExceptionSupport.create(e);
        }
    }

    /**
     * Sends a {@link Declare} command to start a new transaction.
     *
     * <p>If the coordinator is already closed the request is failed immediately and nothing is
     * sent.
     *
     * @param txId the transaction id to associate with the declare; must not yet carry a remote id
     * @param request the result to complete when the peer answers
     * @throws IllegalStateException if {@code txId} already has a remote transaction id
     * @throws Exception if the command cannot be sent
     */
    public void declare(AmqpTransactionId txId, AsyncResult request) throws Exception {
        if (txId.getRemoteTxId() != null) {
            throw new IllegalStateException("Declare called while a TX is still Active.");
        }
        if (isClosed()) {
            request.onFailure(new JMSException("Cannot start new transaction: Coordinator remotely closed"));
            return;
        }

        sendTracked(txId, request, new Declare());
    }

    /**
     * Sends a {@link Discharge} command to commit or roll back the given transaction.
     *
     * <p>If the coordinator is already closed the request is failed immediately (with a
     * {@link TransactionRolledBackException} for a commit, a plain {@link JMSException} for a
     * rollback) and nothing is sent.
     *
     * @param txId the transaction to discharge; its remote id is placed in the command
     * @param request the result to complete when the peer answers
     * @param commit {@code true} to commit, {@code false} to roll back
     * @throws Exception if the command cannot be sent
     */
    public void discharge(AmqpTransactionId txId, AsyncResult request, boolean commit) throws Exception {
        if (isClosed()) {
            Throwable failureCause = commit
                ? new TransactionRolledBackException("Transaction in-doubt: Coordinator remotely closed")
                : new JMSException("Rollback cannot complete: Coordinator remotely closed");
            request.onFailure(failureCause);
            return;
        }

        // Numeric state markers understood by AmqpTransactionId (declared outside this file):
        // 3 is used for a commit and 2 for a rollback -- TODO confirm the named constants there.
        txId.setState(commit ? 3 : 2);

        Discharge discharge = new Discharge();
        // The AMQP "fail" flag is the inverse of commit: fail == true requests a rollback.
        discharge.setFail(Boolean.valueOf(!commit));
        discharge.setTxnId(txId.getRemoteTxId());

        sendTracked(txId, request, discharge);
    }

    /**
     * Fails every pending request because the peer closed the coordinator link, clears the
     * pending collections and closes/frees the local endpoint if there is one.
     *
     * @param connection the connection on which the remote close was observed
     */
    @Override
    public void remotelyClosed(AmqpConnection connection) {
        Sender sender = (Sender) getEndpoint();

        // Only consult the endpoint when it exists; the null check further down implies it may not.
        Exception txnError = sender != null ? AmqpSupport.convertToException(sender.getRemoteCondition()) : null;
        if (txnError == null) {
            // convertToException is not visible here; make sure a request is never failed with a
            // null cause if no error could be derived from the remote condition.
            txnError = new JMSException("Transaction Coordinator link was remotely closed");
        }

        for (AsyncResult pendingRequest : pendingRequests.values()) {
            pendingRequest.onFailure(txnError);
        }
        pendingDeliveries.clear();
        pendingRequests.clear();

        if (sender != null) {
            sender.close();
            sender.free();
        }
        logger.debug("Transaction Coordinator link {} was remotely closed", sender);
    }

    /**
     * Builds the failure reported for a {@link Rejected} outcome: a
     * {@link TransactionRolledBackException} when the rejected operation was a commit, a plain
     * {@link JMSException} otherwise. The exception converted from the remote error, when there
     * is one, supplies the message and is preserved as linked exception and cause.
     */
    private static Throwable createRejectionFailure(AmqpTransactionId txId, Rejected rejected) {
        Exception cause = AmqpSupport.convertToException(rejected.getError());
        String reason = cause != null ? cause.getMessage() : "Transaction request was rejected by the remote peer";

        JMSException failure = txId.isCommit() ? new TransactionRolledBackException(reason) : new JMSException(reason);
        if (cause != null) {
            failure.setLinkedException(cause);
            failure.initCause(cause);
        }
        return failure;
    }

    /**
     * Wraps {@code command} in a message, registers the delivery and the request as pending
     * under {@code txId}, and sends the message on the coordinator link.
     */
    private void sendTracked(AmqpTransactionId txId, AsyncResult request, Object command) throws IOException {
        Message message = Message.Factory.create();
        message.setBody(new AmqpValue(command));

        // The delivery must be created before the bytes are written so the send applies to it.
        Delivery pendingDelivery = ((Sender) getEndpoint()).delivery(tagGenerator.getNextTag());
        pendingDelivery.setContext(txId);
        pendingDeliveries.add(pendingDelivery);
        pendingRequests.put(txId, request);

        sendTxCommand(message);
    }

    /**
     * Encodes the message and writes it to the coordinator link, then advances the sender.
     * Encoding starts with the reusable scratch buffer and retries with a buffer of twice the
     * size each time the encoder reports an overflow.
     */
    private void sendTxCommand(Message message) throws IOException {
        byte[] buffer = outboundBuffer;
        int encodedSize;
        while (true) {
            try {
                encodedSize = message.encode(buffer, 0, buffer.length);
                break;
            } catch (BufferOverflowException overflow) {
                // Not an error: the message simply did not fit, so retry with more room.
                buffer = new byte[buffer.length * 2];
            }
        }

        Sender sender = (Sender) getEndpoint();
        sender.send(buffer, 0, encodedSize);
        sender.advance();
    }

    /**
     * Creates the coordinator sender link on the session, advertising the local-transaction
     * capability, and installs it as this resource's endpoint before delegating to the
     * superclass.
     */
    @Override
    protected void doOpen() {
        Coordinator coordinator = new Coordinator();
        coordinator.setCapabilities(new Symbol[]{TxnCapability.LOCAL_TXN});
        Source source = new Source();

        String coordinatorName = "qpid-jms:coordinator:" + session.getConnection().getConnectionId();

        Sender sender = ((Session) session.getEndpoint()).sender(coordinatorName);
        sender.setSource(source);
        sender.setTarget(coordinator);
        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);

        setEndpoint(sender);
        super.doOpen();
    }

    /** Intentionally empty: no inspection is performed when the link opens. */
    @Override
    protected void doOpenInspection() {
    }

    /** Intentionally empty: no inspection is performed when the link closes. */
    @Override
    protected void doClosedInspection() {
    }
}

