/*
 * Decompiled with CFR 0.152.
 */
package com.swiftmq.jms.v600;

import com.swiftmq.jms.ExceptionConverter;
import com.swiftmq.jms.MessageImpl;
import com.swiftmq.jms.SwiftMQMessageConsumer;
import com.swiftmq.jms.smqp.v600.AcknowledgeMessageRequest;
import com.swiftmq.jms.smqp.v600.AsyncMessageDeliveryRequest;
import com.swiftmq.jms.smqp.v600.CloseConsumerRequest;
import com.swiftmq.jms.smqp.v600.MessageDeliveredRequest;
import com.swiftmq.jms.smqp.v600.StartConsumerRequest;
import com.swiftmq.jms.v600.CloseConsumer;
import com.swiftmq.jms.v600.Recreatable;
import com.swiftmq.jms.v600.SessionImpl;
import com.swiftmq.swiftlet.queue.MessageEntry;
import com.swiftmq.swiftlet.queue.MessageIndex;
import com.swiftmq.tools.collection.RingBuffer;
import com.swiftmq.tools.concurrent.Semaphore;
import com.swiftmq.tools.requestreply.Reply;
import com.swiftmq.tools.requestreply.Request;
import com.swiftmq.tools.requestreply.RequestRegistry;
import com.swiftmq.tools.requestreply.RequestRetryValidator;
import com.swiftmq.tools.requestreply.ValidationException;
import com.swiftmq.tools.tracking.MessageTracker;
import com.swiftmq.tools.util.IdGenerator;
import com.swiftmq.tools.util.UninterruptableWaiter;
import java.util.List;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

