/*
 * Decompiled with CFR 0.152.
 */
package org.ikasan.framework.initiator.messagedriven.jca;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.ikasan.framework.component.Event;
import org.ikasan.framework.component.IkasanExceptionHandler;
import org.ikasan.framework.event.serialisation.EventDeserialisationException;
import org.ikasan.framework.exception.IkasanExceptionAction;
import org.ikasan.framework.exception.RetryAction;
import org.ikasan.framework.exception.StopAction;
import org.ikasan.framework.flow.Flow;
import org.ikasan.framework.initiator.AbortTransactionException;
import org.ikasan.framework.initiator.AbstractInitiator;
import org.ikasan.framework.initiator.messagedriven.jca.JmsMessageDrivenInitiator;
import org.ikasan.framework.initiator.messagedriven.jca.ListenerSetupFailureListener;
import org.ikasan.framework.initiator.messagedriven.jca.MessageListenerContainer;
import org.ikasan.framework.monitor.MonitorSubject;

public abstract class JmsMessageDrivenInitiatorImpl
extends AbstractInitiator
implements JmsMessageDrivenInitiator,
MonitorSubject,
ListenerSetupFailureListener {
    private int listenerSetupFailureRetryDelay = 10000;
    private int maxListenerSetupFailureRetries = RetryAction.RETRY_INFINITE;
    private static final String INITIATOR_STOPPING = "Initiator cannot process message whilst managing a stop request.";
    private static final String INITIATOR_ANESTHETIST_OPERATING = "Initiator cannot process message until anesthetist has completed.";
    private static final String JMS_MESSAGE_DRIVEN_INITIATOR_TYPE = "JmsMessageDrivenInitiator";
    static Logger logger = Logger.getLogger(JmsMessageDrivenInitiatorImpl.class);
    protected MessageListenerContainer messageListenerContainer;
    protected Anesthetist anesthetist = null;
    protected Halt halt = null;

    public JmsMessageDrivenInitiatorImpl(String moduleName, String name, Flow flow, IkasanExceptionHandler exceptionHandler) {
        super(moduleName, name, flow, exceptionHandler);
    }

    @Override
    public String getType() {
        return JMS_MESSAGE_DRIVEN_INITIATOR_TYPE;
    }

    public void onMessage(Message message) {
        if (this.stopping) {
            throw new AbortTransactionException(INITIATOR_STOPPING);
        }
        if (this.anesthetistOperating()) {
            throw new AbortTransactionException(INITIATOR_ANESTHETIST_OPERATING);
        }
        Event event = null;
        try {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("received message with id [" + message.getJMSMessageID() + "]"));
            }
            if (message instanceof MapMessage) {
                event = this.handleMapMessage((MapMessage)message);
            } else if (message instanceof TextMessage) {
                event = this.handleTextMessage((TextMessage)message);
            } else if (message instanceof ObjectMessage) {
                event = this.handleObjectMessage((ObjectMessage)message);
            } else if (message instanceof StreamMessage) {
                event = this.handleStreamMessage((StreamMessage)message);
            } else if (message instanceof BytesMessage) {
                event = this.handleBytesMessage((BytesMessage)message);
            }
        }
        catch (UnsupportedOperationException unsupportedOperationException) {
            this.logError(null, unsupportedOperationException, this.name, StopAction.instance());
            this.stopInError();
            throw new AbortTransactionException("Exception Action implied rollback");
        }
        catch (EventDeserialisationException eventDeserialisationException) {
            this.logError(null, eventDeserialisationException, this.name, StopAction.instance());
            this.stopInError();
            throw new AbortTransactionException("Exception Action implied rollback");
        }
        catch (Throwable eventSourcingThrowable) {
            IkasanExceptionAction action = this.exceptionHandler.handleThrowable(this.name, eventSourcingThrowable);
            this.logError(null, eventSourcingThrowable, this.name, action);
            this.handleAction(action, null);
        }
        this.invokeFlow(event);
    }

    @Override
    protected void completeRetryCycle() {
        if (this.retryCount != null) {
            this.retryCount = null;
        }
    }

    @Override
    protected void startRetryCycle(Integer maxAttempts, long delay) {
        this.anesthetist = new Anesthetist(delay);
        this.anesthetist.start();
    }

    @Override
    protected void continueRetryCycle(long delay) {
        this.anesthetist = new Anesthetist(delay);
        this.anesthetist.start();
    }

    @Override
    protected void cancelRetryCycle() {
        if (this.anesthetist != null) {
            this.anesthetist.cancel();
            this.anesthetist = null;
        }
        this.retryCount = null;
    }

    @Override
    public boolean isRecovering() {
        return this.retryCount != null;
    }

    @Override
    public boolean isRunning() {
        if (this.halt != null) {
            return false;
        }
        if (this.anesthetistOperating()) {
            return true;
        }
        return this.messageListenerContainer.isRunning();
    }

    protected boolean anesthetistOperating() {
        return this.anesthetist != null && this.anesthetist.isOperating();
    }

    @Override
    protected void startInitiator() {
        this.halt = null;
        this.messageListenerContainer.start();
    }

    @Override
    protected void stopInitiator() {
        this.messageListenerContainer.stop();
    }

    @Override
    protected void stopInError() {
        this.error = true;
        this.stopping = true;
        if (this.isRecovering()) {
            this.cancelRetryCycle();
        }
        this.halt = new Halt();
        this.halt.start();
        this.notifyMonitorListeners();
    }

    public void setMessageListenerContainer(MessageListenerContainer messageListenerContainer) {
        this.messageListenerContainer = messageListenerContainer;
        messageListenerContainer.setListenerSetupExceptionListener(this);
    }

    @Override
    public MessageListenerContainer getMessageListenerContainer() {
        return this.messageListenerContainer;
    }

    @Override
    public void notifyListenerSetupFailure(Throwable throwable) {
        this.handleRetry(this.maxListenerSetupFailureRetries, this.listenerSetupFailureRetryDelay);
    }

    protected Event handleBytesMessage(BytesMessage message) throws JMSException {
        throw new UnsupportedOperationException("This Initiator does not support BytesMessage [" + message.toString() + "]");
    }

    protected Event handleStreamMessage(StreamMessage message) {
        throw new UnsupportedOperationException("This Initiator does not support StreamMessage [" + message.toString() + "]");
    }

    protected Event handleObjectMessage(ObjectMessage message) throws JMSException {
        throw new UnsupportedOperationException("This Initiator does not support ObjectMessage [" + message.toString() + "]");
    }

    protected Event handleMapMessage(MapMessage message) throws JMSException, EventDeserialisationException {
        throw new UnsupportedOperationException("This Initiator does not support MapMessage [" + message.toString() + "]");
    }

    protected Event handleTextMessage(TextMessage message) throws JMSException {
        throw new UnsupportedOperationException("This Initiator does not support TextMessage [" + message.toString() + "]");
    }

    public void setListenerSetupFailureRetryDelay(int listenerSetupFailureRetryDelay) {
        this.listenerSetupFailureRetryDelay = listenerSetupFailureRetryDelay;
    }

    public void setMaxListenerSetupFailureRetries(int maxListenerSetupFailureRetries) {
        this.maxListenerSetupFailureRetries = maxListenerSetupFailureRetries;
    }

    private class Halt
    extends Thread {
        private Halt() {
        }

        @Override
        public void run() {
            logger.info((Object)"stopping messageListenerContainer...");
            JmsMessageDrivenInitiatorImpl.this.messageListenerContainer.stop();
            logger.info((Object)"stopped messageListenerContainer successfully.");
        }
    }

    private class Anesthetist
    extends Thread {
        long sleepPeriod;
        boolean operating = false;
        boolean cancelled = false;

        public Anesthetist(long sleepPeriod) {
            this.sleepPeriod = sleepPeriod;
            logger.info((Object)("Created anesthetist with a sleep time of " + sleepPeriod + "ms"));
        }

        @Override
        public void run() {
            try {
                logger.info((Object)"Anesthetist invoked");
                this.putToSleep();
                logger.info((Object)("Anesthetist sleeping for [" + this.sleepPeriod + "]ms."));
                Anesthetist.sleep(this.sleepPeriod);
                logger.info((Object)"Anesthetist woken from sleep.");
                this.reawaken();
            }
            catch (InterruptedException e) {
                logger.info((Object)"Anesthetist sleep interrupted", (Throwable)e);
                this.reawaken();
            }
        }

        private void putToSleep() {
            this.operating = true;
            logger.info((Object)"Anesthetist invoking the messageListenerConatiner stop...");
            JmsMessageDrivenInitiatorImpl.this.messageListenerContainer.stop();
            logger.info((Object)"Anesthetist invoked the messageListenerConatiner stop successfully.");
        }

        private void reawaken() {
            if (!this.cancelled) {
                logger.info((Object)"Anesthetist restarting messageListenerContainer...");
                JmsMessageDrivenInitiatorImpl.this.messageListenerContainer.start();
                logger.info((Object)"Anesthetist restarted messageListenerContainer successfully.");
            }
            this.operating = false;
        }

        public boolean isOperating() {
            return this.operating;
        }

        public void cancel() {
            logger.info((Object)"cancelling any anesthetist operation");
            this.cancelled = true;
        }
    }
}

