/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.extension.mq.internal.source;

import com.mulesoft.extension.mq.api.message.AnypointMQMessageContext;
import com.mulesoft.extension.mq.internal.config.AnypointMQConfiguration;
import com.mulesoft.extension.mq.internal.config.CircuitBreakerConfiguration;
import com.mulesoft.extension.mq.internal.config.SubscriberAckMode;
import com.mulesoft.extension.mq.internal.connection.AnypointMQConnection;
import com.mulesoft.extension.mq.internal.server.AnypointMQServer;
import com.mulesoft.extension.mq.internal.source.MQCircuitsManager;
import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.internal.circuit.NoOpCircuitBreaker;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.param.Config;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alias(value="subscriber")
@MediaType(value="*/*", strict=false)
public class AnypointMQSource
extends Source<byte[], AnypointMQMessageContext> {
    public static final String ACK_MODE_CTX_VAR = "ACKNOWLEDGEMENT_MODE";
    public static final String DESTINATION_CTX_VAR = "DESTINATION";
    public static final String MESSAGE_CONTEXT_CTX_VAR = "MESSAGE_CONTEXT";
    private static final Logger LOGGER = LoggerFactory.getLogger(AnypointMQSource.class);
    private static final MQCircuitBreaker NO_OP_CIRCUIT = new NoOpCircuitBreaker();
    @Parameter
    private String destination;
    @Connection
    private ConnectionProvider<AnypointMQConnection> connectionProvider;
    @Config
    private AnypointMQConfiguration config;
    @Inject
    private MQCircuitsManager circuitsManager;
    private MQCircuitBreaker circuitBreaker;
    private AnypointMQServer server;

    public void onStart(SourceCallback<byte[], AnypointMQMessageContext> callback) throws ConnectionException {
        this.circuitBreaker = this.config.getCircuitBreakerConfiguration().map(cb -> this.circuitsManager.getOrCreateCircuit((CircuitBreakerConfiguration)cb, this.destination)).orElse(NO_OP_CIRCUIT);
        this.server = new AnypointMQServer(this.destination, this.config, (AnypointMQConnection)this.connectionProvider.connect(), callback, this.circuitBreaker);
        this.server.start();
    }

    public void onStop() {
        if (this.server != null) {
            this.server.stop();
        }
    }

    @OnSuccess
    public void onSuccess(SourceCallbackContext callbackContext) throws Exception {
        this.notifyCircuitSuccess();
        if (!SubscriberAckMode.AUTO.equals((Object)this.getAckMode(callbackContext))) {
            return;
        }
        Destination destination = this.getDestination(callbackContext);
        AnypointMQMessageContext messageContext = this.getMessage(callbackContext);
        try {
            this.doAck(destination, messageContext.getMessage());
        }
        catch (Exception e) {
            this.doNack(destination, messageContext.getMessage());
        }
    }

    @OnError
    public void onError(Error error, SourceCallbackContext callbackContext) {
        this.notifyCircuitError(error);
        if (!SubscriberAckMode.AUTO.equals((Object)this.getAckMode(callbackContext))) {
            return;
        }
        Destination destination = this.getDestination(callbackContext);
        AnypointMQMessageContext messageContext = this.getMessage(callbackContext);
        this.doNack(destination, messageContext.getMessage());
    }

    private void notifyCircuitSuccess() {
        LOGGER.debug("Notify Circuit SUCCESS");
        this.circuitBreaker.onSuccess();
    }

    private void notifyCircuitError(Error error) {
        String type = error.getErrorType().getNamespace() + ":" + error.getErrorType().getIdentifier();
        LOGGER.debug("Notify Circuit ERROR: {}", (Object)type);
        this.circuitBreaker.onFailure(type);
    }

    private void doAck(Destination destination, AnypointMqMessage messageContext) {
        destination.ack(messageContext).fireAndForget();
    }

    private void doNack(Destination destination, AnypointMqMessage messageContext) {
        try {
            destination.nack(messageContext).fireAndForget();
        }
        catch (Exception e) {
            LOGGER.error("An error occurred while trying to 'NACK' a message on destination '{}': {}", new Object[]{destination.getName(), e.getMessage(), e});
        }
    }

    private AnypointMQMessageContext getMessage(SourceCallbackContext callbackContext) {
        return (AnypointMQMessageContext)callbackContext.getVariable(MESSAGE_CONTEXT_CTX_VAR).orElseThrow(() -> new IllegalStateException("Missing 'MessageContext' information on callback"));
    }

    private Destination getDestination(SourceCallbackContext callbackContext) {
        return (Destination)callbackContext.getVariable(DESTINATION_CTX_VAR).orElseThrow(() -> new IllegalStateException("Missing 'Destination' information on callback"));
    }

    private SubscriberAckMode getAckMode(SourceCallbackContext callbackContext) {
        return (SubscriberAckMode)((Object)callbackContext.getVariable(ACK_MODE_CTX_VAR).orElseThrow(() -> new IllegalStateException("Missing 'Acknowledgment Mode' information on callback")));
    }
}

