/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;

public abstract class ConsumerTarget_0_8
extends AbstractConsumerTarget
implements FlowCreditManager.FlowCreditManagerListener {
    /**
     * Shared listener handed out by {@link #getReleasedStateChangeListener()}.
     * When an entry moves out of the ACQUIRED state it gives the message's credit
     * back via {@link #restoreCredit(ServerMessage)}; it then detaches itself from
     * the entry regardless of which transition was reported.
     */
    private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener = new StateChangeListener<MessageInstance, MessageInstance.State>(){

        public void stateChanged(MessageInstance entry, MessageInstance.State previousState, MessageInstance.State currentState) {
            boolean wasAcquired = previousState == MessageInstance.State.ACQUIRED;
            boolean stillAcquired = currentState == MessageInstance.State.ACQUIRED;
            if (wasAcquired && !stillAcquired) {
                ConsumerTarget_0_8.this.restoreCredit(entry.getMessage());
            }
            // Removal is unconditional: the listener is dropped after the first notification it receives.
            entry.removeStateChangeListener((StateChangeListener)this);
        }
    };
    // Strategy used by sendToClient() to push a message to the client.
    private final ClientDeliveryMethod _deliveryMethod;
    // Strategy used by recordMessageDelivery() to record a delivery against its delivery tag.
    private final RecordDeliveryMethod _recordMethod;
    // Number of messages registered through addUnacknowledgedMessage() that have not yet left the ACQUIRED state.
    private final AtomicLong _unacknowledgedCount = new AtomicLong(0L);
    // Total size of the messages counted in _unacknowledgedCount.
    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0L);
    // Consumer most recently passed to consumerAdded(); null until then.
    private ConsumerImpl _consumer;
    // Class logger (not referenced by the code visible in this file).
    private static final Logger _logger = Logger.getLogger(ConsumerTarget_0_8.class);
    // Channel this target delivers on; also used as the monitor for send() in the nested consumers.
    private final AMQChannel _channel;
    // Consumer tag supplied at construction.
    private final AMQShortString _consumerTag;
    // Credit manager consulted before delivery; this target registers itself as its listener.
    private final FlowCreditManager _creditManager;
    // Value of the AUTO_CLOSE argument supplied at construction; FALSE when absent. Never null.
    private final Boolean _autoClose;
    // Set by queueDeleted(); once true, isSuspended() always reports true.
    private final AtomicBoolean _deleted = new AtomicBoolean(false);

    /**
     * Creates a {@link BrowserConsumer} that uses the channel's own client-delivery
     * and record-delivery methods.
     *
     * @param channel       channel the target delivers on
     * @param consumerTag   tag identifying the consumer
     * @param filters       consumer arguments, may be {@code null}
     * @param creditManager credit manager for the target
     * @return the new browser target
     * @throws AMQException if the target's constructor throws it
     */
    public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager) throws AMQException {
        ClientDeliveryMethod delivery = channel.getClientDeliveryMethod();
        RecordDeliveryMethod recorder = channel.getRecordDeliveryMethod();
        return new BrowserConsumer(channel, consumerTag, filters, creditManager, delivery, recorder);
    }

    /**
     * Creates a {@link GetNoAckConsumer} with explicitly supplied delivery and
     * record methods.
     *
     * @param channel        channel the target delivers on
     * @param consumerTag    tag identifying the consumer
     * @param filters        consumer arguments, may be {@code null}
     * @param creditManager  credit manager for the target
     * @param deliveryMethod method used to send messages to the client
     * @param recordMethod   method used to record deliveries
     * @return the new target
     * @throws AMQException if the target's constructor throws it
     */
    public static ConsumerTarget_0_8 createGetNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
        ConsumerTarget_0_8 target = new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
        return target;
    }

    /**
     * Creates a {@link NoAckConsumer} that uses the channel's own client-delivery
     * and record-delivery methods.
     *
     * @param channel       channel the target delivers on
     * @param consumerTag   tag identifying the consumer
     * @param filters       consumer arguments, may be {@code null}
     * @param creditManager credit manager for the target
     * @return the new no-ack target
     * @throws AMQException if the target's constructor throws it
     */
    public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager) throws AMQException {
        ClientDeliveryMethod delivery = channel.getClientDeliveryMethod();
        RecordDeliveryMethod recorder = channel.getRecordDeliveryMethod();
        return new NoAckConsumer(channel, consumerTag, filters, creditManager, delivery, recorder);
    }

    /**
     * Creates a {@link NoAckConsumer} with explicitly supplied delivery and
     * record methods.
     *
     * @param channel        channel the target delivers on
     * @param consumerTag    tag identifying the consumer
     * @param filters        consumer arguments, may be {@code null}
     * @param creditManager  credit manager for the target
     * @param deliveryMethod method used to send messages to the client
     * @param recordMethod   method used to record deliveries
     * @return the new no-ack target
     * @throws AMQException if the target's constructor throws it
     */
    public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
        ConsumerTarget_0_8 target = new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
        return target;
    }

    /**
     * Creates an {@link AckConsumer} that uses the channel's own client-delivery
     * and record-delivery methods.
     *
     * @param channel       channel the target delivers on
     * @param consumerTag   tag identifying the consumer
     * @param filters       consumer arguments, may be {@code null}
     * @param creditManager credit manager for the target
     * @return the new ack target
     * @throws AMQException if the target's constructor throws it
     */
    public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager) throws AMQException {
        ClientDeliveryMethod delivery = channel.getClientDeliveryMethod();
        RecordDeliveryMethod recorder = channel.getRecordDeliveryMethod();
        return new AckConsumer(channel, consumerTag, filters, creditManager, delivery, recorder);
    }

    /**
     * Creates an {@link AckConsumer} with explicitly supplied delivery and
     * record methods.
     *
     * @param channel        channel the target delivers on
     * @param consumerTag    tag identifying the consumer
     * @param filters        consumer arguments, may be {@code null}
     * @param creditManager  credit manager for the target
     * @param deliveryMethod method used to send messages to the client
     * @param recordMethod   method used to record deliveries
     * @return the new ack target
     * @throws AMQException if the target's constructor throws it
     */
    public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
        ConsumerTarget_0_8 target = new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
        return target;
    }

    /**
     * Initialises the target in the ACTIVE state, registers it as a listener on
     * the credit manager and reads the auto-close flag from the arguments.
     *
     * @param channel        channel the target delivers on
     * @param consumerTag    tag identifying the consumer
     * @param arguments      consumer arguments; when {@code null} or lacking the
     *                       AUTO_CLOSE key, auto-close is {@code false}
     * @param creditManager  credit manager for the target
     * @param deliveryMethod method used to send messages to the client
     * @param recordMethod   method used to record deliveries
     * @throws AMQException declared for callers; not thrown by the statements in this constructor
     * @throws ClassCastException if the AUTO_CLOSE argument is present but is not a {@link Boolean}
     */
    public ConsumerTarget_0_8(AMQChannel channel, AMQShortString consumerTag, FieldTable arguments, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
        super(ConsumerTarget.State.ACTIVE);
        _channel = channel;
        _consumerTag = consumerTag;
        _creditManager = creditManager;
        creditManager.addStateListener(this);
        _deliveryMethod = deliveryMethod;
        _recordMethod = recordMethod;

        // Default to "no auto-close" unless the arguments carry an explicit value.
        Boolean autoCloseFlag = Boolean.FALSE;
        if (arguments != null) {
            Object requested = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
            if (requested != null) {
                autoCloseFlag = (Boolean) requested;
            }
        }
        _autoClose = autoCloseFlag;
    }

    /**
     * @return the consumer last passed to {@link #consumerAdded(ConsumerImpl)},
     *         or {@code null} if none has been added yet
     */
    public ConsumerImpl getConsumer() {
        return _consumer;
    }

    /**
     * Callback for consumer removal. This implementation takes no action; in
     * particular it does not clear the consumer returned by {@link #getConsumer()}.
     *
     * @param sub the consumer that was removed (unused)
     */
    public void consumerRemoved(ConsumerImpl sub) {
        // Nothing to do.
    }

    /**
     * Remembers the given consumer as the one this target delivers for,
     * replacing any previously stored consumer.
     *
     * @param sub the consumer that was added
     */
    public void consumerAdded(ConsumerImpl sub) {
        _consumer = sub;
    }

    /**
     * @return the channel this target delivers on, viewed as a session model
     */
    public AMQSessionModel getSessionModel() {
        return _channel;
    }

    /**
     * @return a description of the form
     *         {@code [channel=..., consumerTag=..., session=<protocol session key>]}
     */
    public String toString() {
        return "[channel=" + _channel
                + ", consumerTag=" + _consumerTag
                + ", session=" + getProtocolSession().getKey()
                + "]";
    }

    /**
     * Reports whether delivery through this target is currently suspended.
     *
     * @return {@code true} if the target is not ACTIVE, the channel is suspended,
     *         the queue has been flagged as deleted, or the channel's connection
     *         is stopped; {@code false} otherwise
     */
    public boolean isSuspended() {
        // Checks run in this order and stop at the first one that holds.
        if (getState() != ConsumerTarget.State.ACTIVE) {
            return true;
        }
        if (_channel.isSuspended()) {
            return true;
        }
        if (_deleted.get()) {
            return true;
        }
        return _channel.getConnectionModel().isStopped();
    }

    /**
     * Flags the queue as deleted, after which {@link #isSuspended()} always
     * returns {@code true}.
     */
    public void queueDeleted() {
        _deleted.set(true);
    }

    /**
     * @return the auto-close flag captured at construction
     */
    public boolean isAutoClose() {
        return _autoClose.booleanValue();
    }

    /**
     * @return the credit manager supplied at construction
     */
    public FlowCreditManager getCreditManager() {
        return _creditManager;
    }

    /**
     * Moves this target to the CLOSED state and unregisters it from the credit
     * manager. If a consumer is attached, its send lock is taken before the
     * transition and released afterwards, even if an exception is thrown.
     *
     * @return {@code true} if this call performed the transition to CLOSED;
     *         {@code false} if the target was already observed as CLOSED
     */
    public boolean close() {
        ConsumerTarget.State observed = getState();
        final ConsumerImpl lockOwner = getConsumer();
        if (lockOwner != null) {
            lockOwner.getSendLock();
        }
        try {
            boolean transitioned = false;
            // Retry from the freshly observed state until the update succeeds
            // or the state is seen to be CLOSED already.
            while (observed != ConsumerTarget.State.CLOSED) {
                if (updateState(observed, ConsumerTarget.State.CLOSED)) {
                    transitioned = true;
                    break;
                }
                observed = getState();
            }
            _creditManager.removeListener(this);
            return transitioned;
        }
        finally {
            if (lockOwner != null) {
                lockOwner.releaseSendLock();
            }
        }
    }

    /**
     * Asks the credit manager to consume credit for the given message.
     *
     * @param msg the message about to be delivered
     * @return the credit manager's answer for a message of {@code msg.getSize()}
     */
    public boolean allocateCredit(ServerMessage msg) {
        final long messageSize = msg.getSize();
        return _creditManager.useCreditForMessage(messageSize);
    }

    /**
     * @return the channel supplied at construction
     */
    public AMQChannel getChannel() {
        return _channel;
    }

    /**
     * @return the consumer tag supplied at construction
     */
    public AMQShortString getConsumerTag() {
        return _consumerTag;
    }

    /**
     * @return the protocol session of the channel this target delivers on
     */
    public AMQProtocolSession getProtocolSession() {
        return _channel.getProtocolSession();
    }

    /**
     * Returns credit for one message of the given message's size to the
     * credit manager.
     *
     * @param message the message whose credit is being handed back
     */
    public void restoreCredit(ServerMessage message) {
        final long messageSize = message.getSize();
        _creditManager.restoreCredit(1L, messageSize);
    }

    /**
     * @return the single listener instance held by this target that restores
     *         credit when an entry leaves the ACQUIRED state
     */
    protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener() {
        return _entryReleaseListener;
    }

    /**
     * Reacts to a change in credit availability.
     * <p>
     * Without credit, an ACTIVE target is moved to SUSPENDED. With credit, a
     * SUSPENDED target is moved to ACTIVE; if that transition does not take
     * place, the state listener is still notified with an ACTIVE-to-ACTIVE
     * change.
     *
     * @param hasCredit {@code true} if credit is now available
     */
    public void creditStateChanged(boolean hasCredit) {
        if (!hasCredit) {
            updateState(ConsumerTarget.State.ACTIVE, ConsumerTarget.State.SUSPENDED);
            return;
        }
        boolean resumed = updateState(ConsumerTarget.State.SUSPENDED, ConsumerTarget.State.ACTIVE);
        if (!resumed) {
            // NOTE(review): presumably this nudges listeners to re-check delivery
            // even though the state is unchanged - confirm against the listener.
            this.getStateListener().stateChanged((Object)this, (Enum)ConsumerTarget.State.ACTIVE, (Enum)ConsumerTarget.State.ACTIVE);
        }
    }

    /**
     * Delivers a message to the client through the configured delivery method.
     *
     * @param message     the message to send
     * @param props       instance properties of the delivery
     * @param deliveryTag tag assigned to this delivery
     * @return the value returned by the delivery method
     */
    protected long sendToClient(ServerMessage message, InstanceProperties props, long deliveryTag) {
        final ConsumerImpl consumer = getConsumer();
        return _deliveryMethod.deliverToClient(consumer, message, props, deliveryTag);
    }

    /**
     * Records a delivery through the configured record method.
     *
     * @param entry       the entry being delivered
     * @param deliveryTag tag assigned to this delivery
     */
    protected void recordMessageDelivery(MessageInstance entry, long deliveryTag) {
        final ConsumerImpl consumer = getConsumer();
        _recordMethod.recordMessageDelivery(consumer, entry, deliveryTag);
    }

    /**
     * Sends the auto-close confirmation for this consumer through the protocol
     * session's output converter, identified by channel id and consumer tag.
     */
    public void confirmAutoClose() {
        getChannel()
                .getProtocolSession()
                .getProtocolOutputConverter()
                .confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
    }

    /**
     * Callback for an empty queue. For auto-close targets this closes the
     * target and then confirms the auto-close to the client; otherwise it
     * does nothing.
     */
    public void queueEmpty() {
        if (!isAutoClose()) {
            return;
        }
        close();
        confirmAutoClose();
    }

    /**
     * Turns deferred flushing off on the channel's protocol session and then
     * flushes whatever the session has batched.
     */
    public void flushBatched() {
        _channel.getProtocolSession().setDeferFlush(false);
        _channel.getProtocolSession().flushBatched();
    }

    /**
     * Adds the entry's message to the unacknowledged byte and message counters
     * and installs a listener that subtracts it again when the entry moves from
     * ACQUIRED to any other state.
     *
     * @param entry the entry being delivered
     */
    protected void addUnacknowledgedMessage(MessageInstance entry) {
        final long messageSize = entry.getMessage().getSize();
        _unacknowledgedBytes.addAndGet(messageSize);
        _unacknowledgedCount.incrementAndGet();

        final StateChangeListener<MessageInstance, MessageInstance.State> tracker = new StateChangeListener<MessageInstance, MessageInstance.State>(){

            public void stateChanged(MessageInstance instance, MessageInstance.State before, MessageInstance.State after) {
                boolean leftAcquired = before.equals(MessageInstance.State.ACQUIRED) && !after.equals(MessageInstance.State.ACQUIRED);
                if (!leftAcquired) {
                    // Stay registered until the entry actually leaves ACQUIRED.
                    return;
                }
                ConsumerTarget_0_8.this._unacknowledgedBytes.addAndGet(-messageSize);
                ConsumerTarget_0_8.this._unacknowledgedCount.decrementAndGet();
                instance.removeStateChangeListener((StateChangeListener)this);
            }
        };
        entry.addStateChangeListener(tracker);
    }

    /**
     * Callback for a removed acquisition. This implementation takes no action.
     *
     * @param node the entry whose acquisition was removed (unused)
     */
    public void acquisitionRemoved(MessageInstance node) {
        // Nothing to do.
    }

    /**
     * @return the current value of the unacknowledged-bytes counter
     */
    public long getUnacknowledgedBytes() {
        return _unacknowledgedBytes.get();
    }

    /**
     * @return the current value of the unacknowledged-message counter
     */
    public long getUnacknowledgedMessages() {
        return _unacknowledgedCount.get();
    }

    /**
     * Target whose deliveries are tracked as unacknowledged and recorded against
     * their delivery tag before being sent.
     */
    static final class AckConsumer extends ConsumerTarget_0_8 {
        public AckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
        }

        /**
         * Sends one entry to the client while holding the channel's monitor.
         * In order: sets the session's defer-flush flag, takes the next delivery
         * tag, registers the entry as unacknowledged, records the delivery,
         * attaches the credit-restoring listener, sends the message and finally
         * increments the entry's delivery count.
         *
         * @param entry the entry to deliver
         * @param batch value passed to the protocol session's {@code setDeferFlush}
         * @return the value returned by {@code sendToClient}
         */
        public long send(MessageInstance entry, boolean batch) {
            final AMQChannel channel = getChannel();
            synchronized (channel) {
                channel.getProtocolSession().setDeferFlush(batch);
                final long tag = channel.getNextDeliveryTag();
                addUnacknowledgedMessage(entry);
                recordMessageDelivery(entry, tag);
                entry.addStateChangeListener(getReleasedStateChangeListener());
                final long sent = sendToClient(entry.getMessage(), entry.getInstanceProperties(), tag);
                entry.incrementDeliveryCount();
                return sent;
            }
        }
    }

    /**
     * No-ack target that, unlike its parent {@link NoAckConsumer}, does consult
     * the credit manager when credit is allocated.
     */
    public static final class GetNoAckConsumer extends NoAckConsumer {
        public GetNoAckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
        }

        /**
         * Asks the credit manager to consume credit for the given message.
         *
         * @param msg the message about to be delivered
         * @return the credit manager's answer for a message of {@code msg.getSize()}
         */
        @Override
        public boolean allocateCredit(ServerMessage msg) {
            final FlowCreditManager credit = getCreditManager();
            return credit.useCreditForMessage(msg.getSize());
        }
    }

    /**
     * Target for deliveries that need no acknowledgement: each entry is dequeued
     * in an auto-commit transaction and deleted before it is sent, and credit is
     * always granted.
     */
    public static class NoAckConsumer
    extends ConsumerTarget_0_8 {
        // Auto-commit transaction over the virtual host's message store, used to dequeue on send.
        private final AutoCommitTransaction _txn;
        // Transaction action that does nothing on commit or rollback.
        private static final ServerTransaction.Action NOOP = new ServerTransaction.Action(){

            public void postCommit() {
            }

            public void onRollback() {
            }
        };

        public NoAckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
            this._txn = new AutoCommitTransaction(channel.getVirtualHost().getMessageStore());
        }

        /**
         * Dequeues and deletes the entry, then sends its message to the client
         * while holding the channel's monitor.
         * <p>
         * A reference to the message is taken before the entry is deleted and is
         * released in a {@code finally} block, so it is given back even if
         * deleting the entry or sending the message throws.
         *
         * @param entry the entry to deliver
         * @param batch value passed to the protocol session's {@code setDeferFlush}
         * @return the value returned by {@code sendToClient}
         */
        public long send(MessageInstance entry, boolean batch) {
            this._txn.dequeue(entry.getOwningResource(), (EnqueueableMessage)entry.getMessage(), NOOP);
            ServerMessage message = entry.getMessage();
            MessageReference ref = message.newReference();
            try {
                InstanceProperties props = entry.getInstanceProperties();
                entry.delete();
                AMQChannel channel = this.getChannel();
                synchronized (channel) {
                    channel.getProtocolSession().setDeferFlush(batch);
                    long deliveryTag = channel.getNextDeliveryTag();
                    return this.sendToClient(message, props, deliveryTag);
                }
            }
            finally {
                // Runs after the channel monitor is released, as in the non-failing path before.
                ref.release();
            }
        }

        /**
         * Always grants credit; the credit manager is not consulted.
         *
         * @param msg the message about to be delivered (unused)
         * @return {@code true}
         */
        @Override
        public boolean allocateCredit(ServerMessage msg) {
            return true;
        }
    }

    /**
     * Target that sends entries to the client without dequeuing, deleting or
     * recording them, and always grants credit.
     */
    static final class BrowserConsumer extends ConsumerTarget_0_8 {
        public BrowserConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
        }

        /**
         * Sends the entry's message to the client under the channel's monitor,
         * using the channel's next delivery tag.
         *
         * @param entry the entry to deliver
         * @param batch not consulted by this implementation
         * @return the value returned by {@code sendToClient}
         */
        public long send(MessageInstance entry, boolean batch) {
            final AMQChannel channel = getChannel();
            synchronized (channel) {
                final long tag = channel.getNextDeliveryTag();
                return sendToClient(entry.getMessage(), entry.getInstanceProperties(), tag);
            }
        }

        /**
         * Always grants credit; the credit manager is not consulted.
         *
         * @param msg the message about to be delivered (unused)
         * @return {@code true}
         */
        @Override
        public boolean allocateCredit(ServerMessage msg) {
            return true;
        }
    }
}

