/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;

public abstract class ConsumerTarget_0_8
extends AbstractConsumerTarget
implements FlowCreditManager.FlowCreditManagerListener {
    // Strategy that sendToClient delegates to when writing a message to the client.
    private final ClientDeliveryMethod _deliveryMethod;
    // Strategy that recordMessageDelivery delegates to (consumer, entry, delivery tag).
    private final RecordDeliveryMethod _recordMethod;
    // Number of entries added via addUnacknowledgedMessage and not yet removed again.
    private final AtomicLong _unacknowledgedCount = new AtomicLong(0L);
    // Sum of the message sizes of the entries counted in _unacknowledgedCount.
    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0L);
    // Set by queueEmpty() when auto-close is enabled; read by hasClosed().
    private final AtomicBoolean _needToClose = new AtomicBoolean();
    // Value of the "local-address" argument if supplied, otherwise the consumer tag as a string.
    private final String _targetAddress;
    // Channel this target delivers on; also returned as the session model.
    private final AMQChannel _channel;
    private final AMQShortString _consumerTag;
    // Credit manager this target registers itself with as a listener.
    private final FlowCreditManager _creditManager;
    // Value of the auto-close consumer argument; false when the argument is absent.
    private final Boolean _autoClose;
    // Set by queueDeleted(); once true, isFlowSuspended() always reports true.
    private final AtomicBoolean _deleted = new AtomicBoolean(false);
    /**
     * Listener that {@link #addUnacknowledgedMessage(MessageInstance)} attaches to each entry.
     * When the entry leaves the "acquired by a consumer of this target" state, the entry is
     * removed from the unacknowledged accounting and the listener detaches itself.
     */
    private final StateChangeListener<MessageInstance, MessageInstance.EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.EntryState>() {

        @Override
        public void stateChanged(MessageInstance entry, MessageInstance.EntryState oldState, MessageInstance.EntryState newState) {
            // Only transitions away from "held by this target" are of interest.
            if (!this.heldByThisTarget(oldState)) {
                return;
            }
            if (this.heldByThisTarget(newState)) {
                return;
            }
            ConsumerTarget_0_8.this.removeUnacknowledgedMessage(entry);
            entry.removeStateChangeListener((StateChangeListener)this);
        }

        /** Returns true when the state is a consumer-acquired state whose consumer targets this object. */
        private boolean heldByThisTarget(MessageInstance.EntryState state) {
            if (!(state instanceof MessageInstance.ConsumerAcquiredState)) {
                return false;
            }
            MessageInstance.ConsumerAcquiredState acquired = (MessageInstance.ConsumerAcquiredState)state;
            return acquired.getConsumer().getTarget() == ConsumerTarget_0_8.this;
        }
    };

    /**
     * Creates a {@link BrowserConsumer} target that uses the channel's own client-delivery
     * and record-delivery methods.
     */
    public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, boolean multiQueue) {
        ClientDeliveryMethod deliveryMethod = channel.getClientDeliveryMethod();
        RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod();
        return new BrowserConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
    }

    /**
     * Creates a {@link GetNoAckConsumer} target that uses the caller-supplied delivery and
     * record methods.
     */
    public static ConsumerTarget_0_8 createGetNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) {
        return new GetNoAckConsumer(
                channel,
                consumerTag,
                filters,
                creditManager,
                deliveryMethod,
                recordMethod);
    }

    /**
     * Creates a {@link NoAckConsumer} target that uses the channel's own client-delivery
     * and record-delivery methods.
     */
    public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, boolean multiQueue) {
        ClientDeliveryMethod deliveryMethod = channel.getClientDeliveryMethod();
        RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod();
        return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
    }

    /**
     * Creates an {@link AckConsumer} target that uses the channel's own client-delivery
     * and record-delivery methods.
     */
    public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, boolean multiQueue) {
        ClientDeliveryMethod deliveryMethod = channel.getClientDeliveryMethod();
        RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod();
        return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
    }

    /**
     * Creates an {@link AckConsumer} target with caller-supplied delivery and record methods.
     * The target is always created with {@code multiQueue} set to {@code false}.
     */
    public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) {
        final boolean multiQueue = false;
        return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
    }

    public ConsumerTarget_0_8(AMQChannel channel, AMQShortString consumerTag, FieldTable arguments, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod, boolean multiQueue) {
        super(ConsumerTarget.State.ACTIVE, ConsumerTarget_0_8.isPullOnly(arguments), multiQueue, (AMQPConnection)channel.getAMQPConnection());
        this._channel = channel;
        this._consumerTag = consumerTag;
        this._creditManager = creditManager;
        creditManager.addStateListener((FlowCreditManager.FlowCreditManagerListener)this);
        this._deliveryMethod = deliveryMethod;
        this._recordMethod = recordMethod;
        if (arguments != null) {
            Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
            this._autoClose = autoClose != null ? (Boolean)autoClose : Boolean.valueOf(false);
            this._targetAddress = arguments.containsKey("local-address") ? String.valueOf(arguments.get("local-address")) : consumerTag.toString();
        } else {
            this._autoClose = false;
            this._targetAddress = consumerTag.toString();
        }
    }

    /**
     * Reports whether the consumer arguments request pull-only mode.
     *
     * @param arguments consumer arguments; may be {@code null}
     * @return {@code true} only when an {@code x-pull-only} entry exists and its string form
     *         equals "true" ignoring case
     */
    private static boolean isPullOnly(FieldTable arguments) {
        if (arguments == null || !arguments.containsKey("x-pull-only")) {
            return false;
        }
        return Boolean.parseBoolean(String.valueOf(arguments.get("x-pull-only")));
    }

    /**
     * @return the address chosen at construction: the {@code local-address} argument if it was
     *         supplied, otherwise the consumer tag as a string
     */
    public String getTargetAddress() {
        return _targetAddress;
    }

    /**
     * @return the channel this target was created for, viewed as its session model
     */
    public AMQSessionModel getSessionModel() {
        return _channel;
    }

    /**
     * Describes this target by channel, consumer tag and the connection's remote address.
     */
    @Override
    public String toString() {
        return "ConsumerTarget_0_8[channel=" + _channel
                + ", consumerTag=" + _consumerTag
                + ", session=" + getConnection().getRemoteAddressString()
                + "]";
    }

    /**
     * Reports whether delivery through this target is currently suspended.
     *
     * @return {@code true} if the target is not {@code ACTIVE}, the channel is suspended, the
     *         queue has been flagged as deleted, or the connection reports it is stopped
     */
    public boolean isFlowSuspended() {
        // Checks run in this order and stop at the first positive answer.
        if (getState() != ConsumerTarget.State.ACTIVE) {
            return true;
        }
        if (_channel.isSuspended()) {
            return true;
        }
        if (_deleted.get()) {
            return true;
        }
        return _channel.getAMQPConnection().isConnectionStopped();
    }

    /**
     * Flags the queue as deleted; from then on {@link #isFlowSuspended()} returns {@code true}.
     */
    public void queueDeleted() {
        _deleted.set(true);
    }

    /**
     * @return whether auto-close was requested in the consumer arguments
     */
    public boolean isAutoClose() {
        return _autoClose.booleanValue();
    }

    /**
     * @return the credit manager supplied at construction
     */
    public FlowCreditManager getCreditManager() {
        return _creditManager;
    }

    /**
     * Close hook: detaches this target from the credit manager it registered with in the
     * constructor.
     */
    protected void doCloseInternal() {
        final FlowCreditManager.FlowCreditManagerListener self = this;
        _creditManager.removeListener(self);
    }

    /**
     * Asks the credit manager to consume credit for a message of the given message's size.
     *
     * @param msg message about to be delivered
     * @return the credit manager's answer
     */
    public boolean allocateCredit(ServerMessage msg) {
        final FlowCreditManager creditManager = _creditManager;
        return creditManager.useCreditForMessage(msg.getSize());
    }

    /**
     * @return the channel supplied at construction
     */
    public AMQChannel getChannel() {
        return _channel;
    }

    /**
     * @return the consumer tag supplied at construction
     */
    public AMQShortString getConsumerTag() {
        return _consumerTag;
    }

    /**
     * @return the connection of the channel this target belongs to
     */
    private AMQPConnection_0_8 getConnection() {
        final AMQChannel channel = _channel;
        return channel.getConnection();
    }

    /**
     * Returns credit for one message of the given message's size to the credit manager.
     *
     * @param message message whose credit is being given back
     */
    public void restoreCredit(ServerMessage message) {
        final FlowCreditManager creditManager = _creditManager;
        creditManager.restoreCredit(1L, message.getSize());
    }

    /**
     * Credit-manager callback that moves this target between {@code ACTIVE} and
     * {@code SUSPENDED}.
     *
     * @param hasCredit {@code true} when credit is available again, {@code false} when exhausted
     */
    public void creditStateChanged(boolean hasCredit) {
        if (!hasCredit) {
            updateState(ConsumerTarget.State.ACTIVE, ConsumerTarget.State.SUSPENDED);
            return;
        }
        boolean resumed = updateState(ConsumerTarget.State.SUSPENDED, ConsumerTarget.State.ACTIVE);
        if (!resumed) {
            // No SUSPENDED -> ACTIVE transition took place; re-announce the current state instead.
            notifyCurrentState();
        }
    }

    /**
     * Hands the message to the configured client delivery method.
     *
     * @return whatever the delivery method returns for this delivery
     */
    protected long sendToClient(ConsumerImpl consumer, ServerMessage message, InstanceProperties props, long deliveryTag) {
        final ClientDeliveryMethod delivery = _deliveryMethod;
        return delivery.deliverToClient(consumer, message, props, deliveryTag);
    }

    /**
     * Passes the delivery (consumer, entry and delivery tag) to the configured record method.
     */
    protected void recordMessageDelivery(ConsumerImpl consumer, MessageInstance entry, long deliveryTag) {
        final RecordDeliveryMethod recorder = _recordMethod;
        recorder.recordMessageDelivery(consumer, entry, deliveryTag);
    }

    /**
     * Uses the connection's protocol output converter to confirm the auto-close of this
     * consumer, identified by channel id and consumer tag.
     */
    public void confirmAutoClose() {
        final AMQChannel channel = getChannel();
        final ProtocolOutputConverter converter = channel.getConnection().getProtocolOutputConverter();
        converter.confirmConsumerAutoClose(channel.getChannelId(), getConsumerTag());
    }

    /**
     * Called when the queue has no more messages. For auto-close consumers this flags the
     * target as needing to close and notifies the connection that there is work to do;
     * otherwise it does nothing.
     */
    public void queueEmpty() {
        if (!isAutoClose()) {
            return;
        }
        _needToClose.set(true);
        getChannel().getConnection().notifyWork();
    }

    /**
     * Closes this target and confirms the auto-close to the client, but only when
     * {@link #hasClosed()} reports a pending close.
     */
    protected void processClosed() {
        if (!hasClosed()) {
            return;
        }
        close();
        confirmAutoClose();
    }

    /**
     * Intentionally a no-op for this target.
     */
    protected void processStateChanged() {
        // nothing to do
    }

    /**
     * @return always {@code false}
     */
    protected boolean hasStateChanged() {
        final boolean changed = false;
        return changed;
    }

    /**
     * @return {@code true} when a close has been requested via {@link #queueEmpty()} and the
     *         target is not yet in the {@code CLOSED} state
     */
    protected boolean hasClosed() {
        if (!_needToClose.get()) {
            return false;
        }
        return getState() != ConsumerTarget.State.CLOSED;
    }

    /**
     * Ends a batch by switching deferred flushing off on the channel's connection.
     */
    public void flushBatched() {
        getConnection().setDeferFlush(false);
    }

    /**
     * Adds the entry to the unacknowledged byte and message counters and attaches the listener
     * that removes it again once the entry is no longer acquired through this target.
     *
     * @param entry entry being delivered
     */
    protected void addUnacknowledgedMessage(MessageInstance entry) {
        final long messageBytes = entry.getMessage().getSize();
        _unacknowledgedBytes.addAndGet(messageBytes);
        _unacknowledgedCount.incrementAndGet();
        entry.addStateChangeListener(_unacknowledgedMessageListener);
    }

    /**
     * Reverses {@link #addUnacknowledgedMessage(MessageInstance)}: lowers both counters and
     * returns credit for one message of the entry's size to the credit manager.
     *
     * @param entry entry that is no longer unacknowledged
     */
    private void removeUnacknowledgedMessage(MessageInstance entry) {
        final long messageBytes = entry.getMessage().getSize();
        _unacknowledgedBytes.addAndGet(-messageBytes);
        _unacknowledgedCount.decrementAndGet();
        _creditManager.restoreCredit(1L, messageBytes);
    }

    /**
     * Intentionally a no-op for this target.
     *
     * @param node ignored
     */
    public void acquisitionRemoved(MessageInstance node) {
        // nothing to do
    }

    /**
     * @return current value of the unacknowledged byte counter
     */
    public long getUnacknowledgedBytes() {
        return _unacknowledgedBytes.get();
    }

    /**
     * @return current value of the unacknowledged message counter
     */
    public long getUnacknowledgedMessages() {
        return _unacknowledgedCount.get();
    }

    /**
     * Target whose deliveries are tracked as unacknowledged and recorded against a delivery tag.
     */
    static final class AckConsumer extends ConsumerTarget_0_8 {

        public AckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod, boolean multiQueue) {
            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
        }

        /**
         * Sends one entry to the client while holding the channel's monitor.
         *
         * <p>Inside the lock, in order: the connection's defer-flush flag is set to
         * {@code batch}, a delivery tag is taken from the channel, the entry is added to the
         * unacknowledged accounting, the delivery is recorded, the message is sent, and the
         * entry's delivery count is incremented.
         */
        public void doSend(ConsumerImpl consumer, MessageInstance entry, boolean batch) {
            final AMQChannel channel = getChannel();
            synchronized (channel) {
                channel.getConnection().setDeferFlush(batch);
                final long deliveryTag = channel.getNextDeliveryTag();
                addUnacknowledgedMessage(entry);
                recordMessageDelivery(consumer, entry, deliveryTag);
                sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
                entry.incrementDeliveryCount();
            }
        }
    }

    /**
     * {@link NoAckConsumer} variant that is always created with {@code multiQueue} set to
     * {@code false}; it adds no behaviour of its own.
     */
    public static final class GetNoAckConsumer extends NoAckConsumer {

        public GetNoAckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) {
            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, false);
        }
    }

    public static class NoAckConsumer
    extends ConsumerTarget_0_8 {
        private final AutoCommitTransaction _txn;
        private static final ServerTransaction.Action NOOP = new ServerTransaction.Action(){

            public void postCommit() {
            }

            public void onRollback() {
            }
        };

        public NoAckConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod, boolean multiQueue) {
            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
            this._txn = new AutoCommitTransaction(channel.getAddressSpace().getMessageStore());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void doSend(ConsumerImpl consumer, MessageInstance entry, boolean batch) {
            this._txn.dequeue(entry.getEnqueueRecord(), NOOP);
            ServerMessage message = entry.getMessage();
            MessageReference ref = message.newReference();
            InstanceProperties props = entry.getInstanceProperties();
            entry.delete();
            AMQChannel aMQChannel = this.getChannel();
            synchronized (aMQChannel) {
                this.getChannel().getConnection().setDeferFlush(batch);
                long deliveryTag = this.getChannel().getNextDeliveryTag();
                long size = this.sendToClient(consumer, message, props, deliveryTag);
            }
            ref.release();
        }
    }

    /**
     * Target that sends entries to the client without dequeuing them, tracking them as
     * unacknowledged, or recording the delivery.
     */
    static final class BrowserConsumer extends ConsumerTarget_0_8 {

        public BrowserConsumer(AMQChannel channel, AMQShortString consumerTag, FieldTable filters, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod, boolean multiQueue) {
            super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod, multiQueue);
        }

        /**
         * Takes the next delivery tag from the channel and sends the entry's message, both
         * while holding the channel's monitor. The {@code batch} flag is not used here.
         */
        public void doSend(ConsumerImpl consumer, MessageInstance entry, boolean batch) {
            final AMQChannel channel = getChannel();
            synchronized (channel) {
                final long deliveryTag = channel.getNextDeliveryTag();
                sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
            }
        }
    }
}

