/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.jms;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.io.jms.JMSBase;
import com.datatorrent.lib.io.jms.JMSBaseTransactionableStore;
import com.datatorrent.lib.io.jms.JMSTransactionableStore;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for operators that write tuples to a JMS destination.
 *
 * <p>Tuples handed to {@link #sendMessage(Object)} are converted to JMS messages by
 * {@link #createMessage(Object)} and buffered. The buffer is sent through the
 * {@link MessageProducer} when it reaches the size returned by {@code getBatch()} or when
 * the window ends. Each window is bracketed by a transaction on the configured
 * {@link JMSBaseTransactionableStore}, which also records the id of the last committed
 * window so that tuples from already committed windows can be skipped.
 */
@InterfaceStability.Evolving
public abstract class AbstractJMSOutputOperator
extends JMSBase
implements Operator {
    private static final Logger logger = LoggerFactory.getLogger(AbstractJMSOutputOperator.class);
    /** Tuples of the batch that has not been flushed yet. */
    private List<Object> tupleBatch = Lists.newArrayList();
    /** JMS messages built from {@link #tupleBatch}, in the same order. */
    private List<Message> messageBatch = Lists.newArrayList();
    private transient String appId;
    private transient int operatorId;
    /** Last window id reported as committed by the store; read in {@link #setup}. */
    private transient long committedWindowId;
    /** Id of the window currently being processed; deliberately not transient. */
    private long currentWindowId;
    private Operator.ProcessingMode mode;
    private transient MessageProducer producer;
    protected JMSBaseTransactionableStore store = new JMSTransactionableStore();

    /**
     * Connects to JMS and to the store, rebuilds the pending message batch from the
     * pending tuples and loads the last committed window id.
     *
     * @param context operator context supplying the application id, operator id and
     *                processing mode
     * @throws RuntimeException if the JMS connection or the store connection fails
     */
    public void setup(Context.OperatorContext context) {
        this.appId = (String)context.getValue(DAG.APPLICATION_ID);
        this.operatorId = context.getId();
        logger.debug("Application Id {} operatorId {}", (Object)this.appId, (Object)this.operatorId);
        this.store.setBase(this);
        this.store.setAppId(this.appId);
        this.store.setOperatorId(this.operatorId);
        this.transacted = this.store.isTransactable();
        try {
            this.createConnection();
        }
        catch (JMSException ex) {
            logger.debug(ex.getLocalizedMessage());
            throw new RuntimeException(ex);
        }
        // Log a real boolean rather than the 1/0 the decompiled code produced.
        logger.debug("Session is null {}:", (Object)Boolean.valueOf(this.getSession() == null));
        try {
            this.store.connect();
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        logger.debug("Done connecting store.");
        this.mode = (Operator.ProcessingMode)context.getValue(Context.OperatorContext.PROCESSING_MODE);
        if (this.mode == Operator.ProcessingMode.AT_MOST_ONCE) {
            // Drop pending tuples so that they cannot be written a second time.
            this.tupleBatch.clear();
        }
        // messageBatch is derived from tupleBatch; start from an empty list so that any
        // content it already holds is neither duplicated nor sent in AT_MOST_ONCE mode.
        this.messageBatch.clear();
        for (Object tempObject : this.tupleBatch) {
            this.messageBatch.add(this.createMessage(tempObject));
        }
        this.committedWindowId = this.store.getCommittedWindowId(this.appId, this.operatorId);
        logger.debug("committedWindowId {}", (Object)this.committedWindowId);
        logger.debug("End of setup store in transaction: {}", (Object)this.store.isInTransaction());
    }

    /**
     * Discards the pending batch, disconnects the store and releases the JMS resources.
     * The JMS resources are released even when disconnecting the store fails.
     *
     * @throws RuntimeException if disconnecting the store fails
     */
    public void teardown() {
        this.tupleBatch.clear();
        this.messageBatch.clear();
        logger.debug("beginning teardown");
        try {
            this.store.disconnect();
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        finally {
            // Always release the producer/session/connection, even on store failure.
            this.cleanup();
        }
        logger.debug("ending teardown");
    }

    /**
     * Records the window id and opens a store transaction for the window.
     *
     * @param windowId id of the window that is starting
     */
    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        this.store.beginTransaction();
        logger.debug("Transaction started for window {}", (Object)windowId);
    }

    /**
     * Flushes the pending batch, commits the store transaction and records the window
     * as committed. When the store reports exactly-once support the window id is stored
     * before the flush and commit; otherwise it is stored after the commit.
     */
    public void endWindow() {
        logger.debug("Ending window {}", (Object)this.currentWindowId);
        if (this.store.isExactlyOnce()) {
            // Store the window id before committing (presumably so that it becomes part
            // of the same transaction as the messages - confirm against the store).
            this.recordCommittedWindow();
            this.flushBatch();
            this.store.commitTransaction();
        } else {
            this.flushBatch();
            this.store.commitTransaction();
            this.recordCommittedWindow();
        }
        logger.debug("done ending window {}", (Object)this.currentWindowId);
    }

    /** Stores the current window id as committed if it is newer than the last one stored. */
    private void recordCommittedWindow() {
        if (this.committedWindowId < this.currentWindowId) {
            this.store.storeCommittedWindowId(this.appId, this.operatorId, this.currentWindowId);
            this.committedWindowId = this.currentWindowId;
        }
    }

    /**
     * Sends every buffered message through the producer and empties both buffers.
     *
     * @throws RuntimeException if sending a message fails; the buffers are left untouched
     *                          in that case
     */
    protected void flushBatch() {
        logger.debug("flushing batch, batch size {}", (Object)this.tupleBatch.size());
        for (Message message : this.messageBatch) {
            try {
                this.producer.send(message);
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        this.tupleBatch.clear();
        this.messageBatch.clear();
        logger.debug("done flushing batch");
    }

    /**
     * Buffers a tuple for sending and flushes the buffer once it holds at least
     * {@code getBatch()} tuples. Tuples belonging to a window that is not newer than the
     * last committed window are ignored.
     *
     * @param data tuple to convert with {@link #createMessage(Object)} and send
     */
    protected void sendMessage(Object data) {
        if (this.currentWindowId <= this.committedWindowId) {
            // Window was already committed; its tuples must not be written again.
            return;
        }
        this.tupleBatch.add(data);
        Message message = this.createMessage(data);
        this.messageBatch.add(message);
        if (this.tupleBatch.size() >= this.getBatch()) {
            this.flushBatch();
        }
    }

    /**
     * Sets the store used for transactions and committed-window bookkeeping.
     *
     * @param store store to use
     */
    public void setStore(JMSBaseTransactionableStore store) {
        this.store = store;
    }

    /**
     * Returns the store used for transactions and committed-window bookkeeping.
     *
     * @return the configured store
     */
    public JMSBaseTransactionableStore getStore() {
        return this.store;
    }

    /**
     * Closes the message producer, if one was created, and then delegates to the
     * superclass cleanup. The superclass cleanup runs even when closing the producer
     * fails. A {@link JMSException} is logged and not rethrown.
     */
    @Override
    public void cleanup() {
        try {
            try {
                // producer is null when createConnection() failed or cleanup already ran.
                if (this.producer != null) {
                    this.producer.close();
                }
            }
            finally {
                this.producer = null;
                super.cleanup();
            }
        }
        catch (JMSException ex) {
            logger.error("Error while cleaning up JMS resources", (Throwable)ex);
        }
    }

    /**
     * Creates the connection via the superclass and then a producer for the destination.
     *
     * @throws JMSException if the connection or the producer cannot be created
     */
    @Override
    public void createConnection() throws JMSException {
        super.createConnection();
        this.producer = this.getSession().createProducer(this.getDestination());
    }

    /**
     * Converts a tuple into the JMS message that is sent to the destination.
     *
     * @param var1 tuple to convert
     * @return the message representing the tuple
     */
    protected abstract Message createMessage(Object var1);
}

