/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.jms;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.jms.JMSBase;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(checkpointableWithinAppWindow=false)
public abstract class AbstractJMSInputOperator<T>
extends JMSBase
implements InputOperator,
Operator.ActivationListener<Context.OperatorContext>,
MessageListener,
ExceptionListener,
Operator.IdleTimeHandler,
Operator.CheckpointListener {
    protected static final int DEFAULT_BUFFER_SIZE = 10240;
    @Min(value=1L)
    protected int bufferSize = 10240;
    private String consumerName;
    protected transient ArrayBlockingQueue<Message> holdingBuffer;
    protected final transient Map<String, T> currentWindowRecoveryState;
    protected transient Message lastMsg;
    private transient MessageProducer replyProducer;
    private transient MessageConsumer consumer;
    @NotNull
    private final BasicCounters<MutableLong> counters;
    private transient Context.OperatorContext context;
    private transient long spinMillis;
    private final transient AtomicReference<Throwable> throwable;
    @NotNull
    protected WindowDataManager windowDataManager;
    private transient long[] operatorRecoveredWindows;
    protected transient long currentWindowId;
    protected transient int emitCount;
    private final transient Set<String> pendingAck;
    private final transient Lock lock;
    public final transient DefaultOutputPort<T> output = new DefaultOutputPort();
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJMSInputOperator.class);

    public AbstractJMSInputOperator() {
        this.counters = new BasicCounters<MutableLong>(MutableLong.class);
        this.throwable = new AtomicReference();
        this.pendingAck = Sets.newHashSet();
        this.windowDataManager = new FSWindowDataManager();
        this.lock = new Lock();
        this.currentWindowRecoveryState = Maps.newLinkedHashMap();
        this.holdingBuffer = new ArrayBlockingQueue<Message>(this.bufferSize){
            private static final long serialVersionUID = 201411151139L;

            @Override
            public boolean add(Message message) {
                Lock lock = AbstractJMSInputOperator.this.lock;
                synchronized (lock) {
                    try {
                        return AbstractJMSInputOperator.this.messageConsumed(message) && super.add(message);
                    }
                    catch (JMSException e) {
                        LOG.error("message consumption", (Throwable)e);
                        AbstractJMSInputOperator.this.throwable.set(e);
                        throw new RuntimeException(e);
                    }
                }
            }
        };
    }

    public final void onMessage(Message message) {
        this.holdingBuffer.add(message);
        this.sendReply(message);
    }

    protected void sendReply(Message message) {
        try {
            if (message.getJMSReplyTo() != null) {
                this.replyProducer.send(message.getJMSReplyTo(), (Message)this.getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
            }
        }
        catch (JMSException ex) {
            LOG.error(ex.getLocalizedMessage());
            this.throwable.set(ex);
            throw new RuntimeException(ex);
        }
    }

    public void onException(JMSException ex) {
        this.cleanup();
        LOG.error(ex.getLocalizedMessage());
        this.throwable.set(ex);
        throw new RuntimeException(ex);
    }

    public void setup(Context.OperatorContext context) {
        this.context = context;
        this.spinMillis = ((Integer)context.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue();
        this.counters.setCounter(CounterKeys.RECEIVED, new MutableLong());
        this.counters.setCounter(CounterKeys.REDELIVERED, new MutableLong());
        this.windowDataManager.setup((Context)context);
        try {
            this.operatorRecoveredWindows = this.windowDataManager.getWindowIds(context.getId());
            if (this.operatorRecoveredWindows != null) {
                Arrays.sort(this.operatorRecoveredWindows);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("fetching windows", e);
        }
    }

    protected boolean messageConsumed(Message message) throws JMSException {
        if (message.getJMSRedelivered() && this.pendingAck.contains(message.getJMSMessageID())) {
            this.counters.getCounter(CounterKeys.REDELIVERED).increment();
            LOG.warn("IGNORING: Redelivered Message {}", (Object)message.getJMSMessageID());
            return false;
        }
        this.pendingAck.add(message.getJMSMessageID());
        MutableLong receivedCt = this.counters.getCounter(CounterKeys.RECEIVED);
        receivedCt.increment();
        LOG.debug("message id: {} buffer size: {} received: {}", new Object[]{message.getJMSMessageID(), this.holdingBuffer.size(), receivedCt.longValue()});
        return true;
    }

    public void activate(Context.OperatorContext ctx) {
        try {
            super.createConnection();
            this.replyProducer = this.getSession().createProducer(null);
            this.consumer = this.isDurable() && this.isTopic() ? this.getSession().createDurableSubscriber((Topic)this.getDestination(), this.consumerName) : this.getSession().createConsumer(this.getDestination());
            this.consumer.setMessageListener((MessageListener)this);
        }
        catch (JMSException ex) {
            throw new RuntimeException(ex);
        }
    }

    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        if (windowId <= this.windowDataManager.getLargestRecoveryWindow()) {
            this.replay(windowId);
        }
    }

    protected void replay(long windowId) {
        try {
            Map recoveredData = (Map)this.windowDataManager.load(this.context.getId(), windowId);
            if (recoveredData == null) {
                return;
            }
            for (Map.Entry recoveredEntry : recoveredData.entrySet()) {
                this.pendingAck.add((String)recoveredEntry.getKey());
                this.emit(recoveredEntry.getValue());
            }
        }
        catch (IOException e) {
            throw new RuntimeException("replay", e);
        }
    }

    public void emitTuples() {
        Message msg;
        if (this.currentWindowId <= this.windowDataManager.getLargestRecoveryWindow()) {
            return;
        }
        while (this.emitCount < this.bufferSize && (msg = this.holdingBuffer.poll()) != null) {
            this.processMessage(msg);
            ++this.emitCount;
            this.lastMsg = msg;
        }
    }

    protected void processMessage(Message message) {
        try {
            T payload = this.convert(message);
            if (payload != null) {
                this.currentWindowRecoveryState.put(message.getJMSMessageID(), payload);
                this.emit(payload);
            }
        }
        catch (JMSException e) {
            throw new RuntimeException("processing msg", e);
        }
    }

    public void handleIdleTime() {
        Throwable lthrowable = this.throwable.get();
        if (lthrowable == null) {
            try {
                Thread.sleep(this.spinMillis);
            }
            catch (InterruptedException ie) {
                throw new RuntimeException(ie);
            }
        } else {
            DTThrowable.rethrow((Throwable)lthrowable);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void endWindow() {
        if (this.currentWindowId > this.windowDataManager.getLargestRecoveryWindow()) {
            Lock lock = this.lock;
            synchronized (lock) {
                boolean stateSaved = false;
                boolean ackCompleted = false;
                try {
                    Message msg;
                    while ((msg = this.holdingBuffer.poll()) != null) {
                        this.processMessage(msg);
                        ++this.emitCount;
                        this.lastMsg = msg;
                    }
                    this.windowDataManager.save(this.currentWindowRecoveryState, this.context.getId(), this.currentWindowId);
                    stateSaved = true;
                    this.currentWindowRecoveryState.clear();
                    if (this.lastMsg != null) {
                        this.acknowledge();
                    }
                    ackCompleted = true;
                    this.pendingAck.clear();
                }
                catch (Throwable t) {
                    if (!ackCompleted) {
                        LOG.info("confirm recovery of {} for {} does not exist", new Object[]{this.context.getId(), this.currentWindowId, t});
                    }
                    DTThrowable.rethrow((Throwable)t);
                }
                finally {
                    if (stateSaved && !ackCompleted) {
                        try {
                            this.windowDataManager.delete(this.context.getId(), this.currentWindowId);
                        }
                        catch (IOException e) {
                            LOG.error("unable to delete corrupted state", (Throwable)e);
                        }
                    }
                }
            }
            this.emitCount = 0;
        } else if (this.operatorRecoveredWindows != null && this.currentWindowId < this.operatorRecoveredWindows[this.operatorRecoveredWindows.length - 1]) {
            this.pendingAck.clear();
        }
        this.context.setCounters(this.counters);
    }

    protected void acknowledge() throws JMSException {
        if (this.isTransacted()) {
            this.getSession().commit();
        } else if (this.getSessionAckMode(this.getAckMode()) == 2) {
            this.lastMsg.acknowledge();
        }
    }

    public void checkpointed(long windowId) {
    }

    public void committed(long windowId) {
        try {
            this.windowDataManager.deleteUpTo(this.context.getId(), windowId);
        }
        catch (IOException e) {
            throw new RuntimeException("committing", e);
        }
    }

    public void deactivate() {
        this.cleanup();
    }

    @Override
    protected void cleanup() {
        try {
            this.consumer.setMessageListener(null);
            this.replyProducer.close();
            this.replyProducer = null;
            this.consumer.close();
            this.consumer = null;
            super.cleanup();
        }
        catch (JMSException ex) {
            throw new RuntimeException("at cleanup", ex);
        }
    }

    public void teardown() {
        this.windowDataManager.teardown();
    }

    protected abstract T convert(Message var1) throws JMSException;

    public int getBufferSize() {
        return this.bufferSize;
    }

    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public String getConsumerName() {
        return this.consumerName;
    }

    public void setConsumerName(String consumerName) {
        this.consumerName = consumerName;
    }

    public void setWindowDataManager(WindowDataManager storageManager) {
        this.windowDataManager = storageManager;
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    protected abstract void emit(T var1);

    private static class Lock {
        private Lock() {
        }
    }

    public static enum CounterKeys {
        RECEIVED,
        REDELIVERED;

    }
}

