/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.extension.mq.internal;

import com.google.common.collect.ImmutableList;
import com.mulesoft.extension.mq.api.modes.SubscriberAckMode;
import com.mulesoft.extension.mq.api.source.MQSubscriber;
import com.mulesoft.extension.mq.internal.server.MessageListener;
import com.mulesoft.mq.restclient.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.client.mq.domain.AnypointMQMessage;
import com.mulesoft.mq.restclient.client.mq.domain.Lock;
import com.mulesoft.mq.restclient.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.internal.Destination;
import com.mulesoft.mq.restclient.utils.ClientUtils;
import com.mulesoft.mq.restclient.utils.MessageUtils;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSubscriber
implements MQSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSubscriber.class);
    private final MessageListener messageListener;
    protected final MQCircuitBreaker circuitBreaker;
    protected final Destination destination;
    protected final SubscriberAckMode acknowledgementMode;
    protected final long acknowledgementTimeout;
    protected AtomicBoolean isRunning = new AtomicBoolean(false);

    public AbstractSubscriber(SubscriberAckMode acknowledgementMode, long acknowledgementTimeout, Destination destination, MessageListener messageListener, MQCircuitBreaker circuitBreaker) {
        this.acknowledgementMode = acknowledgementMode;
        this.acknowledgementTimeout = acknowledgementTimeout;
        this.destination = destination;
        this.messageListener = messageListener;
        this.circuitBreaker = circuitBreaker;
    }

    @Override
    public synchronized void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            LOGGER.debug("Starting Subscriber on destination '{}'", (Object)this.destination.getName());
            this.submitWork();
        }
    }

    @Override
    public void stop() {
        if (this.isRunning.compareAndSet(true, false)) {
            LOGGER.debug("Stopping Subscriber on destination '{}'", (Object)this.destination.getName());
        }
    }

    protected void submitWork() {
        if (this.isRunning.get()) {
            this.doSubmitWork();
        }
    }

    protected abstract void doSubmitWork();

    protected void handleError(Throwable e) {
        if (!this.isRunning.get()) {
            LOGGER.debug("An error occurred while subscriber was shutting down: " + e.getMessage(), e);
        } else if (e instanceof ResourceNotFoundException) {
            LOGGER.error("Destination '{}' not found. Shutting down subscriber...", (Object)this.destination.getName());
            this.stop();
        } else if (!ClientUtils.isTimeout((Throwable)e)) {
            this.messageListener.onError(e);
        } else {
            LOGGER.error("Can not retrieve messages: {}.", (Object)MessageUtils.getCompleteMessage((Throwable)e), (Object)e);
        }
    }

    protected void processMessages(List<AnypointMQMessage> messages) {
        LOGGER.debug("Received messages count: {}", (Object)messages.size());
        this.processMessages(messages.iterator());
    }

    protected void processMessages(Iterator<AnypointMQMessage> messages) {
        if (this.isRunning.get()) {
            while (messages.hasNext()) {
                this.processMessage(messages.next());
            }
        } else {
            LOGGER.debug("Subscriber stopped, returning all local messages to the queue");
            this.nackAll(messages);
        }
    }

    protected void processMessage(AnypointMQMessage message) {
        this.messageListener.onReceive(message);
    }

    protected void nackAll(Iterator<AnypointMQMessage> messages) {
        this.nackAll((List<AnypointMQMessage>)ImmutableList.copyOf(messages));
    }

    protected void nackAll(List<AnypointMQMessage> messages) {
        if (!messages.isEmpty()) {
            Map<Boolean, List<AnypointMQMessage>> nackToFallbackDestination = messages.stream().collect(Collectors.partitioningBy(AnypointMQMessage::getPublishedToFallback));
            List messagesToNack = nackToFallbackDestination.get(false).stream().map(m -> new Lock(m.getMessageId(), m.getLockId())).collect(Collectors.toList());
            List messagesToNackFallback = nackToFallbackDestination.get(true).stream().map(m -> new Lock(m.getMessageId(), m.getLockId())).collect(Collectors.toList());
            if (!messagesToNack.isEmpty()) {
                this.destination.nack(messagesToNack, false).fireAndForget();
            }
            if (!messagesToNackFallback.isEmpty()) {
                this.destination.nack(messagesToNack, true).fireAndForget();
            }
        }
    }
}

