/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.provider.amqp;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;
import javax.jms.JMSException;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpProducer;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.amqp.AmqpTransferTagGenerator;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpFixedProducer
extends AmqpProducer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpFixedProducer.class);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
    private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
    private final LinkedList<PendingSend> pendingSends = new LinkedList();
    private byte[] encodeBuffer = new byte[8192];
    private boolean presettle = false;

    public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
        super(session, info);
    }

    public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info, Sender sender) {
        super(session, info, sender);
    }

    @Override
    public void close(AsyncResult request) {
        if (!this.pendingSends.isEmpty()) {
            this.closeRequest = request;
            return;
        }
        super.close(request);
    }

    @Override
    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
        if (((Sender)this.getEndpoint()).getCredit() <= 0) {
            LOG.trace("Holding Message send until credit is available.");
            envelope.setSendAsync(false);
            this.pendingSends.addLast(new PendingSend(envelope, request));
            return false;
        }
        this.doSend(envelope, request);
        return true;
    }

    private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
        if (this.session.isTransacted() && this.session.isTransactionFailed()) {
            request.onSuccess();
            return;
        }
        LOG.trace("Producer sending message: {}", (Object)envelope);
        JmsMessageFacade facade = envelope.getMessage().getFacade();
        boolean presettle = envelope.isPresettle() || this.isPresettle();
        Delivery delivery = null;
        if (presettle) {
            delivery = ((Sender)this.getEndpoint()).delivery(EMPTY_BYTE_ARRAY, 0, 0);
        } else {
            byte[] tag = this.tagGenerator.getNextTag();
            delivery = ((Sender)this.getEndpoint()).delivery(tag, 0, tag.length);
        }
        delivery.setContext((Object)request);
        if (this.session.isTransacted()) {
            Binary amqpTxId = this.session.getTransactionContext().getAmqpTransactionId();
            TransactionalState state = new TransactionalState();
            state.setTxnId(amqpTxId);
            delivery.disposition((DeliveryState)state);
        }
        AmqpJmsMessageFacade amqpMessageFacade = (AmqpJmsMessageFacade)facade;
        this.encodeAndSend(amqpMessageFacade.getAmqpMessage(), delivery);
        if (presettle) {
            delivery.settle();
        } else {
            this.pending.add(delivery);
            ((Sender)this.getEndpoint()).advance();
        }
        if (envelope.isSendAsync() || presettle) {
            request.onSuccess();
        }
    }

    private void encodeAndSend(Message message, Delivery delivery) throws IOException {
        int encodedSize;
        while (true) {
            try {
                encodedSize = message.encode(this.encodeBuffer, 0, this.encodeBuffer.length);
            }
            catch (BufferOverflowException e) {
                this.encodeBuffer = new byte[this.encodeBuffer.length * 2];
                continue;
            }
            break;
        }
        int sentSoFar = 0;
        while (true) {
            int sent;
            if ((sent = ((Sender)this.getEndpoint()).send(this.encodeBuffer, sentSoFar, encodedSize - sentSoFar)) > 0) {
                if (encodedSize - (sentSoFar += sent) != 0) continue;
                break;
            }
            LOG.warn("{} failed to send any data from current Message.", (Object)this);
        }
    }

    @Override
    public void processFlowUpdates(AmqpProvider provider) throws IOException {
        if (!this.pendingSends.isEmpty() && ((Sender)this.getEndpoint()).getCredit() > 0) {
            while (((Sender)this.getEndpoint()).getCredit() > 0 && !this.pendingSends.isEmpty()) {
                LOG.trace("Dispatching previously held send");
                PendingSend held = this.pendingSends.pop();
                try {
                    this.doSend(held.envelope, held.request);
                }
                catch (JMSException e) {
                    throw IOExceptionSupport.create(e);
                }
            }
        }
        if (this.pendingSends.isEmpty() && this.isAwaitingClose()) {
            super.close(this.closeRequest);
        }
        super.processFlowUpdates(provider);
    }

    @Override
    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
        ArrayList<Delivery> toRemove = new ArrayList<Delivery>();
        for (Delivery delivery : this.pending) {
            DeliveryState state = delivery.getRemoteState();
            if (state == null) continue;
            Outcome outcome = null;
            if (state instanceof TransactionalState) {
                LOG.trace("State of delivery is Transactional, retrieving outcome: {}", (Object)state);
                outcome = ((TransactionalState)state).getOutcome();
            } else if (state instanceof Outcome) {
                outcome = (Outcome)state;
            } else {
                LOG.warn("Message send updated with unsupported state: {}", (Object)state);
                outcome = null;
            }
            AsyncResult request = (AsyncResult)delivery.getContext();
            Throwable deliveryError = null;
            if (outcome instanceof Accepted) {
                LOG.trace("Outcome of delivery was accepted: {}", (Object)delivery);
                if (request != null && !request.isComplete()) {
                    request.onSuccess();
                }
            } else if (outcome instanceof Rejected) {
                LOG.trace("Outcome of delivery was rejected: {}", (Object)delivery);
                ErrorCondition remoteError = ((Rejected)outcome).getError();
                if (remoteError == null) {
                    remoteError = ((Sender)this.getEndpoint()).getRemoteCondition();
                }
                deliveryError = AmqpSupport.convertToException(remoteError);
            } else if (outcome instanceof Released) {
                LOG.trace("Outcome of delivery was released: {}", (Object)delivery);
                deliveryError = new JMSException("Delivery failed: released by receiver");
            } else if (outcome instanceof Modified) {
                LOG.trace("Outcome of delivery was modified: {}", (Object)delivery);
                deliveryError = new JMSException("Delivery failed: failure at remote");
            }
            if (deliveryError != null) {
                if (request != null && !request.isComplete()) {
                    request.onFailure(deliveryError);
                } else {
                    this.connection.getProvider().fireNonFatalProviderException((Exception)deliveryError);
                }
            }
            this.tagGenerator.returnTag(delivery.getTag());
            toRemove.add(delivery);
            delivery.settle();
        }
        this.pending.removeAll(toRemove);
        super.processDeliveryUpdates(provider);
    }

    public AmqpSession getSession() {
        return this.session;
    }

    @Override
    public boolean isAnonymous() {
        return ((JmsProducerInfo)this.getResourceInfo()).getDestination() == null;
    }

    @Override
    public void setPresettle(boolean presettle) {
        this.presettle = presettle;
    }

    @Override
    public boolean isPresettle() {
        return this.presettle;
    }

    public String toString() {
        return "AmqpFixedProducer { " + this.getProducerId() + " }";
    }

    @Override
    public void remotelyClosed(AmqpProvider provider) {
        super.remotelyClosed(provider);
        Throwable ex = AmqpSupport.convertToException(((Sender)this.getEndpoint()).getRemoteCondition());
        if (ex == null) {
            ex = new JMSException("Producer closed remotely before message transfer result was notified");
        }
        for (Delivery delivery : this.pending) {
            try {
                AsyncResult request = (AsyncResult)delivery.getContext();
                if (request != null && !request.isComplete()) {
                    request.onFailure(ex);
                }
                delivery.settle();
                this.tagGenerator.returnTag(delivery.getTag());
            }
            catch (Exception e) {
                LOG.debug("Caught exception when failing pending send during remote producer closure: {}", (Object)delivery, (Object)e);
            }
        }
        this.pending.clear();
    }

    private static class PendingSend {
        public JmsOutboundMessageDispatch envelope;
        public AsyncResult request;

        public PendingSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
            this.envelope = envelope;
            this.request = request;
        }
    }
}

