/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.extension.mq.internal.server;

import com.mulesoft.extension.mq.api.message.AnypointMQMessageContext;
import com.mulesoft.extension.mq.internal.Subscriber;
import com.mulesoft.extension.mq.internal.config.AnypointMQConfiguration;
import com.mulesoft.extension.mq.internal.config.MQPrefetchConfiguration;
import com.mulesoft.extension.mq.internal.config.SubscriberAckMode;
import com.mulesoft.extension.mq.internal.config.SubscriberConfiguration;
import com.mulesoft.extension.mq.internal.connection.AnypointMQConnection;
import com.mulesoft.extension.mq.internal.domain.MessageListener;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.impl.PrefetchedDestination;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import com.mulesoft.mq.restclient.internal.Prefetcher;
import com.mulesoft.mq.restclient.internal.ScheduledPrefetcher;
import com.mulesoft.mq.restclient.internal.TimeSupplier;
import java.util.Map;
import java.util.Optional;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AnypointMQServer {
    private static final Logger logger = LoggerFactory.getLogger(AnypointMQServer.class);
    private final String destination;
    private final AnypointMQConfiguration config;
    private final AnypointMQConnection connection;
    private final SourceCallback<byte[], AnypointMQMessageContext> callback;
    private Subscriber subscriber;
    private ScheduledPrefetcher prefetcher;

    public AnypointMQServer(String destination, AnypointMQConfiguration config, AnypointMQConnection connection, SourceCallback<byte[], AnypointMQMessageContext> callback) {
        this.destination = destination;
        this.config = config;
        this.connection = connection;
        this.callback = callback;
        Destination courierDestination = connection.getDestination(destination);
        int pollingThreads = 2;
        int batchSize = 10;
        MQPrefetchConfiguration prefetch = config.getPrefetch();
        if (prefetch != null) {
            this.prefetcher = new ScheduledPrefetcher(courierDestination, prefetch.getFetchSize(), prefetch.getFetchTimeout(), Optional.ofNullable(config.getAcknowledgementTimeout()).orElse(120000L).longValue(), prefetch.getFrequency(), new MessagePreserver(courierDestination, new TimeSupplier()));
            courierDestination = new PrefetchedDestination(courierDestination, (Prefetcher)this.prefetcher);
            pollingThreads = 1;
            batchSize = 1;
        }
        this.subscriber = new Subscriber(config, courierDestination, new ExtensionMessageListener(config, courierDestination, this.callback), connection.getMessageContextFactory(), pollingThreads, batchSize);
        this.subscriber.start();
    }

    public void stop() {
        this.connection.disconnect();
        if (this.prefetcher != null) {
            this.prefetcher.stop();
        }
        if (this.subscriber != null) {
            this.subscriber.stop();
        }
    }

    private class ExtensionMessageListener
    implements MessageListener {
        private final SubscriberConfiguration subscriberConfiguration;
        private final SubscriberAckMode acknowledgementMode;
        private final Destination destination;
        private final SourceCallback<byte[], AnypointMQMessageContext> callback;

        ExtensionMessageListener(SubscriberConfiguration subscriberConfiguration, Destination destination, SourceCallback<byte[], AnypointMQMessageContext> callback) {
            this.subscriberConfiguration = subscriberConfiguration;
            this.acknowledgementMode = Optional.ofNullable(subscriberConfiguration.getAcknowledgementMode()).orElse(SubscriberAckMode.AUTO);
            this.destination = destination;
            this.callback = callback;
        }

        @Override
        public void onReceive(AnypointMQMessageContext messageContext) {
            if (this.shouldHandleRedelivery(messageContext, Optional.ofNullable(this.subscriberConfiguration.getMaxRedelivery()).orElse(-1))) {
                if (this.acknowledgementMode == SubscriberAckMode.IMMEDIATE) {
                    this.destination.ack(messageContext.getMessage()).fireAndForget();
                }
                this.handleMessage(messageContext);
            }
        }

        private void handleMessage(AnypointMQMessageContext messageContext) {
            Map properties = messageContext.getMessage().getProperties();
            Result.Builder resultbuilder = Result.builder().output((Object)messageContext.getMessage().getBody()).attributes((Object)messageContext);
            if (properties.containsKey("contentType")) {
                resultbuilder.mediaType(MediaType.parse((String)((String)properties.get("contentType"))));
            }
            SourceCallbackContext callbackContext = this.callback.createContext();
            callbackContext.addVariable("ACKNOWLEDGEMENT_MODE", (Object)this.acknowledgementMode);
            callbackContext.addVariable("DESTINATION", (Object)this.destination);
            callbackContext.addVariable("MESSAGE_CONTEXT", (Object)messageContext);
            this.callback.handle(resultbuilder.build(), callbackContext);
        }

        @Override
        public void onError(Throwable throwable) {
            if (throwable instanceof ResourceNotFoundException) {
                logger.error("Destination not found: {}. Shutting down subscriber...", (Object)this.destination);
                this.callback.onConnectionException(new ConnectionException(throwable));
            } else {
                logger.error("Can not process received message.", throwable);
            }
        }

        private boolean shouldHandleRedelivery(AnypointMQMessageContext messageContext, Integer maxRedelivery) throws RuntimeException {
            int redeliveries;
            boolean shouldHandle = true;
            if (maxRedelivery >= 0 && (redeliveries = messageContext.getMessage().getDeliveryCount() - 1) > maxRedelivery) {
                shouldHandle = false;
            }
            return shouldHandle;
        }
    }
}