public class MessageConsumerImpl
implements MessageConsumer,
SwiftMQMessageConsumer,
Recreatable,
RequestRetryValidator {
    String uniqueConsumerId = IdGenerator.getInstance().nextId('/');
    boolean closed = false;
    volatile int consumerId = 0;
    boolean transacted = false;
    int acknowledgeMode = 0;
    RequestRegistry requestRegistry = null;
    String messageSelector = null;
    MessageListener messageListener = null;
    SessionImpl mySession = null;
    int serverQueueConsumerId = -1;
    boolean useThreadContextCL = false;
    boolean cancelled = false;
    RingBuffer messageCache = null;
    boolean doAck = false;
    boolean reportDelivered = false;
    boolean recordLog = true;
    boolean receiverWaiting = false;
    boolean wasRecovered = false;
    boolean fillCachePending = false;
    boolean receiveNoWaitFirstCall = true;
    boolean consumerStarted = false;

    public MessageConsumerImpl(boolean transacted, int acknowledgeMode, RequestRegistry requestRegistry, String messageSelector, SessionImpl session) {
        this.transacted = transacted;
        this.acknowledgeMode = acknowledgeMode;
        this.requestRegistry = requestRegistry;
        this.messageSelector = messageSelector;
        this.mySession = session;
        this.useThreadContextCL = this.mySession.getMyConnection().isUseThreadContextCL();
        this.reportDelivered = transacted || acknowledgeMode == 2;
        this.messageCache = new RingBuffer(this.mySession.getMyConnection().getSmqpConsumerCacheSize());
    }

    @Override
    public Request getRecreateRequest() {
        return null;
    }

    @Override
    public void setRecreateReply(Reply reply) {
    }

    @Override
    public List getRecreatables() {
        return null;
    }

    @Override
    public void validate(Request request) throws ValidationException {
        request.setDispatchId(this.mySession.dispatchId);
        if (request instanceof CloseConsumerRequest) {
            CloseConsumerRequest r = (CloseConsumerRequest)request;
            r.setSessionDispatchId(this.mySession.dispatchId);
            r.setQueueConsumerId(this.serverQueueConsumerId);
        } else {
            request.setCancelledByValidator(true);
        }
    }

    protected void verifyState() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("Message consumer is closed");
        }
        this.mySession.verifyState();
    }

    public boolean isConsumerStarted() {
        return this.consumerStarted;
    }

    void setWasRecovered(boolean wasRecovered) {
        this.wasRecovered = wasRecovered;
    }

    void setDoAck(boolean doAck) {
        this.doAck = doAck;
    }

    public void setRecordLog(boolean recordLog) {
        this.recordLog = recordLog;
    }

    synchronized void addToCache(AsyncMessageDeliveryRequest request) {
        if (this.isClosed()) {
            return;
        }
        if (request.isRequiresRestart()) {
            this.fillCachePending = false;
        }
        MessageImpl msg = request.getMessageEntry().getMessage();
        if (request.getConnectionId() != this.mySession.myConnection.getConnectionId()) {
            if (MessageTracker.enabled) {
                MessageTracker.getInstance().track(request.getMessageEntry().getMessage(), new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "addToCache, invalid connectionId (" + request.getConnectionId() + " vs " + this.mySession.myConnection.getConnectionId() + ")");
            }
            return;
        }
        if (MessageTracker.enabled) {
            MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "addToCache");
        }
        this.messageCache.add(request);
    }

    synchronized void addToCache(AsyncMessageDeliveryRequest[] requests, boolean lastRestartRequired) {
        for (int i = 0; i < requests.length; ++i) {
            if (lastRestartRequired && i == requests.length - 1) {
                requests[i].setRequiresRestart(true);
            }
            this.addToCache(requests[i]);
        }
    }

    synchronized boolean invokeConsumer() {
        if (this.messageCache.getSize() > 0) {
            if (this.messageListener == null) {
                if (this.receiverWaiting) {
                    this.receiverWaiting = false;
                    this.notify();
                }
            } else {
                this.invokeMessageListener();
            }
        }
        return this.messageCache.getSize() > 0 && (this.messageListener != null || this.receiverWaiting) && !this.isClosed();
    }

    void fillCache(boolean force) {
        if (this.isClosed() || this.fillCachePending && !force) {
            return;
        }
        this.fillCachePending = true;
        this.consumerStarted = true;
        this.requestRegistry.request(new StartConsumerRequest(this, this.mySession.dispatchId, this.serverQueueConsumerId, this.mySession.getMyDispatchId(), this.consumerId, this.mySession.getMyConnection().getSmqpConsumerCacheSize()));
    }

    void fillCache() {
        this.fillCache(false);
    }

    void clearCache() {
        this.fillCachePending = false;
        this.messageCache.clear();
    }

    @Override
    public boolean isClosed() {
        return this.closed || this.mySession.isClosed();
    }

    int getConsumerId() {
        return this.consumerId;
    }

    void setConsumerId(int id) {
        this.consumerId = id;
    }

    int getServerQueueConsumerId() {
        return this.serverQueueConsumerId;
    }

    void setServerQueueConsumerId(int id) {
        this.serverQueueConsumerId = id;
    }

    public String getMessageSelector() throws JMSException {
        this.verifyState();
        return this.messageSelector;
    }

    public synchronized MessageListener getMessageListener() throws JMSException {
        this.verifyState();
        return this.messageListener;
    }

    public synchronized void setMessageListener(MessageListener listener) throws JMSException {
        this.verifyState();
        if (listener != null && !this.consumerStarted) {
            this.fillCache();
        }
        this.messageListener = listener;
        if (listener != null) {
            this.mySession.triggerInvocation();
        }
    }

    private void invokeMessageListener() {
        if (this.isClosed()) {
            return;
        }
        AsyncMessageDeliveryRequest request = (AsyncMessageDeliveryRequest)this.messageCache.remove();
        if (request.getConnectionId() != this.mySession.myConnection.getConnectionId()) {
            if (MessageTracker.enabled) {
                MessageTracker.getInstance().track(request.getMessageEntry().getMessage(), new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "invokeMessageListener, invalid connectionId (" + request.getConnectionId() + " vs " + this.mySession.myConnection.getConnectionId() + ")");
            }
            return;
        }
        MessageEntry messageEntry = request.getMessageEntry();
        MessageImpl msg = messageEntry.getMessage();
        messageEntry.moveMessageAttributes();
        MessageIndex msgIndex = msg.getMessageIndex();
        msg.setMessageConsumerImpl(this);
        try {
            msg.reset();
        }
        catch (JMSException e) {
            e.printStackTrace();
        }
        msg.setReadOnly(true);
        msg.setUseThreadContextCL(this.useThreadContextCL);
        String id = SessionImpl.buildId(this.uniqueConsumerId, msg);
        boolean duplicate = false;
        if (this.recordLog) {
            boolean bl = duplicate = this.mySession.myConnection.isDuplicateMessageDetection() && this.mySession.isDuplicate(id);
        }
        if (MessageTracker.enabled) {
            MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "invokeMessageListener, duplicate=" + duplicate);
        }
        if (this.reportDelivered) {
            this.reportDelivered(msg, false);
        }
        try {
            if (!duplicate) {
                if (this.recordLog && this.mySession.myConnection.isDuplicateMessageDetection()) {
                    this.mySession.addCurrentTxLog(id);
                }
                if (MessageTracker.enabled) {
                    MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "invokeMessageListener, onMessage...");
                }
                this.mySession.withinOnMessage = true;
                this.mySession.setTxCancelled(false);
                this.messageListener.onMessage((Message)msg);
                this.mySession.withinOnMessage = false;
                if (MessageTracker.enabled) {
                    MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "invokeMessageListener, onMessage ok");
                }
                if (this.mySession.isTxCancelled() || this.mySession.acknowledgeMode == 2 && msg.isCancelled()) {
                    if (MessageTracker.enabled) {
                        MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "tx was cancelled, return!");
                    }
                    this.wasRecovered = false;
                    return;
                }
            }
        }
        catch (RuntimeException e) {
            if (MessageTracker.enabled) {
                MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "invokeMessageListener, exception=" + e);
            }
            System.err.println("ERROR! MessageListener throws RuntimeException, shutting down consumer!");
            e.printStackTrace();
            try {
                this.close(e.toString());
            }
            catch (JMSException jMSException) {
                // empty catch block
            }
            return;
        }
        if (!this.wasRecovered) {
            if (request.isRequiresRestart()) {
                this.fillCache();
            }
            if (this.doAck) {
                try {
                    if (MessageTracker.enabled) {
                        MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "invokeMessageListener, ack");
                    }
                    boolean cancelled = this.acknowledgeMessage(msgIndex, false);
                    if (MessageTracker.enabled) {
                        MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "invokeMessageListener, ack, cancelled=" + cancelled);
                    }
                }
                catch (JMSException jMSException) {}
            }
        } else {
            this.wasRecovered = false;
        }
    }

    private void reportDelivered(Message message, boolean duplicate) {
        try {
            MessageIndex messageIndex = ((MessageImpl)message).getMessageIndex();
            this.requestRegistry.request(new MessageDeliveredRequest(this, this.mySession.dispatchId, this.serverQueueConsumerId, messageIndex, duplicate));
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public boolean acknowledgeMessage(MessageImpl message) throws JMSException {
        return this.acknowledgeMessage(message.getMessageIndex(), true);
    }

    private boolean acknowledgeMessage(MessageIndex messageIndex, boolean replyRequired) throws JMSException {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is closed");
        }
        Reply reply = null;
        boolean cancelled = false;
        try {
            if (messageIndex == null) {
                throw new JMSException("Unable to acknowledge message - missing message key!");
            }
            AcknowledgeMessageRequest request = new AcknowledgeMessageRequest(this, this.mySession.dispatchId, this.serverQueueConsumerId, messageIndex);
            request.setReplyRequired(replyRequired);
            reply = this.requestRegistry.request(request);
            if (request.isCancelledByValidator()) {
                cancelled = true;
                this.mySession.addCurrentTxToDuplicateLog();
            }
            this.mySession.removeCurrentTxFromRollbackLog();
            this.mySession.clearCurrentTxLog();
        }
        catch (Exception e) {
            if (this.isClosed()) {
                throw new IllegalStateException("Connection is closed: " + e);
            }
            throw ExceptionConverter.convert(e);
        }
        if (replyRequired && !reply.isOk()) {
            if (this.isClosed()) {
                throw new IllegalStateException("Connection is closed: " + reply.getException());
            }
            throw ExceptionConverter.convert(reply.getException());
        }
        return cancelled;
    }

    synchronized Message receiveMessage(boolean block, long timeout) throws JMSException {
        this.verifyState();
        if (this.messageListener != null) {
            throw new JMSException("receive not allowed while a message listener has been set");
        }
        boolean wasDuplicate = false;
        boolean wasInvalidConnectionId = false;
        MessageImpl msg = null;
        String id = null;
        long to = timeout;
        do {
            wasDuplicate = false;
            wasInvalidConnectionId = false;
            if (!this.consumerStarted) {
                this.fillCache();
            }
            do {
                if (this.messageCache.getSize() != 0) continue;
                if (block) {
                    this.receiverWaiting = true;
                    if (timeout == 0L) {
                        UninterruptableWaiter.doWait(this);
                        continue;
                    }
                    long startWait = System.currentTimeMillis();
                    UninterruptableWaiter.doWait(this, to);
                    if ((to -= System.currentTimeMillis() - startWait) > 0L) continue;
                    return null;
                }
                if (!this.fillCachePending || !this.receiveNoWaitFirstCall) continue;
                UninterruptableWaiter.doWait(this, 1000L);
            } while (this.mySession.resetInProgress);
            this.receiverWaiting = false;
            if (this.messageCache.getSize() == 0 || this.isClosed()) {
                return null;
            }
            AsyncMessageDeliveryRequest request = (AsyncMessageDeliveryRequest)this.messageCache.remove();
            if (request.getConnectionId() != this.mySession.myConnection.getConnectionId()) {
                if (MessageTracker.enabled) {
                    MessageTracker.getInstance().track(request.getMessageEntry().getMessage(), new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "receiveMessage, invalid connectionId (" + request.getConnectionId() + " vs " + this.mySession.myConnection.getConnectionId() + ")");
                }
                wasInvalidConnectionId = true;
                continue;
            }
            MessageEntry messageEntry = request.getMessageEntry();
            msg = messageEntry.getMessage();
            messageEntry.moveMessageAttributes();
            msg.setMessageConsumerImpl(this);
            msg.reset();
            msg.setReadOnly(true);
            msg.setUseThreadContextCL(this.useThreadContextCL);
            if (request.isRequiresRestart()) {
                this.fillCache();
            }
            id = SessionImpl.buildId(this.uniqueConsumerId, msg);
            if (this.recordLog) {
                boolean bl = wasDuplicate = this.mySession.myConnection.isDuplicateMessageDetection() && this.mySession.isDuplicate(id);
            }
            if (MessageTracker.enabled) {
                MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "receivedMessage, duplicate=" + wasDuplicate);
            }
            if (this.reportDelivered) {
                this.reportDelivered(msg, false);
            }
            if (this.doAck) {
                try {
                    if (MessageTracker.enabled) {
                        MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "receivedMessage, ack...");
                    }
                    boolean cancelled = this.acknowledgeMessage(msg.getMessageIndex(), false);
                    if (MessageTracker.enabled) {
                        MessageTracker.getInstance().track(msg, new String[]{this.mySession.myConnection.toString(), this.mySession.toString(), this.toString()}, "receivedMessage, ack, cancelled=" + cancelled);
                    }
                }
                catch (JMSException jMSException) {
                    // empty catch block
                }
            }
            if (!wasDuplicate) continue;
            msg = null;
        } while (wasDuplicate || wasInvalidConnectionId);
        if (this.recordLog && this.mySession.myConnection.isDuplicateMessageDetection()) {
            this.mySession.addCurrentTxLog(id);
        }
        return msg;
    }

    public Message receive() throws JMSException {
        return this.receiveMessage(true, 0L);
    }

    public Message receive(long timeOut) throws JMSException {
        return this.receiveMessage(true, timeOut);
    }

    public Message receiveNoWait() throws JMSException {
        Message msg = this.receiveMessage(false, 0L);
        this.receiveNoWaitFirstCall = false;
        return msg;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void close(String exception) throws JMSException {
        MessageConsumerImpl messageConsumerImpl = this;
        synchronized (messageConsumerImpl) {
            if (this.isClosed()) {
                return;
            }
            this.closed = true;
            this.messageCache.clear();
            this.notify();
        }
        Reply reply = null;
        try {
            reply = this.requestRegistry.request(new CloseConsumerRequest(this, this.mySession.dispatchId, this.mySession.dispatchId, this.serverQueueConsumerId, exception));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
        this.mySession.removeMessageConsumerImpl(this);
    }

    public void close() throws JMSException {
        if (this.closed) {
            return;
        }
        if (!this.mySession.isSessionStarted()) {
            this.close(null);
            return;
        }
        CloseConsumer request = new CloseConsumer(this.consumerId);
        request._sem = new Semaphore();
        this.mySession.serviceRequest(request);
        request._sem.waitHere();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void cancel() {
        MessageConsumerImpl messageConsumerImpl = this;
        synchronized (messageConsumerImpl) {
            this.cancelled = true;
            this.closed = true;
            this.messageCache.clear();
            this.notify();
        }
    }
}

