/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.extension.mq.internal.source;

import com.mulesoft.extension.mq.api.attributes.AnypointMQMessageAttributes;
import com.mulesoft.extension.mq.api.circuit.CircuitBreakerConfiguration;
import com.mulesoft.extension.mq.api.exception.MQAckException;
import com.mulesoft.extension.mq.api.exception.MQDestinationNotFoundException;
import com.mulesoft.extension.mq.api.exception.MQNackException;
import com.mulesoft.extension.mq.api.modes.SubscriberAckMode;
import com.mulesoft.extension.mq.api.source.PrefetchTypeSubscriberFactory;
import com.mulesoft.extension.mq.api.source.SubscriberFactory;
import com.mulesoft.extension.mq.internal.config.SubscriberConfig;
import com.mulesoft.extension.mq.internal.connection.AnypointMQConnection;
import com.mulesoft.extension.mq.internal.error.AnypointMQSubscriberErrorTypeProvider;
import com.mulesoft.extension.mq.internal.server.AnypointMQServer;
import com.mulesoft.extension.mq.internal.source.MQCircuitsManager;
import com.mulesoft.mq.restclient.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.circuit.impl.NoOpCircuitBreaker;
import com.mulesoft.mq.restclient.client.mq.domain.AnypointMQMessage;
import com.mulesoft.mq.restclient.client.mq.domain.Lock;
import com.mulesoft.mq.restclient.client.mq.domain.MessageIdResult;
import com.mulesoft.mq.restclient.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.internal.CourierObserver;
import com.mulesoft.mq.restclient.internal.Destination;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.dsl.xml.ParameterDsl;
import org.mule.runtime.extension.api.annotation.error.Throws;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.NullSafe;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message source, exposed under the alias {@code subscriber}, that starts an
 * {@link AnypointMQServer} for a configured queue and hands the server the
 * {@link SourceCallback} it receives in {@link #onStart(SourceCallback)}.
 *
 * <p>When the acknowledgement mode stored in the callback context is
 * {@link SubscriberAckMode#AUTO}, the message is ACKed after the flow succeeds and
 * NACKed after the flow fails. For any other mode the success/error hooks only notify
 * the circuit breaker and return.
 *
 * <p>An optional circuit breaker can be configured; when it is absent a shared
 * {@link NoOpCircuitBreaker} instance is used.
 */
@Alias(value="subscriber")
@MediaType(value="*/*", strict=false)
@Throws(value={AnypointMQSubscriberErrorTypeProvider.class})
public class AnypointMQSource
extends Source<InputStream, AnypointMQMessageAttributes> {
    // Placement order of each parameter, referenced by the @Placement annotations below.
    private static final int FIRST_ORDER = 1;
    private static final int SECOND_ORDER = 2;
    private static final int THIRD_ORDER = 3;
    private static final int FOURTH_ORDER = 4;
    private static final int FIFTH_ORDER = 5;
    private static final int SIXTH_ORDER = 6;

    /** Callback-context variable name under which the {@link SubscriberAckMode} is looked up. */
    public static final String ACK_MODE_CTX_VAR = "ACKNOWLEDGEMENT_MODE";
    /** Callback-context variable name under which the {@link Destination} is looked up. */
    public static final String DESTINATION_CTX_VAR = "DESTINATION";
    /** Callback-context variable name under which the {@link AnypointMQMessage} is looked up. */
    public static final String MESSAGE_CTX_VAR = "MESSAGE";

    private static final Logger LOGGER = LoggerFactory.getLogger(AnypointMQSource.class);
    /** Format mask for the subscriber name; the argument is the root container name. */
    private static final String SOURCE_NAME_MASK = "_MQ_Subscriber_%s";
    /** Circuit breaker used when no circuit breaker configuration is supplied. */
    private static final MQCircuitBreaker NO_OP_CIRCUIT = new NoOpCircuitBreaker();

    @Parameter
    @Placement(order=FIRST_ORDER)
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @DisplayName(value="Queue")
    @Summary(value="The name of the Queue from which messages will be retrieved")
    private String destination;

    @Parameter
    @Placement(order=SECOND_ORDER)
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @ParameterDsl(allowReferences=false)
    @Optional
    @NullSafe(defaultImplementingType=PrefetchTypeSubscriberFactory.class)
    @Summary(value="The strategy to be used when subscribing for messages in the Queue")
    private SubscriberFactory subscriberType;

    @Parameter
    @Placement(order=THIRD_ORDER)
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @Summary(value="Acknowledgement mode to use for the messages retrieved")
    @Optional(defaultValue="AUTO")
    private SubscriberAckMode acknowledgementMode;

    @Parameter
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @Summary(value="Duration that a message is held by a consumer waiting for an ACK or NACK, before returning to the Queue for redelivery")
    @Placement(order=FOURTH_ORDER)
    @Optional(defaultValue="0")
    private long acknowledgementTimeout;

    @Parameter
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @Summary(value="Time unit to be used in the acknowledgementTimeout configuration")
    @Placement(order=FIFTH_ORDER)
    @Optional(defaultValue="MILLISECONDS")
    private TimeUnit acknowledgementTimeoutUnit;

    @Parameter
    @Optional
    @Alias(value="circuitBreaker")
    @Expression(value=ExpressionSupport.NOT_SUPPORTED)
    @ParameterDsl(allowReferences=true)
    @Placement(order=SIXTH_ORDER, tab="Advanced")
    private CircuitBreakerConfiguration circuitBreakerConfig;

    @Connection
    private ConnectionProvider<AnypointMQConnection> connectionProvider;
    @Inject
    private MQCircuitsManager circuitsManager;
    @Inject
    private SchedulerService schedulerService;
    // Not assigned anywhere in this class; presumably populated by the runtime - TODO confirm.
    private ComponentLocation componentLocation;
    private MQCircuitBreaker circuitBreaker = NO_OP_CIRCUIT;
    private AnypointMQServer server;
    // Written by onStart/onStop and read inside the ACK/NACK observer callbacks. Those
    // callbacks may be delivered on a different thread (NOTE(review): confirm against the
    // REST client), so the flag is volatile to guarantee the reader sees the latest value.
    private volatile boolean isRunning;

    /**
     * Builds the subscriber name from the component's root container, sets up the circuit
     * breaker, resolves the destination and starts a new {@link AnypointMQServer}.
     *
     * @param callback callback handed to the server
     * @throws ConnectionException if obtaining the connection fails
     * @throws IllegalArgumentException if the circuit breaker configuration has a
     *         non-positive errors threshold or trip timeout
     */
    public void onStart(SourceCallback<InputStream, AnypointMQMessageAttributes> callback) throws ConnectionException {
        String subscriberName = String.format(SOURCE_NAME_MASK, this.componentLocation.getRootContainerName());
        this.initialiseCircuitBreaker(subscriberName);
        Destination courierDestination = this.getDestination();
        SubscriberConfig subscriberConfig = new SubscriberConfig(this.acknowledgementMode, this.acknowledgementTimeout, this.acknowledgementTimeoutUnit, this.subscriberType);
        this.server = new AnypointMQServer(subscriberConfig, courierDestination, callback, this.circuitBreaker, subscriberName, this.schedulerService);
        this.server.start();
        this.isRunning = true;
    }

    /**
     * Opens a connection through the connection provider and asks it for the configured
     * destination.
     *
     * <p>NOTE(review): the connection returned by {@code connect()} is not released
     * anywhere in this class; confirm whether its lifecycle is managed elsewhere.
     *
     * @return the destination for the configured queue name
     * @throws ConnectionException if the connection cannot be established
     * @throws MQDestinationNotFoundException if the lookup reports the destination as not found
     */
    private Destination getDestination() throws ConnectionException {
        try {
            return ((AnypointMQConnection)this.connectionProvider.connect()).getDestination(this.destination);
        }
        catch (ResourceNotFoundException e) {
            // The thrown exception carries only a message, so keep the original failure
            // reachable through the log instead of dropping it silently.
            LOGGER.debug("Destination lookup failed", (Throwable)e);
            throw new MQDestinationNotFoundException(String.format("Failed to subscribe on destination '%s'. Such destination does not exist in the configured environment", this.destination));
        }
    }

    /**
     * Validates the optional circuit breaker configuration and, when present, replaces the
     * no-op circuit breaker with the one supplied by the circuits manager.
     *
     * @param subscriberName name passed to the circuits manager together with the configuration
     * @throws IllegalArgumentException if the errors threshold or the trip timeout is 0 or less
     */
    private void initialiseCircuitBreaker(String subscriberName) {
        if (this.circuitBreakerConfig == null) {
            return;
        }
        if (this.circuitBreakerConfig.getErrorsThreshold() <= 0) {
            throw new IllegalArgumentException(String.format("Circuit's errors threshold `%s` cannot be 0 or less", this.circuitBreakerConfig.getErrorsThreshold()));
        }
        if (this.circuitBreakerConfig.getTripTimeout() <= 0L) {
            throw new IllegalArgumentException(String.format("Circuit's trip timeout `%s` cannot be 0 or less", this.circuitBreakerConfig.getTripTimeout()));
        }
        this.circuitBreaker = this.circuitsManager.getOrCreateCircuit(this.circuitBreakerConfig, subscriberName);
    }

    /**
     * Marks the source as not running and stops the server if one was created.
     */
    public void onStop() {
        LOGGER.debug("Source requested to stop");
        // Clear the flag BEFORE stopping the server: the ACK/NACK observers consult it to
        // decide whether a failure is a real error or expected noise, and failures that
        // arrive while the server is shutting down belong to the second category.
        this.isRunning = false;
        if (this.server != null) {
            this.server.stop();
        }
    }

    /**
     * Success hook: notifies the circuit breaker and, in AUTO acknowledgement mode, ACKs
     * the message. If the ACK call itself throws, the failure is logged and a NACK is
     * attempted instead.
     *
     * @param callbackContext context holding the acknowledgement mode, destination and message
     * @throws IllegalStateException if a required context variable is missing
     */
    @OnSuccess
    public void onSuccess(SourceCallbackContext callbackContext) throws Exception {
        this.notifyCircuitSuccess();
        if (!this.isAutoAck(callbackContext)) {
            return;
        }
        Destination destination = this.getDestination(callbackContext);
        AnypointMQMessage message = this.getMessage(callbackContext);
        Lock ackLock = new Lock(message);
        boolean publishedToFallbackDestination = message.getPublishedToFallback();
        try {
            this.doAck(destination, ackLock, publishedToFallbackDestination);
        }
        catch (Exception e) {
            // Record why the ACK failed before falling back to a NACK.
            LOGGER.warn("AUTO 'ACK' failed for message with id '{}' on destination '{}'; performing a 'NACK' instead", ackLock.getMessageId(), destination.getName(), e);
            this.doNack(destination, ackLock, publishedToFallbackDestination);
        }
    }

    /**
     * Error hook: notifies the circuit breaker of the failure and, in AUTO acknowledgement
     * mode, NACKs the message.
     *
     * @param error the error reported for the flow; may be {@code null}
     * @param callbackContext context holding the acknowledgement mode, destination and message
     * @throws IllegalStateException if a required context variable is missing
     */
    @OnError
    public void onError(Error error, SourceCallbackContext callbackContext) {
        this.notifyCircuitError(error);
        if (!this.isAutoAck(callbackContext)) {
            return;
        }
        Destination destination = this.getDestination(callbackContext);
        AnypointMQMessage message = this.getMessage(callbackContext);
        this.doNack(destination, new Lock(message), message.getPublishedToFallback());
    }

    /**
     * Tells whether the acknowledgement mode stored in the callback context is AUTO.
     */
    private boolean isAutoAck(SourceCallbackContext callbackContext) {
        // getAckMode never returns null (it throws when the variable is missing), so an
        // identity comparison on the enum constant is sufficient.
        return this.getAckMode(callbackContext) == SubscriberAckMode.AUTO;
    }

    /** Reports a successful execution to the circuit breaker. */
    private void notifyCircuitSuccess() {
        LOGGER.debug("Notify Circuit SUCCESS");
        this.circuitBreaker.onSuccess();
    }

    /**
     * Reports a failed execution to the circuit breaker, identified by the error's
     * {@code NAMESPACE:IDENTIFIER} name, or {@code UNKNOWN} when the error is {@code null}.
     */
    private void notifyCircuitError(Error error) {
        String type = this.resolveErrorName(error);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Notify Circuit ERROR: {}", (Object)type);
            if (error != null) {
                LOGGER.debug("Error Cause: ", error.getCause());
            } else {
                LOGGER.debug("Error Cause: 'UNKNOWN' - Error was 'null'");
            }
        }
        this.circuitBreaker.onFailure(type);
    }

    /**
     * Builds the {@code NAMESPACE:IDENTIFIER} name of the error's type.
     *
     * @return the qualified error type name, or {@code "UNKNOWN"} if {@code error} is {@code null}
     */
    private String resolveErrorName(Error error) {
        if (error == null) {
            return "UNKNOWN";
        }
        return error.getErrorType().getNamespace() + ":" + error.getErrorType().getIdentifier();
    }

    /**
     * Issues an ACK for the given lock and subscribes an observer that reports failures.
     * While the source is running a failure is logged at error level and rethrown as
     * {@link MQAckException}; otherwise it is only logged at debug level.
     */
    private void doAck(final Destination destination, final Lock ackLock, boolean publishedToFallbackDestination) {
        LOGGER.debug("Doing a ACK on destination '{}' with lockId '{}'", (Object)destination.getName(publishedToFallbackDestination), (Object)ackLock.getLockId());
        // The raw cast is kept as found: the generic signature of subscribe() is not
        // visible from this file.
        destination.ack(ackLock, publishedToFallbackDestination).subscribe((CourierObserver)new CourierObserver<MessageIdResult>(){

            public void onSuccess(MessageIdResult successValue) {
                // Nothing to do once the ACK has been accepted.
            }

            public void onError(Throwable e) {
                String msg = String.format("Failed to do AUTO 'ACK' on the message with id '%s' on destination '%s': %s", ackLock.getMessageId(), destination.getName(), e.getMessage());
                if (AnypointMQSource.this.isRunning) {
                    LOGGER.error(msg, e);
                    throw new MQAckException(msg, e);
                }
                LOGGER.debug(msg);
            }
        });
    }

    /**
     * Issues a NACK for the given lock and subscribes an observer that reports failures.
     * While the source is running an observed failure is logged at error level and
     * rethrown as {@link MQNackException}; otherwise it is only logged at debug level.
     * Any exception thrown while issuing the NACK is caught and logged, never propagated.
     */
    private void doNack(final Destination destination, final Lock ackLock, boolean publishedToFallbackDestination) {
        LOGGER.debug("Doing a NACK on destination '{}' with lockId '{}'", (Object)destination.getName(publishedToFallbackDestination), (Object)ackLock.getLockId());
        try {
            destination.nack(ackLock, publishedToFallbackDestination).subscribe((CourierObserver)new CourierObserver<MessageIdResult>(){

                public void onSuccess(MessageIdResult successValue) {
                    // Nothing to do once the NACK has been accepted.
                }

                public void onError(Throwable e) {
                    String msg = String.format("Failed to do AUTO 'NACK' on the message with id '%s' on destination '%s': %s", ackLock.getMessageId(), destination.getName(), e.getMessage());
                    if (AnypointMQSource.this.isRunning) {
                        LOGGER.error(msg, e);
                        throw new MQNackException(msg, e);
                    }
                    LOGGER.debug(msg);
                }
            });
        }
        catch (Exception e) {
            // Best effort: a failed NACK is logged, not propagated. The trailing throwable
            // has no placeholder, so the logger treats it as the exception to print.
            LOGGER.error("An error occurred while trying to perform an AUTO 'NACK' on message with id '{}' on destination '{}': {}", ackLock.getMessageId(), destination.getName(), e.getMessage(), e);
        }
    }

    /**
     * Reads the message stored in the callback context.
     *
     * @throws IllegalStateException if the variable is absent
     */
    private AnypointMQMessage getMessage(SourceCallbackContext callbackContext) {
        return (AnypointMQMessage)callbackContext.getVariable(MESSAGE_CTX_VAR).orElseThrow(() -> new IllegalStateException("Missing 'Message' information on callback"));
    }

    /**
     * Reads the destination stored in the callback context.
     *
     * @throws IllegalStateException if the variable is absent
     */
    private Destination getDestination(SourceCallbackContext callbackContext) {
        return (Destination)callbackContext.getVariable(DESTINATION_CTX_VAR).orElseThrow(() -> new IllegalStateException("Missing 'Destination' information on callback"));
    }

    /**
     * Reads the acknowledgement mode stored in the callback context.
     *
     * @throws IllegalStateException if the variable is absent
     */
    private SubscriberAckMode getAckMode(SourceCallbackContext callbackContext) {
        return (SubscriberAckMode)((Object)callbackContext.getVariable(ACK_MODE_CTX_VAR).orElseThrow(() -> new IllegalStateException("Missing 'Acknowledgment Mode' information on callback")));
    }
}

