/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.provider.amqp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.ArrayList;
import java.util.ListIterator;
import java.util.concurrent.ScheduledFuture;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.WrappedAsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpAbstractResource;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpSubscriptionTracker;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.amqp.message.AmqpCodec;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpConsumer
extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);

    /** Initial capacity, in bytes, of the reusable buffer that incoming payloads are read into. */
    private static final int INITIAL_BUFFER_CAPACITY = 131072;

    /** Session this consumer was created on. */
    protected final AmqpSession session;

    /** Pending stop request; non-null while the consumer is stopping (see {@code isStopping()}). */
    protected AsyncResult stopRequest;

    /** Pending pull request, signalled once a message is dispatched or the link runs dry. */
    protected AsyncResult pullRequest;

    /** Reusable buffer that the bytes of each incoming delivery are read into before decoding. */
    protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);

    /** Last sequence number handed out by {@code getNextIncomingSequenceNumber()}. */
    protected long incomingSequence;

    /** Number of messages acknowledged as DELIVERED that have not yet been settled. */
    protected long deliveredCount;

    /** Set when a close was requested while work was still outstanding on this consumer. */
    protected boolean deferredClose;

    /**
     * Creates a consumer bound to the given session.
     *
     * @param session  the session that owns this consumer; also passed to the parent resource
     * @param info     the consumer description passed to the parent resource
     * @param receiver the receiver link passed to the parent resource as its endpoint
     */
    public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver receiver) {
        super(info, receiver, session);
        this.session = session;
    }

    /**
     * Closes this consumer, or defers the close when work is still outstanding.
     * <p>
     * When {@code shouldDeferClose()} reports pending work the consumer is flagged as
     * deferred-closed and only stopped; the stop is wrapped so that prefetched messages
     * are released before {@code request} is signalled.
     *
     * @param request the result to signal for the close operation
     */
    @Override
    public void close(AsyncResult request) {
        if (!shouldDeferClose()) {
            super.close(request);
            return;
        }

        deferredClose = true;
        stop(new StopAndReleaseRequest(request));
    }

    /**
     * Starts the consumer by granting link credit and then signals success.
     * <p>
     * A message-listener consumer with a prefetch size of zero is topped up to a single
     * credit; every other consumer goes through the normal prefetch-based credit check.
     *
     * @param request the result to signal once credit has been handled
     */
    public void start(AsyncResult request) {
        JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        boolean zeroPrefetchListener = info.isListener() && info.getPrefetchSize() == 0;

        if (zeroPrefetchListener) {
            sendFlowForNoPrefetchListener();
        } else {
            sendFlowIfNeeded();
        }

        request.onSuccess();
    }

    /**
     * Stops message flow on this consumer's link and signals {@code request} once the
     * link is quiescent.
     * <p>
     * When the remote credit is already exhausted the request either completes at once
     * (nothing queued) or is parked in {@code stopRequest}. Otherwise the link is drained
     * and, if a positive drain timeout is configured, a timeout task is scheduled in case
     * the remote never answers the drain.
     *
     * @param request the result to signal when the stop completes or fails
     */
    public void stop(AsyncResult request) {
        Receiver receiver = (Receiver)this.getEndpoint();
        if (receiver.getRemoteCredit() <= 0) {
            if (receiver.getQueued() == 0) {
                // No remote credit and nothing queued locally: nothing left to wait for.
                request.onSuccess();
            } else {
                // Deliveries are still queued; the parked request is completed later from
                // processFlowUpdates / processDeliveryUpdates.
                this.stopRequest = request;
            }
        } else {
            // Park the request first, then ask the remote to drain the outstanding credit.
            this.stopRequest = request;
            receiver.drain(0);
            if (this.getDrainTimeout() > 0) {
                ScheduledFuture<?> future = this.getSession().schedule(new Runnable(){

                    @Override
                    public void run() {
                        LOG.trace("Consumer {} drain request timed out", (Object)AmqpConsumer.this.getConsumerId());
                        JmsOperationTimedOutException cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
                        // Inside an active transaction only the stop request is failed;
                        // otherwise the consumer resource itself is closed with the timeout
                        // as its cause and the transport is pumped.
                        if (AmqpConsumer.this.session.isTransacted() && AmqpConsumer.this.session.getTransactionContext().isInTransaction(AmqpConsumer.this.getConsumerId())) {
                            AmqpConsumer.this.stopRequest.onFailure((Throwable)((Object)cause));
                            AmqpConsumer.this.stopRequest = null;
                        } else {
                            AmqpConsumer.this.closeResource(AmqpConsumer.this.session.getProvider(), (Throwable)((Object)cause), false);
                            AmqpConsumer.this.session.getProvider().pumpToProtonTransport();
                        }
                    }
                }, this.getDrainTimeout());
                // Wrap the parked request so that completing it also cancels the timeout task.
                this.stopRequest = new ScheduledRequest(future, this.stopRequest);
            }
        }
    }

    /**
     * Schedules a stop of this consumer after {@code timeout} and parks {@code request}
     * as the current stop request, wrapped so that completing it cancels the scheduled task.
     * <p>
     * When the task fires it only stops the consumer (and pumps the transport) if the
     * link still reports non-zero remote credit.
     *
     * @param timeout delay handed to the session scheduler
     * @param request the result to signal when the stop completes
     */
    private void stopOnSchedule(long timeout, final AsyncResult request) {
        LOG.trace("Consumer {} scheduling stop", getConsumerId());

        Runnable scheduledStop = new Runnable() {
            @Override
            public void run() {
                LOG.trace("Consumer {} running scheduled stop", getConsumerId());
                Receiver link = (Receiver) getEndpoint();
                if (link.getRemoteCredit() == 0) {
                    return;
                }
                stop(request);
                session.getProvider().pumpToProtonTransport(request);
            }
        };

        ScheduledFuture<?> pendingStop = getSession().schedule(scheduledStop, timeout);
        stopRequest = new ScheduledRequest(pendingStop, request);
    }

    /**
     * Handles a flow update for this consumer's link.
     * <p>
     * A pending stop request and a pending pull request are each completed, and cleared,
     * once the link has no remote credit left and no queued deliveries. The update is then
     * passed on to the parent implementation.
     *
     * @param provider the provider delivering the update
     * @throws IOException if the parent implementation fails
     */
    @Override
    public void processFlowUpdates(AmqpProvider provider) throws IOException {
        if (stopRequest != null) {
            Receiver link = (Receiver) getEndpoint();
            if (link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
                stopRequest.onSuccess();
                stopRequest = null;
            }
        }

        if (pullRequest != null) {
            Receiver link = (Receiver) getEndpoint();
            if (link.getRemoteCredit() <= 0 && link.getQueued() == 0) {
                pullRequest.onSuccess();
                pullRequest = null;
            }
        }

        int remoteCredit = ((Receiver) getEndpoint()).getRemoteCredit();
        LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), remoteCredit);

        super.processFlowUpdates(provider);
    }

    /**
     * Applies the given acknowledgement to every delivery on this link whose message has
     * already been flagged as delivered, settling each one and decrementing the delivered
     * count. Deliveries without a dispatch envelope as context are skipped. Afterwards a
     * deferred close is completed if nothing remains outstanding.
     *
     * @param ackType the disposition to apply to the delivered messages
     * @throws IllegalArgumentException if a delivered message is found and {@code ackType}
     *         is not one of ACCEPTED, RELEASED, REJECTED, MODIFIED_FAILED or
     *         MODIFIED_FAILED_UNDELIVERABLE
     */
    public void acknowledge(ProviderConstants.ACK_TYPE ackType) {
        LOG.trace("Session Acknowledge for consumer {} with ack type {}", ((JmsConsumerInfo) getResourceInfo()).getId(), ackType);

        Delivery delivery = ((Receiver) getEndpoint()).head();
        while (delivery != null) {
            // Step to the next delivery before the current one is settled.
            Delivery current = delivery;
            delivery = delivery.next();

            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
                // The message used to say "recover processing", which pointed at the wrong operation.
                LOG.debug("{} Found incomplete delivery with no context during acknowledge processing", this);
                continue;
            }

            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
            if (!envelope.isDelivered()) {
                continue;
            }

            switch (ackType) {
                case ACCEPTED: {
                    current.disposition((DeliveryState) Accepted.getInstance());
                    break;
                }
                case RELEASED: {
                    current.disposition((DeliveryState) Released.getInstance());
                    break;
                }
                case REJECTED: {
                    current.disposition((DeliveryState) AmqpSupport.REJECTED);
                    break;
                }
                case MODIFIED_FAILED: {
                    current.disposition((DeliveryState) AmqpSupport.MODIFIED_FAILED);
                    break;
                }
                case MODIFIED_FAILED_UNDELIVERABLE: {
                    current.disposition((DeliveryState) AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
                }
            }

            current.settle();
            --deliveredCount;
        }

        tryCompleteDeferredClose();
    }

    /**
     * Applies an acknowledgement of the given type to a single dispatched message.
     * <p>
     * The envelope's provider hint must be the proton {@link Delivery}; otherwise a
     * warning is logged and nothing happens. A DELIVERED ack only marks the message and
     * tops up credit. ACCEPTED, MODIFIED_FAILED_UNDELIVERABLE, EXPIRED and RELEASED apply
     * the matching disposition; afterwards the delivered count is decremented if the
     * message had been marked delivered, and a deferred close is attempted. Any other
     * type is logged as unsupported.
     *
     * @param envelope the dispatch envelope of the message being acknowledged
     * @param ackType  the kind of acknowledgement to apply
     * @throws JMSException declared by the signature
     */
    public void acknowledge(JmsInboundMessageDispatch envelope, ProviderConstants.ACK_TYPE ackType) throws JMSException {
        Object hint = envelope.getProviderHint();
        if (!(hint instanceof Delivery)) {
            LOG.warn("Received Ack for unknown message: {}", envelope);
            return;
        }
        Delivery delivery = (Delivery) hint;

        switch (ackType) {
            case DELIVERED:
                LOG.debug("Delivered Ack of message: {}", envelope);
                deliveredCount++;
                envelope.setDelivered(true);
                delivery.setDefaultDeliveryState((DeliveryState) AmqpSupport.MODIFIED_FAILED);
                sendFlowIfNeeded();
                return;

            case ACCEPTED:
                // Top up credit here only if no DELIVERED ack was seen for this message.
                if (!envelope.isDelivered()) {
                    sendFlowIfNeeded();
                }
                LOG.debug("Accepted Ack of message: {}", envelope);

                if (delivery.remotelySettled()) {
                    delivery.settle();
                } else if (session.isTransacted() && !((JmsConsumerInfo) getResourceInfo()).isBrowser()) {
                    if (session.isTransactionFailed()) {
                        LOG.trace("Skipping ack of message {} in failed transaction.", envelope);
                        return;
                    }
                    Binary txnId = session.getTransactionContext().getAmqpTransactionId();
                    if (txnId != null) {
                        delivery.disposition((DeliveryState) session.getTransactionContext().getTxnAcceptState());
                        delivery.settle();
                        session.getTransactionContext().registerTxConsumer(this);
                    }
                } else {
                    delivery.disposition((DeliveryState) Accepted.getInstance());
                    delivery.settle();
                }
                break;

            case MODIFIED_FAILED_UNDELIVERABLE:
            case EXPIRED:
                deliveryFailedUndeliverable(delivery);
                break;

            case RELEASED:
                delivery.disposition((DeliveryState) Released.getInstance());
                delivery.settle();
                break;

            default:
                LOG.warn("Unsupported Ack Type for message: {}", envelope);
                return;
        }

        if (envelope.isDelivered()) {
            deliveredCount--;
        }
        tryCompleteDeferredClose();
    }

    /**
     * Tops the link credit back up to the configured prefetch size once the current
     * credit has dropped to 30% of the prefetch size or below. Does nothing when the
     * prefetch size is zero or the consumer is stopping.
     */
    private void sendFlowIfNeeded() {
        JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        int prefetch = info.getPrefetchSize();
        if (prefetch == 0 || isStopping()) {
            return;
        }

        Receiver link = (Receiver) getEndpoint();
        int currentCredit = link.getCredit();
        double refillThreshold = (double) prefetch * 0.3;
        if ((double) currentCredit > refillThreshold) {
            return;
        }

        int newCredit = prefetch - currentCredit;
        LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), newCredit);
        link.flow(newCredit);
    }

    /**
     * Ensures the link holds at least one credit, granting the difference when the
     * current credit is below one.
     */
    private void sendFlowForNoPrefetchListener() {
        Receiver link = (Receiver) getEndpoint();
        int currentCredit = link.getCredit();
        if (currentCredit >= 1) {
            return;
        }

        int additionalCredit = 1 - currentCredit;
        LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), additionalCredit);
        link.flow(additionalCredit);
    }

    /**
     * Re-dispatches every message on this link that was delivered to the application but
     * not yet settled, as required by a session recover.
     * <p>
     * Each such message has its redelivery count bumped, is flagged to be enqueued at the
     * front, and is marked as not delivered again. The previous implementation walked
     * the deliveries but never added anything to the redispatch list, so recover
     * re-dispatched nothing.
     *
     * @throws Exception if re-dispatching a message to the provider listener fails
     */
    public void recover() throws Exception {
        LOG.debug("Session Recover for consumer: {}", ((JmsConsumerInfo) getResourceInfo()).getId());

        ArrayList<JmsInboundMessageDispatch> redispatchList = new ArrayList<JmsInboundMessageDispatch>();

        Delivery delivery = ((Receiver) getEndpoint()).head();
        while (delivery != null) {
            Delivery current = delivery;
            delivery = delivery.next();

            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
                LOG.debug("{} Found incomplete delivery with no context during recover processing", this);
                continue;
            }

            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
            if (!envelope.isDelivered()) {
                // Still sitting in the prefetch; it will be dispatched normally.
                continue;
            }

            envelope.getMessage().getFacade().setRedeliveryCount(
                envelope.getMessage().getFacade().getRedeliveryCount() + 1);
            envelope.setEnqueueFirst(true);
            envelope.setDelivered(false);
            redispatchList.add(envelope);
        }

        // These messages are no longer flagged as delivered, so stop counting them; a new
        // DELIVERED ack will count them again after redelivery.
        deliveredCount -= redispatchList.size();

        // Each message is enqueued first, so dispatch in reverse to preserve the original order.
        ListIterator<JmsInboundMessageDispatch> reverseIterator = redispatchList.listIterator(redispatchList.size());
        while (reverseIterator.hasPrevious()) {
            deliver(reverseIterator.previous());
        }
    }

    /**
     * Requests a single message from the remote on behalf of a receive call.
     * <p>
     * One credit is granted if the link currently has none. What happens next depends on
     * {@code timeout}: a negative value parks the request until a message arrives, zero
     * stops the consumer straight away, and a positive value schedules a stop after the
     * timeout.
     *
     * @param timeout negative to wait, zero to stop straight away, positive to stop after the timeout
     * @param request the result to signal when the pull completes
     */
    public void pull(long timeout, AsyncResult request) {
        LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), timeout);

        Receiver link = (Receiver) getEndpoint();
        if (link.getCredit() == 0) {
            LOG.trace("Consumer {} granting 1 additional credit for pull.", getConsumerId());
            link.flow(1);
        }

        if (timeout < 0L) {
            pullRequest = request;
        } else if (timeout == 0L) {
            stop(request);
        } else {
            stopOnSchedule(timeout, request);
        }
    }

    /**
     * Handles an updated delivery on this consumer's link.
     * <p>
     * A readable, complete delivery is decoded and dispatched; if that succeeds a pending
     * pull request is completed. A pending stop request is completed once the link has no
     * current delivery and no remote credit. The update is then passed to the parent.
     *
     * @param provider the provider delivering the update
     * @param delivery the delivery that was updated
     * @throws IOException if processing the delivery fails, or from the parent implementation
     */
    @Override
    public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
        boolean completeAndReadable = delivery.isReadable() && !delivery.isPartial();
        if (completeAndReadable) {
            LOG.trace("{} has incoming Message(s).", this);
            try {
                boolean dispatched = processDelivery(delivery);
                if (dispatched && pullRequest != null) {
                    pullRequest.onSuccess();
                    pullRequest = null;
                }
            } catch (Exception e) {
                throw IOExceptionSupport.create(e);
            }
        }

        Receiver link = (Receiver) getEndpoint();
        if (link.current() == null && link.getRemoteCredit() <= 0 && stopRequest != null) {
            stopRequest.onSuccess();
            stopRequest = null;
        }

        super.processDeliveryUpdates(provider, delivery);
    }

    /**
     * Decodes one incoming delivery into a JMS message and dispatches it.
     * <p>
     * The delivery's default state is set to released first. If decoding fails the
     * delivery is settled as undeliverable and {@code false} is returned. Otherwise the
     * message is wrapped in a dispatch envelope, attached to the delivery as its context
     * and delivered; the link is advanced even if dispatching throws.
     *
     * @param incoming the delivery to process
     * @return {@code true} if the message was dispatched, {@code false} if it could not be decoded
     * @throws Exception if preparing or dispatching the decoded message fails
     */
    private boolean processDelivery(Delivery incoming) throws Exception {
        incoming.setDefaultDeliveryState((DeliveryState) Released.getInstance());

        JmsMessage message;
        try {
            ByteBuf payload = unwrapIncomingMessage(incoming);
            message = AmqpCodec.decodeMessage(this, payload).asJmsMessage();
        } catch (Exception e) {
            LOG.warn("Error on transform: {}", e.getMessage());
            deliveryFailedUndeliverable(incoming);
            return false;
        }

        try {
            message.onDispatch();

            JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
            envelope.setMessage(message);
            envelope.setConsumerId(((JmsConsumerInfo) getResourceInfo()).getId());
            envelope.setProviderHint(incoming);
            envelope.setMessageId(message.getFacade().getProviderMessageIdObject());

            incoming.setContext(envelope);
            deliver(envelope);
            return true;
        } finally {
            ((Receiver) getEndpoint()).advance();
        }
    }

    /**
     * Increments the incoming sequence counter and returns the new value.
     *
     * @return the next incoming sequence number
     */
    protected long getNextIncomingSequenceNumber() {
        incomingSequence += 1;
        return incomingSequence;
    }

    /**
     * Detaches the receiver link for a durable consumer and closes it for any other consumer.
     */
    @Override
    protected void closeOrDetachEndpoint() {
        JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        if (!info.isDurable()) {
            ((Receiver) getEndpoint()).close();
            return;
        }
        ((Receiver) getEndpoint()).detach();
    }

    /**
     * @return the connection of the session that owns this consumer
     */
    public AmqpConnection getConnection() {
        AmqpConnection owner = session.getConnection();
        return owner;
    }

    /**
     * @return the session this consumer was created on
     */
    public AmqpSession getSession() {
        return session;
    }

    /**
     * @return the id taken from this consumer's resource info
     */
    public JmsConsumerId getConsumerId() {
        JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        return info.getId();
    }

    /**
     * @return the destination taken from this consumer's resource info
     */
    public JmsDestination getDestination() {
        JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        return info.getDestination();
    }

    /**
     * @return {@code true} while a stop request is pending on this consumer
     */
    public boolean isStopping() {
        return stopRequest != null;
    }

    /**
     * @return the drain timeout configured on the session's provider
     */
    public int getDrainTimeout() {
        AmqpProvider provider = session.getProvider();
        return provider.getDrainTimeout();
    }

    /**
     * @return a short description containing this consumer's id
     */
    @Override
    public String toString() {
        JmsConsumerInfo info = (JmsConsumerInfo) getResourceInfo();
        return "AmqpConsumer { " + info.getId() + " }";
    }

    /**
     * Settles the delivery with the "modified, failed, undeliverable here" state and then
     * tops up link credit if needed.
     *
     * @param incoming the delivery to settle
     */
    protected void deliveryFailedUndeliverable(Delivery incoming) {
        DeliveryState undeliverable = (DeliveryState) AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
        incoming.disposition(undeliverable);
        incoming.settle();
        sendFlowIfNeeded();
    }

    /**
     * Hands a dispatch envelope to the provider listener.
     * <p>
     * Nothing is dispatched while a deferred close is in progress. If no provider
     * listener is set the message is dropped and an error is logged.
     *
     * @param envelope the message envelope to dispatch
     * @throws Exception if the listener fails while handling the message
     */
    protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
        if (deferredClose) {
            return;
        }

        ProviderListener listener = session.getProvider().getProviderListener();
        if (listener == null) {
            LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
            return;
        }

        LOG.debug("Dispatching received message: {}", envelope);
        listener.onInboundMessage(envelope);
    }

    /**
     * Reads the pending bytes from this consumer's receiver link into the reusable
     * {@code incomingBuffer} and returns a duplicate of that buffer.
     * <p>
     * NOTE(review): per Netty's {@code ByteBuf.duplicate()} contract the returned buffer
     * shares its content with {@code incomingBuffer}, so it presumably must be consumed
     * before the next call overwrites that content - confirm against callers.
     *
     * @param incoming the delivery whose available byte count is used to pre-size the buffer
     * @return a duplicate of the shared buffer, taken before the shared buffer is cleared
     */
    protected ByteBuf unwrapIncomingMessage(Delivery incoming) {
        int count;
        // Grow up front so the bytes currently available on the delivery fit.
        if (this.incomingBuffer.capacity() < incoming.available()) {
            this.incomingBuffer.capacity(incoming.available());
        }
        // Read straight into the backing array until recv returns no more bytes, growing
        // the buffer by half its capacity whenever it fills up.
        while ((count = ((Receiver)this.getEndpoint()).recv(this.incomingBuffer.array(), this.incomingBuffer.writerIndex(), this.incomingBuffer.writableBytes())) > 0) {
            this.incomingBuffer.writerIndex(this.incomingBuffer.writerIndex() + count);
            if (this.incomingBuffer.isWritable()) continue;
            this.incomingBuffer.capacity((int)((double)this.incomingBuffer.capacity() * 1.5));
        }
        try {
            ByteBuf byteBuf = this.incomingBuffer.duplicate();
            return byteBuf;
        }
        finally {
            // Clear the shared buffer for the next delivery; this runs after the duplicate was taken.
            this.incomingBuffer.clear();
        }
    }

    /**
     * Transaction hook named for the pre-commit phase; this consumer does nothing here.
     */
    public void preCommit() {
    }

    /**
     * Transaction hook named for the pre-rollback phase; this consumer does nothing here.
     */
    public void preRollback() {
    }

    /**
     * Transaction hook named for the post-commit phase: completes a deferred close if
     * one is pending and no delivered messages remain outstanding.
     */
    public void postCommit() {
        tryCompleteDeferredClose();
    }

    /**
     * Transaction hook named for the post-rollback phase: runs the prefetch release pass
     * and then completes a deferred close if one is pending and nothing remains outstanding.
     */
    public void postRollback() {
        releasePrefetch();
        tryCompleteDeferredClose();
    }

    /**
     * Cleans up after this consumer has been closed.
     * <p>
     * The consumer is removed from the connection's subscription tracker. Any pending
     * stop and pull requests are then completed: with success when {@code cause} is
     * {@code null}, otherwise with failure carrying {@code cause}.
     *
     * @param provider the provider reporting the closure
     * @param cause    the reason for the closure, or {@code null}
     */
    @Override
    public void handleResourceClosure(AmqpProvider provider, Throwable cause) {
        AmqpSubscriptionTracker tracker = session.getConnection().getSubTracker();
        tracker.consumerRemoved((JmsConsumerInfo) getResourceInfo());

        boolean failed = cause != null;

        if (stopRequest != null) {
            if (failed) {
                stopRequest.onFailure(cause);
            } else {
                stopRequest.onSuccess();
            }
            stopRequest = null;
        }

        if (pullRequest != null) {
            if (failed) {
                pullRequest.onFailure(cause);
            } else {
                pullRequest.onSuccess();
            }
            pullRequest = null;
        }
    }

    /**
     * Decides whether a close request has to be deferred.
     *
     * @return {@code true} if the session is transacted and this consumer takes part in
     *         the current transaction, or if delivered messages are still outstanding
     */
    private boolean shouldDeferClose() {
        boolean inTransaction = getSession().isTransacted()
            && getSession().getTransactionContext().isInTransaction(getConsumerId());
        return inTransaction || deliveredCount > 0L;
    }

    /**
     * Performs the real close when a close was deferred and no delivered messages remain
     * outstanding; otherwise does nothing.
     */
    private void tryCompleteDeferredClose() {
        if (!deferredClose || deliveredCount != 0L) {
            return;
        }
        super.close(new DeferredCloseRequest());
    }

    /**
     * Releases every prefetched message that has not yet been delivered to the application.
     * <p>
     * Each such delivery is given the released disposition and settled. Deliveries already
     * marked as delivered are left untouched. The previous implementation only logged
     * deliveries without context and never released anything, so prefetched messages
     * stayed held by a consumer that was being closed or rolled back.
     */
    private void releasePrefetch() {
        Delivery delivery = ((Receiver) getEndpoint()).head();
        while (delivery != null) {
            // Step to the next delivery before the current one is settled.
            Delivery current = delivery;
            delivery = delivery.next();

            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
                LOG.debug("{} Found incomplete delivery with no context during release processing", this);
                continue;
            }

            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
            if (!envelope.isDelivered()) {
                current.disposition((DeliveryState) Released.getInstance());
                current.settle();
            }
        }
    }

    /**
     * Wraps a request together with a scheduled task so that completing the request
     * also cancels the task.
     * <p>
     * Success is only forwarded to the wrapped request when the cancel call reports that
     * the task was cancelled; failure is always forwarded.
     */
    private static final class ScheduledRequest
    implements AsyncResult {
        private final ScheduledFuture<?> scheduledTask;
        private final AsyncResult origRequest;

        public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) {
            this.scheduledTask = completionTask;
            this.origRequest = origRequest;
        }

        @Override
        public void onFailure(Throwable cause) {
            scheduledTask.cancel(false);
            origRequest.onFailure(cause);
        }

        @Override
        public void onSuccess() {
            // If the task could not be cancelled, success is not forwarded here.
            if (scheduledTask.cancel(false)) {
                origRequest.onSuccess();
            }
        }

        @Override
        public boolean isComplete() {
            return origRequest.isComplete();
        }
    }

    private final class DeferredCloseRequest
    implements AsyncResult {
        private DeferredCloseRequest() {
        }

        @Override
        public void onFailure(Throwable result) {
            LOG.trace("Failed deferred close of consumer: {} - {}", (Object)AmqpConsumer.this.getConsumerId(), (Object)result.getMessage());
            AmqpConsumer.this.getParent().getProvider().fireNonFatalProviderException((Exception)((Object)JmsExceptionSupport.create(result)));
        }

        @Override
        public void onSuccess() {
            LOG.trace("Completed deferred close of consumer: {}", (Object)AmqpConsumer.this.getConsumerId());
        }

        @Override
        public boolean isComplete() {
            return AmqpConsumer.this.isClosed();
        }
    }

    /**
     * Wraps a close request so that, once the stop succeeds, the prefetch release pass
     * runs before success is passed on to the wrapped request.
     */
    private final class StopAndReleaseRequest
    extends WrappedAsyncResult {
        public StopAndReleaseRequest(AsyncResult closeRequest) {
            super(closeRequest);
        }

        @Override
        public void onSuccess() {
            releasePrefetch();
            super.onSuccess();
        }
    }
}

