/*
 * Decompiled with CFR 0.152.
 */
package com.swiftmq.jms.v500;

import com.swiftmq.client.thread.PoolManager;
import com.swiftmq.jms.ExceptionConverter;
import com.swiftmq.jms.smqp.v500.AsyncMessageDeliveryRequest;
import com.swiftmq.jms.smqp.v500.CloseSessionRequest;
import com.swiftmq.jms.smqp.v500.StartConsumerRequest;
import com.swiftmq.jms.v500.ConnectionImpl;
import com.swiftmq.jms.v500.SessionImpl;
import com.swiftmq.jms.v500.XASessionImpl;
import com.swiftmq.swiftlet.threadpool.AsyncTask;
import com.swiftmq.swiftlet.threadpool.ThreadPool;
import com.swiftmq.tools.queue.SingleProcessorQueue;
import com.swiftmq.tools.requestreply.Reply;
import com.swiftmq.tools.requestreply.Request;
import com.swiftmq.tools.requestreply.RequestRegistry;
import com.swiftmq.tools.requestreply.RequestService;
import javax.jms.ConnectionConsumer;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;

public abstract class ConnectionConsumerImpl
implements ConnectionConsumer,
RequestService {
    public static final String DISPATCH_TOKEN = "sys$jms.client.session.connectionconsumer.queuetask";
    ConnectionImpl myConnection = null;
    int dispatchId = 0;
    int myDispatchId = 0;
    RequestRegistry requestRegistry = null;
    ThreadPool sessionPool = null;
    DeliveryQueue deliveryQueue = null;
    QueueTask queueTask = null;
    ServerSessionPool serverSessionPool;
    int maxMessages = 0;
    ServerSession currentServerSession = null;
    SessionImpl currentSession = null;
    int nCurrent = 0;
    boolean closed = false;

    public ConnectionConsumerImpl(ConnectionImpl myConnection, int dispatchId, RequestRegistry requestRegistry, ServerSessionPool serverSessionPool, int maxMessages) {
        this.myConnection = myConnection;
        this.dispatchId = dispatchId;
        this.requestRegistry = requestRegistry;
        this.serverSessionPool = serverSessionPool;
        this.maxMessages = maxMessages;
        this.sessionPool = PoolManager.getInstance().getSessionPool();
        this.queueTask = new QueueTask();
        this.deliveryQueue = new DeliveryQueue();
    }

    void startConsumer() {
        this.deliveryQueue.startQueue();
    }

    void stopConsumer() {
        this.deliveryQueue.stopQueue();
    }

    protected void fillCache() {
        this.requestRegistry.request(new StartConsumerRequest(this.dispatchId, 0, this.myDispatchId, 0, this.myConnection.getSmqpConsumerCacheSize()));
    }

    protected abstract String getQueueName();

    public void setMyDispatchId(int myDispatchId) {
        this.myDispatchId = myDispatchId;
    }

    @Override
    public void serviceRequest(Request request) {
        this.deliveryQueue.enqueue(request);
    }

    public void processRequest(AsyncMessageDeliveryRequest request, boolean hasNext) {
        try {
            if (this.currentServerSession == null) {
                this.currentServerSession = this.serverSessionPool.getServerSession();
                this.currentSession = this.currentServerSession.getSession() instanceof XASessionImpl ? ((XASessionImpl)this.currentServerSession.getSession()).session : (SessionImpl)this.currentServerSession.getSession();
                this.nCurrent = 0;
                if (!this.currentSession.isShadowConsumerCreated()) {
                    this.currentSession.createShadowConsumer(this.getQueueName());
                }
            }
            this.currentSession.addMessageEntry(request.getMessageEntry());
            ++this.nCurrent;
            if (this.nCurrent == this.maxMessages || !hasNext) {
                this.currentServerSession.start();
                this.currentServerSession = null;
                this.currentSession = null;
                this.nCurrent = 0;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        if (request.isRequiresRestart()) {
            this.fillCache();
        }
    }

    public ServerSessionPool getServerSessionPool() throws JMSException {
        return this.serverSessionPool;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void close() throws JMSException {
        if (this.closed) {
            return;
        }
        Reply reply = null;
        try {
            reply = this.requestRegistry.request(new CloseSessionRequest(this.dispatchId));
        }
        catch (Exception e) {
            throw ExceptionConverter.convert(e);
        }
        this.deliveryQueue.stopQueue();
        this.deliveryQueue.close();
        this.myConnection.removeRequestService(this.myDispatchId);
        this.myConnection.removeConnectionConsumer(this);
        if (!reply.isOk()) {
            throw ExceptionConverter.convert(reply.getException());
        }
    }

    void cancel() {
        this.closed = true;
        this.deliveryQueue.stopQueue();
        this.deliveryQueue.close();
    }

    private class QueueTask
    implements AsyncTask {
        private QueueTask() {
        }

        @Override
        public boolean isValid() {
            return !ConnectionConsumerImpl.this.closed;
        }

        @Override
        public String getDispatchToken() {
            return ConnectionConsumerImpl.DISPATCH_TOKEN;
        }

        @Override
        public String getDescription() {
            return ConnectionConsumerImpl.this.myConnection.myHostname + "/ConnectionConsumer/QueueTask";
        }

        @Override
        public void run() {
            if (!ConnectionConsumerImpl.this.closed && ConnectionConsumerImpl.this.deliveryQueue.dequeue()) {
                ConnectionConsumerImpl.this.sessionPool.dispatchTask(this);
            }
        }

        @Override
        public void stop() {
        }
    }

    private class DeliveryQueue
    extends SingleProcessorQueue {
        public DeliveryQueue() {
            super(ConnectionConsumerImpl.this.myConnection.smqpConsumerCacheSize);
        }

        @Override
        protected void startProcessor() {
            if (!ConnectionConsumerImpl.this.closed) {
                ConnectionConsumerImpl.this.sessionPool.dispatchTask(ConnectionConsumerImpl.this.queueTask);
            }
        }

        @Override
        protected void process(Object[] bulk, int n) {
            for (int i = 0; i < n; ++i) {
                ConnectionConsumerImpl.this.processRequest((AsyncMessageDeliveryRequest)bulk[i], i + 1 < n);
            }
        }
    }
}

