/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.provider.amqp;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Set;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpProducer;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpTransferTagGenerator;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpFixedProducer
extends AmqpProducer {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpFixedProducer.class);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = Symbol.valueOf((String)"amqp:accepted:list");
    private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = Symbol.valueOf((String)"amqp:rejected:list");
    private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true);
    private final Set<Delivery> pending = new LinkedHashSet<Delivery>();
    private final LinkedList<PendingSend> pendingSends = new LinkedList();
    private byte[] encodeBuffer = new byte[8192];
    private boolean presettle = false;

    public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
        super(session, info);
    }

    @Override
    public void close(AsyncResult request) {
        if (!this.pendingSends.isEmpty()) {
            this.closeRequest = request;
            return;
        }
        super.close(request);
    }

    @Override
    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
        if (((Sender)this.getEndpoint()).getCredit() <= 0) {
            LOG.trace("Holding Message send until credit is available.");
            envelope.setSendAsync(false);
            this.pendingSends.addLast(new PendingSend(envelope, request));
            return false;
        }
        this.doSend(envelope, request);
        return true;
    }

    private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
        JmsMessageFacade facade = envelope.getMessage().getFacade();
        LOG.trace("Producer sending message: {}", (Object)envelope);
        byte[] tag = this.tagGenerator.getNextTag();
        Delivery delivery = null;
        delivery = this.presettle ? ((Sender)this.getEndpoint()).delivery(EMPTY_BYTE_ARRAY, 0, 0) : ((Sender)this.getEndpoint()).delivery(tag, 0, tag.length);
        delivery.setContext((Object)request);
        if (this.session.isTransacted()) {
            Binary amqpTxId = this.session.getTransactionContext().getAmqpTransactionId();
            TransactionalState state = new TransactionalState();
            state.setTxnId(amqpTxId);
            delivery.disposition((DeliveryState)state);
        }
        AmqpJmsMessageFacade amqpMessageFacade = (AmqpJmsMessageFacade)facade;
        this.encodeAndSend(amqpMessageFacade.getAmqpMessage(), delivery);
        if (this.presettle) {
            delivery.settle();
        } else {
            this.pending.add(delivery);
            ((Sender)this.getEndpoint()).advance();
        }
        if (envelope.isSendAsync() || this.presettle) {
            request.onSuccess();
        }
    }

    private void encodeAndSend(Message message, Delivery delivery) throws IOException {
        int encodedSize;
        while (true) {
            try {
                encodedSize = message.encode(this.encodeBuffer, 0, this.encodeBuffer.length);
            }
            catch (BufferOverflowException e) {
                this.encodeBuffer = new byte[this.encodeBuffer.length * 2];
                continue;
            }
            break;
        }
        int sentSoFar = 0;
        while (true) {
            int sent;
            if ((sent = ((Sender)this.getEndpoint()).send(this.encodeBuffer, sentSoFar, encodedSize - sentSoFar)) > 0) {
                if (encodedSize - (sentSoFar += sent) != 0) continue;
                break;
            }
            LOG.warn("{} failed to send any data from current Message.", (Object)this);
        }
    }

    @Override
    public void processFlowUpdates(AmqpProvider provider) throws IOException {
        if (!this.pendingSends.isEmpty() && ((Sender)this.getEndpoint()).getCredit() > 0) {
            while (((Sender)this.getEndpoint()).getCredit() > 0 && !this.pendingSends.isEmpty()) {
                LOG.trace("Dispatching previously held send");
                PendingSend held = this.pendingSends.pop();
                try {
                    this.doSend(held.envelope, held.request);
                }
                catch (JMSException e) {
                    throw IOExceptionSupport.create(e);
                }
            }
        }
        if (this.pendingSends.isEmpty() && this.isAwaitingClose()) {
            super.close(this.closeRequest);
        }
        super.processFlowUpdates(provider);
    }

    @Override
    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
        ArrayList<Delivery> toRemove = new ArrayList<Delivery>();
        for (Delivery delivery : this.pending) {
            DeliveryState state = delivery.getRemoteState();
            if (state == null) continue;
            Outcome outcome = null;
            if (state instanceof TransactionalState) {
                LOG.trace("State of delivery is Transactional, retrieving outcome: {}", (Object)state);
                outcome = ((TransactionalState)state).getOutcome();
            } else if (state instanceof Outcome) {
                outcome = (Outcome)state;
            } else {
                LOG.warn("Message send updated with unsupported state: {}", (Object)state);
                continue;
            }
            AsyncResult request = (AsyncResult)delivery.getContext();
            if (outcome instanceof Accepted) {
                toRemove.add(delivery);
                LOG.trace("Outcome of delivery was accepted: {}", (Object)delivery);
                this.tagGenerator.returnTag(delivery.getTag());
                if (request == null || request.isComplete()) continue;
                request.onSuccess();
                continue;
            }
            if (outcome instanceof Rejected) {
                Exception remoteError = this.getRemoteError();
                toRemove.add(delivery);
                LOG.trace("Outcome of delivery was rejected: {}", (Object)delivery);
                this.tagGenerator.returnTag(delivery.getTag());
                if (request != null && !request.isComplete()) {
                    request.onFailure(remoteError);
                    continue;
                }
                this.connection.getProvider().fireProviderException(remoteError);
                continue;
            }
            LOG.warn("Message send updated with unsupported outcome: {}", (Object)outcome);
        }
        this.pending.removeAll(toRemove);
        super.processDeliveryUpdates(provider);
    }

    @Override
    protected void doOpen() {
        JmsDestination destination = ((JmsProducerInfo)this.resource).getDestination();
        String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, this.session.getConnection());
        Symbol[] outcomes = new Symbol[]{ACCEPTED_DESCRIPTOR_SYMBOL, REJECTED_DESCRIPTOR_SYMBOL};
        String sourceAddress = this.getProducerId().toString();
        Source source = new Source();
        source.setAddress(sourceAddress);
        source.setOutcomes(outcomes);
        Target target = new Target();
        target.setAddress(targetAddress);
        Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(destination);
        if (typeCapability != null) {
            target.setCapabilities(new Symbol[]{typeCapability});
        }
        String senderName = sourceAddress + ":" + targetAddress;
        Sender sender = this.session.getProtonSession().sender(senderName);
        sender.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        sender.setTarget((org.apache.qpid.proton.amqp.transport.Target)target);
        if (this.presettle) {
            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
        } else {
            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        }
        sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        this.setEndpoint(sender);
        super.doOpen();
    }

    @Override
    protected void doOpenCompletion() {
        org.apache.qpid.proton.amqp.transport.Target t = ((Sender)this.getEndpoint()).getRemoteTarget();
        if (t != null) {
            super.doOpenCompletion();
        }
    }

    @Override
    protected Exception getOpenAbortException() {
        org.apache.qpid.proton.amqp.transport.Target t = ((Sender)this.getEndpoint()).getRemoteTarget();
        if (t != null) {
            return super.getOpenAbortException();
        }
        return new InvalidDestinationException("Link creation was refused");
    }

    public AmqpSession getSession() {
        return this.session;
    }

    public Sender getProtonSender() {
        return (Sender)this.getEndpoint();
    }

    @Override
    public boolean isAnonymous() {
        return ((JmsProducerInfo)this.resource).getDestination() == null;
    }

    @Override
    public void setPresettle(boolean presettle) {
        this.presettle = presettle;
    }

    @Override
    public boolean isPresettle() {
        return this.presettle;
    }

    public String toString() {
        return "AmqpFixedProducer { " + this.getProducerId() + " }";
    }

    private static class PendingSend {
        public JmsOutboundMessageDispatch envelope;
        public AsyncResult request;

        public PendingSend(JmsOutboundMessageDispatch envelope, AsyncResult request) {
            this.envelope = envelope;
            this.request = request;
        }
    }
}

