/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.protocol.v0_8;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.Filterable;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Base implementation of a {@link Subscription} for the AMQP 0-8 protocol layer.
 *
 * <p>An instance ties a consumer tag on an {@link AMQChannel} to a queue, keeps the
 * subscription state (held in an {@link AtomicReference}) and delivery statistics, and
 * reacts to credit changes reported by its {@link FlowCreditManager}. Concrete delivery
 * behaviour is supplied by the nested subclasses through {@link #send(QueueEntry, boolean)}
 * and {@link #isBrowser()}.
 */
public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener {
    /** Listener notified of state transitions; starts as a no-op until one is installed. */
    private Subscription.StateListener _stateListener = new Subscription.StateListener(){

        public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState) {
        }
    };
    private final AtomicReference<Subscription.State> _state = new AtomicReference<Subscription.State>(Subscription.State.ACTIVE);
    private volatile AMQQueue.Context _queueContext;
    private final ClientDeliveryMethod _deliveryMethod;
    private final RecordDeliveryMethod _recordMethod;
    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
    private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
    /** Guards {@link #close()} and is also handed out to callers as the "send lock". */
    private final Lock _stateChangeLock;
    private final long _subscriptionID;
    /** Both logging fields are only assigned by {@link #setQueue(AMQQueue, boolean)}. */
    private LogSubject _logSubject;
    private LogActor _logActor;
    private final AtomicLong _deliveredCount = new AtomicLong(0L);
    private final AtomicLong _deliveredBytes = new AtomicLong(0L);
    private final AtomicLong _unacknowledgedCount = new AtomicLong(0L);
    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0L);
    private final long _createTime = System.currentTimeMillis();
    private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
    private final AMQChannel _channel;
    private final AMQShortString _consumerTag;
    private boolean _noLocal;
    private final FlowCreditManager _creditManager;
    private final FilterManager _filters;
    private final Boolean _autoClose;
    private AMQQueue _queue;
    /** Set once the queue reports its deletion; a deleted subscription counts as suspended. */
    private final AtomicBoolean _deleted = new AtomicBoolean(false);

    /**
     * Creates a subscription in the {@code ACTIVE} state.
     *
     * @param channel         channel the consumer was created on
     * @param protocolSession not used by this constructor; kept for signature compatibility
     * @param consumerTag     tag identifying the consumer
     * @param arguments       consumer arguments used to build the filters and to read the
     *                        auto-close flag; may be {@code null}
     * @param noLocal         whether messages published on the same connection are skipped
     * @param creditManager   source of flow-control credit; this subscription registers
     *                        itself as its listener
     * @param deliveryMethod  strategy used to write a message to the client
     * @param recordMethod    strategy used to record a delivery
     * @throws AMQException if the filter manager cannot be created from {@code arguments}
     */
    public SubscriptionImpl(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable arguments, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
        _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
        _channel = channel;
        _consumerTag = consumerTag;
        _creditManager = creditManager;
        _noLocal = noLocal;
        _filters = FilterManagerFactory.createManager(arguments);
        _deliveryMethod = deliveryMethod;
        _recordMethod = recordMethod;
        _stateChangeLock = new ReentrantLock();

        Object autoClose = arguments == null ? null : arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
        _autoClose = autoClose == null ? Boolean.FALSE : (Boolean)autoClose;

        // Register last: nothing above can now fail after the listener is installed, and the
        // credit manager never sees a partially initialised subscription.
        creditManager.addStateListener(this);
    }

    /** @return the channel this subscription belongs to, viewed as a session model */
    public AMQSessionModel getSessionModel() {
        return _channel;
    }

    /** @return the number of messages delivered to the client so far */
    public Long getDelivered() {
        return _deliveredCount.get();
    }

    /**
     * Binds this subscription to its queue, initialises the logging subject/actor and, when
     * the subscription-create operational message is enabled, logs it.
     *
     * @param queue     queue being consumed from
     * @param exclusive whether the consumer is exclusive; combined with queue durability for
     *                  the create log message
     * @throws IllegalStateException if a queue has already been set
     */
    public synchronized void setQueue(AMQQueue queue, boolean exclusive) {
        if (getQueue() != null) {
            throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + " when already set to " + getQueue());
        }
        _queue = queue;
        _logSubject = new SubscriptionLogSubject(this);
        _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
        if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(CurrentActor.get(), _logSubject, "qpid.message.subscription.create")) {
            String options = describeOptionsForLog();
            CurrentActor.get().message(_logSubject, SubscriptionMessages.CREATE(options, queue.isDurable() && exclusive, options != null));
        }
    }

    /**
     * Builds the comma-separated option text for the create log message: the filter
     * description (if any), then {@code AutoClose}, then {@code Browser}.
     *
     * @return the option text, or {@code null} when there is nothing to report
     */
    private String describeOptionsForLog() {
        String options = null;
        if (_filters != null && _filters.hasFilters()) {
            options = _filters.toString();
        }
        if (isAutoClose()) {
            options = appendOption(options, "AutoClose");
        }
        if (isBrowser()) {
            options = appendOption(options, "Browser");
        }
        return options;
    }

    /** Appends {@code option} to {@code existing}, comma-separated; a null {@code existing} yields just the option. */
    private static String appendOption(String existing, String option) {
        return existing == null ? option : existing + "," + option;
    }

    public String toString() {
        return "[channel=" + _channel + ", consumerTag=" + _consumerTag + ", session=" + getProtocolSession().getKey() + "]";
    }

    /**
     * Delivers a queue entry to the client.
     *
     * @param var1 entry to deliver
     * @param var2 batch flag; the subclasses that use it pass it to the protocol session's
     *             deferred-flush setting
     * @throws AMQException if delivery fails
     */
    public abstract void send(QueueEntry var1, boolean var2) throws AMQException;

    /**
     * @return {@code true} when the subscription is not active, its channel is suspended, its
     *         queue has been deleted, or the connection is stopped
     */
    public boolean isSuspended() {
        return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
    }

    /** Marks this subscription as belonging to a deleted queue. */
    public void queueDeleted(AMQQueue queue) {
        _deleted.set(true);
    }

    /**
     * Decides whether this subscription wants the given entry.
     *
     * <p>For {@link AMQMessage}s the no-local rule is applied; for other message types a
     * converter to {@link AMQMessage} must be registered. Entries that pass are then checked
     * against the filters.
     *
     * @param entry candidate queue entry
     * @return {@code true} if the entry may be delivered to this subscription
     */
    public boolean hasInterest(QueueEntry entry) {
        // A prior rejection by this subscription is only logged; it does not veto delivery here.
        if (entry.isRejectedBy(getSubscriptionID()) && _logger.isDebugEnabled()) {
            _logger.debug("Subscription:" + this + " rejected message:" + entry);
        }
        if (entry.getMessage() instanceof AMQMessage) {
            if (_noLocal) {
                AMQMessage message = (AMQMessage)entry.getMessage();
                Object publisherReference = message.getConnectionIdentifier();
                Object localReference = getProtocolSession().getReference();
                // Skip messages published through this subscription's own connection.
                if (publisherReference != null && publisherReference.equals(localReference)) {
                    return false;
                }
            }
        } else if (MessageConverterRegistry.getConverter(entry.getMessage().getClass(), AMQMessage.class) == null) {
            // No way to convert this message type for an 0-8 client.
            return false;
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("(" + this + ") checking filters for message (" + entry);
        }
        return checkFilters(entry);
    }

    /** @return {@code true} when there is no filter manager or every filter allows the entry */
    private boolean checkFilters(QueueEntry msg) {
        return _filters == null || _filters.allAllow(msg);
    }

    /** @return the auto-close flag read from the consumer arguments ({@code false} if absent) */
    public boolean isAutoClose() {
        return _autoClose;
    }

    public FlowCreditManager getCreditManager() {
        return _creditManager;
    }

    /**
     * Moves the subscription to {@code CLOSED} (notifying the state listener if this call
     * performed the transition), deregisters from the credit manager and logs the close
     * message. The listener removal and the log message happen on every call, including
     * calls made after the subscription is already closed.
     */
    public void close() {
        boolean closed = false;
        Subscription.State state = getState();
        _stateChangeLock.lock();
        try {
            // Retry the CAS because creditStateChanged() may flip the state concurrently
            // without holding the lock.
            while (!closed && state != Subscription.State.CLOSED) {
                closed = _state.compareAndSet(state, Subscription.State.CLOSED);
                if (closed) {
                    _stateListener.stateChange(this, state, Subscription.State.CLOSED);
                } else {
                    state = getState();
                }
            }
            _creditManager.removeListener(this);
        }
        finally {
            _stateChangeLock.unlock();
        }
        CurrentActor.get().message(_logSubject, SubscriptionMessages.CLOSE());
    }

    public boolean isClosed() {
        return getState() == Subscription.State.CLOSED;
    }

    /**
     * Asks the credit manager to use credit for the message.
     *
     * @return {@code true} when the credit manager refuses, i.e. sending would suspend
     */
    public boolean wouldSuspend(QueueEntry msg) {
        return !_creditManager.useCreditForMessage(msg.getMessage().getSize());
    }

    /** @return {@code true} if the state-change/send lock was acquired without blocking */
    public boolean trySendLock() {
        return _stateChangeLock.tryLock();
    }

    /** Acquires the state-change/send lock, blocking until it is available. */
    public void getSendLock() {
        _stateChangeLock.lock();
    }

    /** Releases the lock taken by {@link #getSendLock()} or a successful {@link #trySendLock()}. */
    public void releaseSendLock() {
        _stateChangeLock.unlock();
    }

    public AMQChannel getChannel() {
        return _channel;
    }

    public AMQShortString getConsumerTag() {
        return _consumerTag;
    }

    /** @return the consumer tag as a string, or {@code null} when there is no tag */
    public String getConsumerName() {
        return _consumerTag == null ? null : _consumerTag.asString();
    }

    public long getSubscriptionID() {
        return _subscriptionID;
    }

    public AMQProtocolSession getProtocolSession() {
        return _channel.getProtocolSession();
    }

    /** @return the log actor, or {@code null} before {@link #setQueue(AMQQueue, boolean)} is called */
    public LogActor getLogActor() {
        return _logActor;
    }

    /** @return the bound queue, or {@code null} before {@link #setQueue(AMQQueue, boolean)} is called */
    public AMQQueue getQueue() {
        return _queue;
    }

    /** Returns the credit held for a dequeued entry. */
    public void onDequeue(QueueEntry queueEntry) {
        restoreCredit(queueEntry);
    }

    /** Returns the credit held for a released entry. */
    public void releaseQueueEntry(QueueEntry queueEntry) {
        restoreCredit(queueEntry);
    }

    /** Restores one message and the entry's size in bytes of credit to the credit manager. */
    public void restoreCredit(QueueEntry queueEntry) {
        _creditManager.restoreCredit(1L, queueEntry.getSize());
    }

    /**
     * Credit manager callback: switches between {@code SUSPENDED} and {@code ACTIVE} and logs
     * the resulting state.
     *
     * @param hasCredit whether credit is now available
     */
    public void creditStateChanged(boolean hasCredit) {
        if (hasCredit) {
            if (_state.compareAndSet(Subscription.State.SUSPENDED, Subscription.State.ACTIVE)) {
                _stateListener.stateChange(this, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
            } else {
                // Deliberately report ACTIVE -> ACTIVE even though no transition occurred, so
                // the listener is still told that credit has arrived.
                _stateListener.stateChange(this, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
            }
        } else if (_state.compareAndSet(Subscription.State.ACTIVE, Subscription.State.SUSPENDED)) {
            _stateListener.stateChange(this, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
        }
        CurrentActor.get().message(_logSubject, SubscriptionMessages.STATE(_state.get().toString()));
    }

    public Subscription.State getState() {
        return _state.get();
    }

    /** Replaces the state listener; the listener is invoked without a null check, so pass a non-null value. */
    public void setStateListener(Subscription.StateListener listener) {
        _stateListener = listener;
    }

    public AMQQueue.Context getQueueContext() {
        return _queueContext;
    }

    public void setQueueContext(AMQQueue.Context context) {
        _queueContext = context;
    }

    /**
     * Hands the entry to the client delivery method and then updates the delivered
     * message/byte counters (counters are not touched if delivery throws).
     */
    protected void sendToClient(QueueEntry entry, long deliveryTag) throws AMQException {
        _deliveryMethod.deliverToClient(this, entry, deliveryTag);
        _deliveredCount.incrementAndGet();
        _deliveredBytes.addAndGet(entry.getSize());
    }

    /** Records the delivery through the configured record method. */
    protected void recordMessageDelivery(QueueEntry entry, long deliveryTag) {
        _recordMethod.recordMessageDelivery(this, entry, deliveryTag);
    }

    public boolean isActive() {
        return getState() == Subscription.State.ACTIVE;
    }

    public QueueEntry.SubscriptionAcquiredState getOwningState() {
        return _owningState;
    }

    /** Tells the client, via the protocol output converter, that this consumer auto-closed. */
    public void confirmAutoClose() {
        ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
        converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
    }

    /** @return {@code true} unless this is a browser */
    public boolean acquires() {
        return !isBrowser();
    }

    /** @return {@code true} unless this is a browser */
    public boolean seesRequeues() {
        return !isBrowser();
    }

    public boolean isTransient() {
        return false;
    }

    /** Stores an arbitrary property on this subscription. */
    public void set(String key, Object value) {
        _properties.put(key, value);
    }

    /** @return the property stored under {@code key}, or {@code null} if none */
    public Object get(String key) {
        return _properties.get(key);
    }

    public void setNoLocal(boolean noLocal) {
        _noLocal = noLocal;
    }

    /** @return whether this subscription browses messages rather than consuming them */
    abstract boolean isBrowser();

    /** @return the constant {@code "WINDOW"} */
    public String getCreditMode() {
        return "WINDOW";
    }

    public boolean isBrowsing() {
        return isBrowser();
    }

    public boolean isExplicitAcknowledge() {
        return true;
    }

    public boolean isDurable() {
        return false;
    }

    /** @return whether the bound queue reports an exclusive subscriber; requires the queue to be set */
    public boolean isExclusive() {
        return getQueue().hasExclusiveSubscriber();
    }

    /** @return the consumer tag rendered with {@link String#valueOf(Object)} */
    public String getName() {
        return String.valueOf(_consumerTag);
    }

    /** @return always {@code null}; the consumer arguments are not retained */
    public Map<String, Object> getArguments() {
        return null;
    }

    public boolean isSessionTransactional() {
        return _channel.isTransactional();
    }

    /** @return the value of {@link System#currentTimeMillis()} captured at construction */
    public long getCreateTime() {
        return _createTime;
    }

    /**
     * Queue callback for an empty queue: an auto-close subscription unregisters itself from
     * the queue and confirms the auto-close to the client.
     */
    public void queueEmpty() throws AMQException {
        if (isAutoClose()) {
            _queue.unregisterSubscription(this);
            confirmAutoClose();
        }
    }

    /** Turns deferred flushing off on the protocol session and flushes what was batched. */
    public void flushBatched() {
        _channel.getProtocolSession().setDeferFlush(false);
        _channel.getProtocolSession().flushBatched();
    }

    public long getBytesOut() {
        return _deliveredBytes.longValue();
    }

    public long getMessagesOut() {
        return _deliveredCount.longValue();
    }

    /**
     * Counts the entry as unacknowledged and installs a one-shot listener that reverses the
     * count when the entry leaves the {@code ACQUIRED} state.
     */
    protected void addUnacknowledgedMessage(QueueEntry entry) {
        // Capture the size now so the same amount is subtracted later.
        final long size = entry.getSize();
        _unacknowledgedBytes.addAndGet(size);
        _unacknowledgedCount.incrementAndGet();
        entry.addStateChangeListener(new QueueEntry.StateChangeListener(){

            public void stateChanged(QueueEntry changedEntry, QueueEntry.State oldState, QueueEntry.State newState) {
                if (oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED)) {
                    SubscriptionImpl.this._unacknowledgedBytes.addAndGet(-size);
                    SubscriptionImpl.this._unacknowledgedCount.decrementAndGet();
                    changedEntry.removeStateChangeListener(this);
                }
            }
        });
    }

    public long getUnacknowledgedBytes() {
        return _unacknowledgedBytes.longValue();
    }

    public long getUnacknowledgedMessages() {
        return _unacknowledgedCount.longValue();
    }

    /** Subscription whose deliveries are tracked as unacknowledged and recorded before sending. */
    static final class AckSubscription
    extends SubscriptionImpl {
        public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
        }

        public boolean isBrowser() {
            return false;
        }

        /**
         * Under the channel monitor: applies the batch flag as the session's deferred-flush
         * setting, allocates a delivery tag, registers the entry as unacknowledged, records
         * the delivery and sends it.
         */
        public void send(QueueEntry entry, boolean batch) throws AMQException {
            final AMQChannel channel = getChannel();
            synchronized (channel) {
                channel.getProtocolSession().setDeferFlush(batch);
                long deliveryTag = channel.getNextDeliveryTag();
                addUnacknowledgedMessage(entry);
                recordMessageDelivery(entry, deliveryTag);
                sendToClient(entry, deliveryTag);
            }
        }
    }

    /** Transient no-ack subscription that, unlike its parent, consults the credit manager before sending. */
    public static final class GetNoAckSubscription
    extends NoAckSubscription {
        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
        }

        public boolean isTransient() {
            return true;
        }

        /** @return {@code true} when the credit manager refuses credit for the message */
        public boolean wouldSuspend(QueueEntry msg) {
            return !getCreditManager().useCreditForMessage(msg.getMessage().getSize());
        }
    }

    /** Subscription that dequeues each entry before sending it and requires no acknowledgement. */
    public static class NoAckSubscription
    extends SubscriptionImpl {
        /** Created on first send, once the queue (and so its message store) is reachable. */
        private volatile AutoCommitTransaction _txn;
        private static final ServerTransaction.Action NOOP = new ServerTransaction.Action(){

            public void postCommit() {
            }

            public void onRollback() {
            }
        };

        public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
        }

        public boolean isBrowser() {
            return false;
        }

        public boolean isExplicitAcknowledge() {
            return false;
        }

        /**
         * Dequeues the message through an auto-commit transaction and from the entry, sends
         * it under the channel monitor, then disposes of the entry.
         */
        public void send(QueueEntry entry, boolean batch) throws AMQException {
            // Read the volatile once; a concurrent first send may create a second, equivalent
            // transaction object, exactly as before.
            AutoCommitTransaction txn = _txn;
            if (txn == null) {
                txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
                _txn = txn;
            }
            txn.dequeue(getQueue(), entry.getMessage(), NOOP);
            entry.dequeue();
            final AMQChannel channel = getChannel();
            synchronized (channel) {
                channel.getProtocolSession().setDeferFlush(batch);
                long deliveryTag = channel.getNextDeliveryTag();
                sendToClient(entry, deliveryTag);
            }
            entry.dispose();
        }

        /** @return always {@code false}; the credit manager is not consulted */
        public boolean wouldSuspend(QueueEntry msg) {
            return false;
        }
    }

    /** Subscription that sends entries to the client without dequeuing or recording them. */
    static final class BrowserSubscription
    extends SubscriptionImpl {
        public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, ClientDeliveryMethod deliveryMethod, RecordDeliveryMethod recordMethod) throws AMQException {
            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
        }

        public boolean isBrowser() {
            return true;
        }

        /**
         * Allocates a delivery tag and sends the entry under the channel monitor. The
         * {@code batch} flag is not used by this implementation.
         */
        public void send(QueueEntry entry, boolean batch) throws AMQException {
            final AMQChannel channel = getChannel();
            synchronized (channel) {
                long deliveryTag = channel.getNextDeliveryTag();
                sendToClient(entry, deliveryTag);
            }
        }

        /** @return always {@code false}; the credit manager is not consulted */
        public boolean wouldSuspend(QueueEntry msg) {
            return false;
        }
    }
}

