/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.provider.amqp;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.LinkedHashSet;
import java.util.Set;
import javax.jms.IllegalStateException;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpAbstractResource;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpTransferTagGenerator;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transaction.TxnCapability;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpTransactionContext
extends AmqpAbstractResource<JmsSessionInfo, Sender> {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
    private static final Boolean ROLLBACK_MARKER = Boolean.FALSE;
    private static final Boolean COMMIT_MARKER = Boolean.TRUE;
    private final AmqpSession session;
    private JmsTransactionId current;
    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator();
    private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>();
    private Delivery pendingDelivery;
    private AsyncResult pendingRequest;

    public AmqpTransactionContext(AmqpSession session) {
        super(session.getJmsResource());
        this.session = session;
    }

    @Override
    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
        try {
            if (this.pendingDelivery != null && this.pendingDelivery.remotelySettled()) {
                DeliveryState state = this.pendingDelivery.getRemoteState();
                if (state instanceof Declared) {
                    Declared declared = (Declared)state;
                    this.current.setProviderHint(declared.getTxnId());
                    this.pendingDelivery.settle();
                    LOG.info("New TX started: {}", this.current.getProviderHint());
                    AsyncResult request = this.pendingRequest;
                    this.pendingRequest = null;
                    this.pendingDelivery = null;
                    request.onSuccess();
                } else if (state instanceof Rejected) {
                    LOG.info("Last TX request failed: {}", this.current.getProviderHint());
                    this.pendingDelivery.settle();
                    Rejected rejected = (Rejected)state;
                    TransactionRolledBackException ex = new TransactionRolledBackException(rejected.getError().getDescription());
                    AsyncResult request = this.pendingRequest;
                    this.current = null;
                    this.pendingRequest = null;
                    this.pendingDelivery = null;
                    this.postRollback();
                    request.onFailure((Throwable)ex);
                } else {
                    LOG.info("Last TX request succeeded: {}", this.current.getProviderHint());
                    this.pendingDelivery.settle();
                    AsyncResult request = this.pendingRequest;
                    if (this.pendingDelivery.getContext() != null) {
                        if (this.pendingDelivery.getContext().equals(COMMIT_MARKER)) {
                            this.postCommit();
                        } else {
                            this.postRollback();
                        }
                    }
                    this.current = null;
                    this.pendingRequest = null;
                    this.pendingDelivery = null;
                    request.onSuccess();
                }
            }
            super.processDeliveryUpdates(provider);
        }
        catch (Exception e) {
            throw IOExceptionSupport.create(e);
        }
    }

    @Override
    protected void doOpen() {
        Coordinator coordinator = new Coordinator();
        coordinator.setCapabilities(new Symbol[]{TxnCapability.LOCAL_TXN});
        Source source = new Source();
        String coordinatorName = ((JmsSessionInfo)this.resource).getSessionId().toString();
        Sender sender = this.session.getProtonSession().sender(coordinatorName);
        sender.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        sender.setTarget((Target)coordinator);
        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        this.setEndpoint(sender);
        super.doOpen();
    }

    public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
        if (this.current != null) {
            throw new IOException("Begin called while a TX is still Active.");
        }
        Message message = Message.Factory.create();
        Declare declare = new Declare();
        message.setBody((Section)new AmqpValue((Object)declare));
        this.pendingDelivery = ((Sender)this.getEndpoint()).delivery(this.tagGenerator.getNextTag());
        this.pendingRequest = request;
        this.current = txId;
        this.sendTxCommand(message);
    }

    public void commit(AsyncResult request) throws Exception {
        if (this.current == null) {
            throw new IllegalStateException("Commit called with no active Transaction.");
        }
        this.preCommit();
        Message message = Message.Factory.create();
        Discharge discharge = new Discharge();
        discharge.setFail(Boolean.valueOf(false));
        discharge.setTxnId((Binary)this.current.getProviderHint());
        message.setBody((Section)new AmqpValue((Object)discharge));
        this.pendingDelivery = ((Sender)this.getEndpoint()).delivery(this.tagGenerator.getNextTag());
        this.pendingDelivery.setContext((Object)COMMIT_MARKER);
        this.pendingRequest = request;
        this.sendTxCommand(message);
    }

    public void rollback(AsyncResult request) throws Exception {
        if (this.current == null) {
            throw new IllegalStateException("Rollback called with no active Transaction.");
        }
        this.preRollback();
        Message message = Message.Factory.create();
        Discharge discharge = new Discharge();
        discharge.setFail(Boolean.valueOf(true));
        discharge.setTxnId((Binary)this.current.getProviderHint());
        message.setBody((Section)new AmqpValue((Object)discharge));
        this.pendingDelivery = ((Sender)this.getEndpoint()).delivery(this.tagGenerator.getNextTag());
        this.pendingDelivery.setContext((Object)ROLLBACK_MARKER);
        this.pendingRequest = request;
        this.sendTxCommand(message);
    }

    public void registerTxConsumer(AmqpConsumer consumer) {
        this.txConsumers.add(consumer);
    }

    public AmqpSession getSession() {
        return this.session;
    }

    public JmsTransactionId getTransactionId() {
        return this.current;
    }

    public Binary getAmqpTransactionId() {
        Binary result = null;
        if (this.current != null) {
            result = (Binary)this.current.getProviderHint();
        }
        return result;
    }

    public String toString() {
        return this.session.getSessionId() + ": txContext";
    }

    private void preCommit() throws Exception {
        for (AmqpConsumer consumer : this.txConsumers) {
            consumer.preCommit();
        }
    }

    private void preRollback() throws Exception {
        for (AmqpConsumer consumer : this.txConsumers) {
            consumer.preRollback();
        }
    }

    private void postCommit() throws Exception {
        for (AmqpConsumer consumer : this.txConsumers) {
            consumer.postCommit();
        }
    }

    private void postRollback() throws Exception {
        for (AmqpConsumer consumer : this.txConsumers) {
            consumer.postRollback();
        }
    }

    private void sendTxCommand(Message message) throws IOException {
        int encodedSize = 0;
        byte[] buffer = new byte[4096];
        while (true) {
            try {
                encodedSize = message.encode(buffer, 0, buffer.length);
            }
            catch (BufferOverflowException e) {
                buffer = new byte[buffer.length * 2];
                continue;
            }
            break;
        }
        Sender sender = (Sender)this.getEndpoint();
        sender.send(buffer, 0, encodedSize);
        sender.advance();
    }
}

