/*
 * Decompiled with CFR 0.152.
 */
package com.swiftmq.jms.v400;

import com.swiftmq.jms.ExceptionConverter;
import com.swiftmq.jms.MessageImpl;
import com.swiftmq.jms.SwiftMQMessageConsumer;
import com.swiftmq.jms.smqp.v400.AcknowledgeMessageRequest;
import com.swiftmq.jms.smqp.v400.AsyncMessageDeliveryRequest;
import com.swiftmq.jms.smqp.v400.CloseConsumerRequest;
import com.swiftmq.jms.smqp.v400.MessageDeliveredRequest;
import com.swiftmq.jms.smqp.v400.StartConsumerRequest;
import com.swiftmq.jms.v400.SessionImpl;
import com.swiftmq.swiftlet.queue.MessageEntry;
import com.swiftmq.swiftlet.queue.MessageIndex;
import com.swiftmq.tools.collection.RingBuffer;
import com.swiftmq.tools.requestreply.Reply;
import com.swiftmq.tools.requestreply.RequestRegistry;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

public class MessageConsumerImpl
implements MessageConsumer,
SwiftMQMessageConsumer {
    boolean closed = false;
    int consumerId = 0;
    boolean transacted = false;
    int acknowledgeMode = 0;
    RequestRegistry requestRegistry = null;
    int dispatchId = 0;
    String messageSelector = null;
    MessageListener messageListener = null;
    SessionImpl mySession = null;
    int serverQueueConsumerId = -1;
    boolean useThreadContextCL = false;
    boolean cancelled = false;
    RingBuffer messageCache = null;
    boolean doAck = false;
    boolean reportDelivered = false;
    boolean receiverWaiting = false;
    boolean wasRecovered = false;
    boolean fillCachePending = false;
    boolean receiveNoWaitFirstCall = true;
    boolean consumerStarted = false;

    public MessageConsumerImpl(boolean transacted, int acknowledgeMode, int dispatchId, RequestRegistry requestRegistry, String messageSelector, SessionImpl session) {
        this.transacted = transacted;
        this.acknowledgeMode = acknowledgeMode;
        this.dispatchId = dispatchId;
        this.requestRegistry = requestRegistry;
        this.messageSelector = messageSelector;
        this.mySession = session;
        this.useThreadContextCL = this.mySession.getMyConnection().isUseThreadContextCL();
        this.reportDelivered = transacted || !transacted && acknowledgeMode == 2;
        this.messageCache = new RingBuffer(this.mySession.getMyConnection().getSmqpConsumerCacheSize());
    }

    protected void verifyState() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("Message consumer is closed");
        }
        this.mySession.verifyState();
    }

    void setWasRecovered(boolean wasRecovered) {
        this.wasRecovered = wasRecovered;
    }

    void setDoAck(boolean doAck) {
        this.doAck = doAck;
    }

    synchronized void addToCache(AsyncMessageDeliveryRequest request) {
        this.messageCache.add(request);
    }

    synchronized void addToCache(AsyncMessageDeliveryRequest[] requests, boolean lastRestartRequired) {
        this.fillCachePending = false;
        for (int i = 0; i < requests.length; ++i) {
            if (lastRestartRequired && i == requests.length - 1) {
                requests[i].setRequiresRestart(true);
            }
            this.messageCache.add(requests[i]);
        }
    }

    synchronized boolean invokeConsumer() {
        if (this.messageCache.getSize() > 0) {
            if (this.messageListener == null) {
                if (this.receiverWaiting) {
                    this.receiverWaiting = false;
                    this.notify();
                }
            } else {
                this.invokeMessageListener();
            }
        }
        boolean rc = this.messageCache.getSize() > 0 && (this.messageListener != null || this.receiverWaiting) && !this.isClosed();
        return rc;
    }

    void fillCache() {
        this.fillCachePending = true;
        this.consumerStarted = true;
        this.requestRegistry.request(new StartConsumerRequest(this.dispatchId, this.serverQueueConsumerId, this.mySession.getMyDispatchId(), this.consumerId, this.mySession.getMyConnection().getSmqpConsumerCacheSize()));
    }

    void clearCache() {
        this.fillCachePending = false;
        this.messageCache.clear();
    }

    @Override
    public boolean isClosed() {
        return this.closed || this.mySession.isClosed();
    }

    int getConsumerId() {
        return this.consumerId;
    }

    void setConsumerId(int id) {
        this.consumerId = id;
    }

    int getServerQueueConsumerId() {
        return this.serverQueueConsumerId;
    }

    void setServerQueueConsumerId(int id) {
        this.serverQueueConsumerId = id;
    }

    public String getMessageSelector() throws JMSException {
        this.verifyState();
        return this.messageSelector;
    }

    public synchronized MessageListener getMessageListener() throws JMSException {
        this.verifyState();
        return this.messageListener;
    }

    public synchronized void setMessageListener(MessageListener listener) throws JMSException {
        this.verifyState();
        if (listener != null && !this.consumerStarted) {
            this.fillCache();
        }
        this.messageListener = listener;
        if (listener != null) {
            this.mySession.triggerInvocation();
        }
    }

    private void invokeMessageListener() {
        if (this.isClosed()) {
            return;
        }
        AsyncMessageDeliveryRequest request = (AsyncMessageDeliveryRequest)this.messageCache.remove();
        MessageEntry messageEntry = request.getMessageEntry();
        MessageImpl msg = messageEntry.getMessage();
        messageEntry.moveMessageAttributes();
        MessageIndex msgIndex = msg.getMessageIndex();
        msg.setMessageConsumerImpl(this);
        try {
            msg.reset();
        }
        catch (JMSException e) {
            e.printStackTrace();
        }
        msg.setReadOnly(true);
        msg.setUseThreadContextCL(this.useThreadContextCL);
        if (this.reportDelivered) {
            this.reportDelivered(msg);
        }
        try {
            this.messageListener.onMessage((Message)msg);
        }
        catch (RuntimeException e) {
            System.err.println("ERROR! MessageListener throws RuntimeException, shutting down consumer!");
            e.printStackTrace();
            try {
                this.close(e.toString());
            }
            catch (JMSException jMSException) {
                // empty catch block
            }
            return;
        }
        if (!this.wasRecovered) {
            if (request.isRequiresRestart()) {
                this.fillCache();
            }
            if (this.doAck) {
                try {
                    this.acknowledgeMessage(msgIndex, false);
                }
                catch (JMSException jMSException) {}
            }
        } else {
            this.wasRecovered = false;
        }
    }

    private void reportDelivered(Message message) {
        try {
            MessageIndex messageIndex = ((MessageImpl)message).getMessageIndex();
            this.requestRegistry.request(new MessageDeliveredRequest(this.dispatchId, this.serverQueueConsumerId, messageIndex));
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public boolean acknowledgeMessage(MessageImpl message) throws JMSException {
        this.acknowledgeMessage(message.getMessageIndex(), true);
        return false;
    }

    private void acknowledgeMessage(MessageIndex messageIndex, boolean replyRequired) throws JMSException {
        if (this.isClosed()) {
            throw new IllegalStateException("Connection is closed");
        }
        Reply reply = null;
        try {
            if (messageIndex == null) {
                throw new JMSException("Unable to acknowledge message - missing message key!");
            }
            reply = this.requestRegistry.request(new AcknowledgeMessageRequest(this.dispatchId, this.serverQueueConsumerId, messageIndex, replyRequired));
        }
        catch (Exception e) {
            if (this.isClosed()) {
                throw new IllegalStateException("Connection is closed: " + e);
            }
            throw ExceptionConverter.convert(e);
        }
        if (replyRequired && !reply.isOk()) {
            if (this.isClosed()) {
                throw new IllegalStateException("Connection is closed: " + reply.getException());
            }
            throw ExceptionConverter.convert(reply.getException());
        }
    }

    synchronized Message receiveMessage(boolean block, long timeout) throws JMSException {
        this.verifyState();
        if (this.messageListener != null) {
            throw new JMSException("receive not allowed while a message listener has been set");
        }
        try {
            if (!this.consumerStarted) {
                this.fillCache();
            }
            if (this.messageCache.getSize() == 0) {
                if (block) {
                    this.receiverWaiting = true;
                    if (timeout == 0L) {
                        this.wait();
                    } else {
                        this.wait(timeout);
                    }
                } else if (this.fillCachePending && this.receiveNoWaitFirstCall) {
                    this.wait(1000L);
                }
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.receiverWaiting = false;
        if (this.messageCache.getSize() == 0 || this.isClosed()) {
            return null;
        }
        AsyncMessageDeliveryRequest request = (AsyncMessageDeliveryRequest)this.messageCache.remove();
        MessageEntry messageEntry = request.getMessageEntry();
        MessageImpl msg = messageEntry.getMessage();
        messageEntry.moveMessageAttributes();
        msg.setMessageConsumerImpl(this);
        msg.reset();
        msg.setReadOnly(true);
        if (request.isRequiresRestart()) {
            this.fillCache();
        }
        if (this.reportDelivered) {
            this.reportDelivered(msg);
        }
        if (this.doAck) {
            try {
                this.acknowledgeMessage(msg.getMessageIndex(), false);
            }
            catch (JMSException jMSException) {
                // empty catch block
            }
        }
        return msg;
    }

    public Message receive() throws JMSException {
        return this.receiveMessage(true, 0L);
    }

    public Message receive(long timeOut) throws JMSException {
        return this.receiveMessage(true, timeOut);
    }

    public Message receiveNoWait() throws JMSException {
        Message msg = this.receiveMessage(false, 0L);
        this.receiveNoWaitFirstCall = false;
        return msg;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void close(String exception) throws JMSException {
        Object object = this;
        synchronized (object) {
            this.messageCache.clear();
            this.notify();
        }
        if (this.isClosed()) {
            return;
        }
        this.closed = true;
        object = this.mySession;
        synchronized (object) {
            Reply reply = null;
            try {
                reply = this.requestRegistry.request(new CloseConsumerRequest(this.dispatchId, this.dispatchId, this.serverQueueConsumerId, exception));
            }
            catch (Exception e) {
                throw ExceptionConverter.convert(e);
            }
            if (!reply.isOk()) {
                throw ExceptionConverter.convert(reply.getException());
            }
        }
        this.mySession.removeMessageConsumerImpl(this);
    }

    public void close() throws JMSException {
        this.close(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void cancel() {
        this.cancelled = true;
        this.closed = true;
        MessageConsumerImpl messageConsumerImpl = this;
        synchronized (messageConsumerImpl) {
            this.messageCache.clear();
            this.notify();
        }
    }
}

