/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.jms;

import com.datatorrent.lib.io.jms.JMSBaseTransactionableStore;
import java.io.IOException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class JMSTransactionableStore
extends JMSBaseTransactionableStore {
    private static final Logger logger = LoggerFactory.getLogger(JMSTransactionableStore.class);
    private transient MessageProducer producer;
    private transient MessageConsumer consumer;
    private transient boolean connected = false;
    private transient boolean inTransaction = false;

    @Override
    public long getCommittedWindowId(String appId, int operatorId) {
        logger.debug("Getting committed windowId appId {} operatorId {}", (Object)appId, (Object)operatorId);
        try {
            this.beginTransaction();
            BytesMessage message = (BytesMessage)this.consumer.receive();
            logger.debug("Retrieved committed window message id {}", (Object)message.getJMSMessageID());
            long windowId = message.readLong();
            message = this.getBase().getSession().createBytesMessage();
            message.writeLong(windowId);
            this.producer.send((Message)message);
            this.commitTransaction();
            logger.debug("Retrieved windowId {}", (Object)windowId);
            return windowId;
        }
        catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public void storeCommittedWindowId(String appId, int operatorId, long windowId) {
        if (!this.inTransaction) {
            throw new RuntimeException("This should be called while you are in an existing transaction");
        }
        logger.debug("storing window appId {} operatorId {} windowId {}", new Object[]{appId, operatorId, windowId});
        try {
            this.removeCommittedWindowId(appId, operatorId);
            BytesMessage bytesMessage = this.getBase().getSession().createBytesMessage();
            bytesMessage.writeLong(windowId);
            this.producer.send((Message)bytesMessage);
            logger.debug("Retrieved committed window message id {}", (Object)bytesMessage.getJMSMessageID());
        }
        catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public void removeCommittedWindowId(String appId, int operatorId) {
        try {
            this.consumer.receive();
        }
        catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public void beginTransaction() {
        logger.debug("beginning transaction");
        if (this.inTransaction) {
            throw new RuntimeException("Cannot start a transaction twice.");
        }
        this.inTransaction = true;
    }

    @Override
    public void commitTransaction() {
        logger.debug("committing transaction.");
        if (!this.inTransaction) {
            throw new RuntimeException("Cannot commit a transaction if you are not in one.");
        }
        try {
            this.getBase().getSession().commit();
        }
        catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
        this.inTransaction = false;
        logger.debug("finished committing transaction.");
    }

    @Override
    public void rollbackTransaction() {
        try {
            this.getBase().getSession().rollback();
        }
        catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public boolean isInTransaction() {
        return this.inTransaction;
    }

    @Override
    public void connect() throws IOException {
        logger.debug("Entering connect. is in transaction: {}", (Object)this.inTransaction);
        try {
            boolean hasStore;
            String queueName = this.getQueueName(this.getAppId(), this.getOperatorId());
            logger.debug("Base is null: {}", (Object)(this.getBase() == null ? 1 : 0));
            if (this.getBase() != null) {
                logger.debug("Session is null: {}", (Object)(this.getBase().getSession() == null ? 1 : 0));
            }
            Queue queue = this.getBase().getSession().createQueue(queueName);
            QueueBrowser browser = this.getBase().getSession().createBrowser(queue);
            try {
                Enumeration enumeration = browser.getEnumeration();
                hasStore = enumeration.hasMoreElements();
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
            this.producer = this.getBase().getSession().createProducer((Destination)queue);
            this.consumer = this.getBase().getSession().createConsumer((Destination)queue);
            this.connected = true;
            logger.debug("Connected. is in transaction: {}", (Object)this.inTransaction);
            if (!hasStore) {
                this.beginTransaction();
                BytesMessage message = this.getBase().getSession().createBytesMessage();
                message.writeLong(-1L);
                this.producer.send((Message)message);
                this.commitTransaction();
            }
        }
        catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
        logger.debug("Exiting connect. is in transaction: {}", (Object)this.inTransaction);
    }

    @Override
    public void disconnect() throws IOException {
        logger.debug("disconnectiong");
        try {
            this.producer.close();
            this.consumer.close();
        }
        catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
        this.inTransaction = false;
        this.connected = false;
        logger.debug("done disconnectiong");
    }

    @Override
    public boolean isConnected() {
        return this.connected;
    }

    private String getQueueName(String appId, int operatorId) {
        return appId + "-" + operatorId;
    }
}

