/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.extension.mq.internal;

import com.mulesoft.extension.mq.api.modes.SubscriberAckMode;
import com.mulesoft.extension.mq.internal.AbstractSubscriber;
import com.mulesoft.extension.mq.internal.server.MessageListener;
import com.mulesoft.mq.restclient.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.client.mq.domain.AnypointMQMessage;
import com.mulesoft.mq.restclient.client.mq.domain.ReceiveMessageConfiguration;
import com.mulesoft.mq.restclient.client.mq.domain.ReceiveMessageResult;
import com.mulesoft.mq.restclient.internal.CourierObserver;
import com.mulesoft.mq.restclient.internal.Destination;
import com.mulesoft.mq.restclient.utils.ExecutorUtils;
import com.mulesoft.mq.restclient.utils.FallbackUtils;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.mule.runtime.core.api.source.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Subscriber that fetches messages from a {@link Destination} each time the configured
 * scheduling strategy fires, gated by the inherited circuit breaker.
 *
 * <p>At most one receive request is meant to be outstanding at a time; this is tracked by
 * {@code isRequestInFlight}, which is set right before a fetch is issued and cleared once the
 * request has been fully handled (or has failed).
 *
 * <p>When the fallback feature is enabled and the destination has a fallback configured,
 * {@link FallbackUtils} decides whether a finished receive must be followed by another receive
 * against the primary destination.
 */
public class PollingSubscriber
extends AbstractSubscriber {
    /** Polling timeout passed to every receive call (presumably milliseconds - TODO confirm). */
    public static final int MAX_POLLING_TIMEOUT = 20000;
    /** The circuit-test wait is this many times the acknowledgement timeout. */
    private static final int ACK_DEFAULT_MULTIPLIER = 4;
    /** Circuit-test wait used when the acknowledgement timeout is 0. */
    private static final int DEFAULT_CIRCUIT_TTL = 480000;
    private static final Logger LOGGER = LoggerFactory.getLogger(PollingSubscriber.class);

    private final int fetchSize;
    private final AtomicBoolean isRequestInFlight = new AtomicBoolean(false);
    private final Scheduler schedulingStrategy;
    private final org.mule.runtime.api.scheduler.Scheduler executor;
    private final int primaryRegionStatusCheckIntervalMs;
    private final int fallbackMessagesCheckIntervalMs;
    private final boolean fallbackFeatureSystemPropertyEnabled;

    /**
     * Creates a subscriber with the fallback feature disabled (both check intervals are 0).
     *
     * @param acknowledgementMode    forwarded to {@link AbstractSubscriber}
     * @param acknowledgementTimeout forwarded to {@link AbstractSubscriber}; also drives the circuit-test wait
     * @param fetchSize              number of messages requested per poll while the circuit is closed
     * @param destination            destination to receive from
     * @param messageListener        forwarded to {@link AbstractSubscriber}
     * @param circuitBreaker         circuit breaker gating the polls
     * @param executor               executor the polls run on; it is stopped by {@link #stop()}
     * @param schedulingStrategy     strategy that triggers the polls
     */
    public PollingSubscriber(SubscriberAckMode acknowledgementMode, long acknowledgementTimeout, int fetchSize, Destination destination, MessageListener messageListener, MQCircuitBreaker circuitBreaker, org.mule.runtime.api.scheduler.Scheduler executor, Scheduler schedulingStrategy) {
        this(acknowledgementMode, acknowledgementTimeout, fetchSize, destination, messageListener, circuitBreaker, executor, schedulingStrategy, 0, 0, false);
    }

    /**
     * Creates a subscriber with explicit fallback settings.
     *
     * @param acknowledgementMode                  forwarded to {@link AbstractSubscriber}
     * @param acknowledgementTimeout               forwarded to {@link AbstractSubscriber}; also drives the circuit-test wait
     * @param fetchSize                            number of messages requested per poll while the circuit is closed
     * @param destination                          destination to receive from
     * @param messageListener                      forwarded to {@link AbstractSubscriber}
     * @param circuitBreaker                       circuit breaker gating the polls
     * @param executor                             executor the polls run on; it is stopped by {@link #stop()}
     * @param schedulingStrategy                   strategy that triggers the polls
     * @param primaryRegionStatusCheckIntervalMs   passed to {@link FallbackUtils} when deciding how to receive
     * @param fallbackMessagesCheckIntervalMs      passed to {@link FallbackUtils} when deciding how to receive
     * @param fallbackFeatureSystemPropertyEnabled whether the fallback feature is switched on
     */
    public PollingSubscriber(SubscriberAckMode acknowledgementMode, long acknowledgementTimeout, int fetchSize, Destination destination, MessageListener messageListener, MQCircuitBreaker circuitBreaker, org.mule.runtime.api.scheduler.Scheduler executor, Scheduler schedulingStrategy, int primaryRegionStatusCheckIntervalMs, int fallbackMessagesCheckIntervalMs, boolean fallbackFeatureSystemPropertyEnabled) {
        super(acknowledgementMode, acknowledgementTimeout, destination, messageListener, circuitBreaker);
        this.fetchSize = fetchSize;
        this.schedulingStrategy = schedulingStrategy;
        this.executor = executor;
        this.primaryRegionStatusCheckIntervalMs = primaryRegionStatusCheckIntervalMs;
        this.fallbackMessagesCheckIntervalMs = fallbackMessagesCheckIntervalMs;
        this.fallbackFeatureSystemPropertyEnabled = fallbackFeatureSystemPropertyEnabled;
    }

    /**
     * Registers the polling task with the scheduling strategy. Any failure of a poll is logged
     * and then rethrown to the scheduler.
     */
    @Override
    protected void doSubmitWork() {
        this.schedulingStrategy.schedule(this.executor, () -> {
            try {
                this.subscribeForMessages();
            }
            catch (RuntimeException | Error t) {
                LOGGER.error("Can not subscribe for messages on destination '{}': {}", this.destination.getName(), t.getMessage(), t);
                throw t;
            }
        });
    }

    /** Stops the subscriber and then the executor the polls run on, if there is one. */
    @Override
    public void stop() {
        super.stop();
        if (this.executor != null) {
            LOGGER.debug("Stopping Executor");
            ExecutorUtils.stopExecutorService((ExecutorService)this.executor);
        }
    }

    /**
     * Runs one poll: fetches {@code fetchSize} messages when the circuit is closed, or a single
     * message otherwise, unless the poll has to be skipped.
     */
    private void subscribeForMessages() {
        if (this.shouldSkipMessageFetch()) {
            return;
        }
        // From here on this poll owns the in-flight flag.
        try {
            if (this.circuitBreaker.isClosed()) {
                this.doFetch(this.fetchSize);
            } else {
                LOGGER.debug("Circuit HALF_OPEN, submitting request for circuit testing");
                this.doFetch(1);
            }
        }
        catch (RuntimeException | Error t) {
            // The request could not be issued, so no observer callback will clear the flag.
            // Without this reset every later poll would be skipped as "already in flight".
            this.isRequestInFlight.set(false);
            throw t;
        }
    }

    /**
     * Decides whether this poll must be skipped.
     *
     * @return {@code true} when the subscriber is not running, the circuit is open, or another
     *         request is already in flight; {@code false} otherwise, in which case the in-flight
     *         flag has been claimed by the caller
     */
    private boolean shouldSkipMessageFetch() {
        if (!this.isRunning.get()) {
            LOGGER.debug("Skipping message fetch on destination '{}', already requested to Stop", this.destination.getName());
            return true;
        }
        if (this.circuitBreaker.isOpen()) {
            LOGGER.debug("Circuit is OPEN, skipping message fetch");
            return true;
        }
        // Checked last on purpose: a successful compareAndSet claims the flag.
        if (!this.isRequestInFlight.compareAndSet(false, true)) {
            LOGGER.debug("Skipping message fetch, already one request in flight for subscriber on '{}'", this.destination.getName());
            return true;
        }
        return false;
    }

    /**
     * Asks {@link FallbackUtils} how the next receive must be performed and issues it.
     *
     * @param fetchSize number of messages to request
     */
    private void doFetch(int fetchSize) {
        LOGGER.debug("Do fetch of '{}' messages", fetchSize);
        ReceiveMessageConfiguration receiveConfig = FallbackUtils.determineReceiveMessageConfiguration(this.destination, this.primaryRegionStatusCheckIntervalMs, this.fallbackMessagesCheckIntervalMs, this.fallbackFeatureSystemPropertyEnabled);
        this.receiveMessages(fetchSize, receiveConfig.getUseFallbackDestination(), receiveConfig.getShortPolling(), receiveConfig.getRetryCount());
    }

    /**
     * Issues a receive request and subscribes an observer that routes the outcome to
     * {@link #onReceiveSuccess} or {@link #onReceiveError}.
     *
     * @param fetchSize              number of messages to request
     * @param useFallbackDestination whether to receive from the fallback destination
     * @param shortPolling           forwarded to {@code Destination.receive}
     * @param retryCount             forwarded to {@code Destination.receive}
     */
    private void receiveMessages(int fetchSize, boolean useFallbackDestination, boolean shortPolling, int retryCount) {
        // The raw cast is kept from the original call site.
        this.destination.receive(fetchSize, (long)MAX_POLLING_TIMEOUT, this.acknowledgementTimeout, useFallbackDestination, shortPolling, retryCount).subscribe((CourierObserver)new CourierObserver<List<AnypointMQMessage>>(){

            @Override
            public void onSuccess(List<AnypointMQMessage> messages) {
                PollingSubscriber.this.onReceiveSuccess(messages, fetchSize, useFallbackDestination);
            }

            @Override
            public void onError(Throwable e) {
                PollingSubscriber.this.onReceiveError(e, fetchSize, useFallbackDestination);
            }
        });
    }

    /**
     * Handles a completed receive: either re-issues the receive against the primary destination
     * (when {@link FallbackUtils} asks for it) or dispatches the received messages.
     */
    private void onReceiveSuccess(List<AnypointMQMessage> messages, int fetchSize, boolean useFallbackDestination) {
        boolean handedOffToRetry = false;
        try {
            if (this.isFallbackConfigured()) {
                ReceiveMessageResult nextSteps = FallbackUtils.determineReceiveNextStepsOnSuccess(messages, this.destination, useFallbackDestination, this.primaryRegionStatusCheckIntervalMs);
                if (nextSteps.getMakeCallToPrimary()) {
                    this.receiveMessages(fetchSize, false, false, nextSteps.getRetryCount());
                    handedOffToRetry = true;
                    return;
                }
            }
            if (messages != null && !messages.isEmpty()) {
                for (AnypointMQMessage message : messages) {
                    message.setPublishedToFallback(useFallbackDestination);
                }
                this.handleSuccess(messages);
            } else {
                LOGGER.debug("No messages received from destination '{}'", this.destination.getName(useFallbackDestination));
                this.circuitBreaker.releaseCircuitLock();
            }
        }
        finally {
            // A retry that was issued successfully is still in flight; its own callback clears
            // the flag. Clearing it here would let a concurrent poll start a second request.
            if (!handedOffToRetry) {
                this.isRequestInFlight.set(false);
            }
        }
    }

    /**
     * Handles a failed receive: either re-issues the receive against the primary destination
     * (when {@link FallbackUtils} asks for it) or reports the error and, if the circuit is
     * half-open, releases the circuit lock.
     */
    private void onReceiveError(Throwable e, int fetchSize, boolean useFallbackDestination) {
        boolean handedOffToRetry = false;
        try {
            if (this.isFallbackConfigured()) {
                ReceiveMessageResult nextSteps = FallbackUtils.determineReceiveNextStepsOnError(e, this.destination, useFallbackDestination, this.primaryRegionStatusCheckIntervalMs);
                if (nextSteps.getMakeCallToPrimary()) {
                    this.receiveMessages(fetchSize, false, false, nextSteps.getRetryCount());
                    handedOffToRetry = true;
                    return;
                }
            }
            this.handleError(e);
            if (this.circuitBreaker.isHalfOpen()) {
                LOGGER.debug("An error occurred while obtaining a message to test the circuit, releasing the lock");
                this.circuitBreaker.releaseCircuitLock();
            }
        }
        finally {
            // Same reasoning as in onReceiveSuccess: a pending retry keeps the flag.
            if (!handedOffToRetry) {
                this.isRequestInFlight.set(false);
            }
        }
    }

    /**
     * @return {@code true} when the fallback feature is enabled and the destination reports a
     *         configured fallback; a {@code null} fallback configuration counts as not configured
     */
    private boolean isFallbackConfigured() {
        return this.fallbackFeatureSystemPropertyEnabled
                && Destination.FallbackConfiguration.CONFIGURED.equals(this.destination.getFallbackConfiguration());
    }

    /**
     * Dispatches a non-empty batch of received messages according to the current circuit state.
     *
     * @param messages the messages that were received
     */
    private void handleSuccess(List<AnypointMQMessage> messages) {
        switch (this.circuitBreaker.getState()) {
            case OPEN: {
                LOGGER.debug("Circuit is OPEN, returning all the messages to the Queue");
                this.nackAll(messages);
                this.circuitBreaker.releaseCircuitLock();
                break;
            }
            case CLOSED: {
                this.processMessages(messages);
                break;
            }
            case HALF_OPEN: {
                this.tryCircuitTest(messages);
                break;
            }
            default: {
                LOGGER.error("Invalid circuit breaker value: {}", this.circuitBreaker.getState());
            }
        }
    }

    /**
     * Tests a half-open circuit with the first message of the batch. If the circuit is closed
     * after the test, the remaining messages are processed and a new poll is submitted;
     * otherwise the remaining messages are nacked.
     *
     * @param messages the messages that were received
     */
    private void tryCircuitTest(List<AnypointMQMessage> messages) {
        if (this.circuitBreaker.acquireCircuitLock() && !messages.isEmpty()) {
            LOGGER.debug("Circuit HALF_OPEN, dispatching single message for testing");
            Iterator<AnypointMQMessage> remaining = messages.iterator();
            this.processMessage(remaining.next());
            this.waitCircuitTest();
            if (this.circuitBreaker.isClosed()) {
                this.processMessages(remaining);
                LOGGER.debug("Circuit recovered, submitting next poll");
                this.executor.submit(this::subscribeForMessages);
            } else {
                LOGGER.debug("Circuit testing failed, returning remaining messages to the Queue and waiting for next poll");
                this.nackAll(remaining);
            }
        } else {
            // NOTE(review): the messages are neither processed nor nacked on this path -
            // confirm that leaving them to be redelivered is intended.
            LOGGER.debug("Test in progress, skipping poll");
        }
    }

    /**
     * Blocks until the circuit test completes or the wait times out. An interruption ends the
     * wait early; it is only logged while the subscriber is still running.
     */
    private void waitCircuitTest() {
        try {
            LOGGER.debug("Circuit testing in progress, waiting for circuit test to complete");
            this.circuitBreaker.awaitCircuitLock(this.circuitTestTimeout());
        }
        catch (InterruptedException e) {
            // NOTE(review): the interrupt status is deliberately left cleared, as in the
            // original, so the processing/nack that follows in tryCircuitTest runs on a
            // non-interrupted thread - confirm this is the desired shutdown behaviour.
            if (this.isRunning.get()) {
                LOGGER.debug("Thread interrupted while waiting for the circuit test lock");
            }
        }
    }

    /**
     * Computes how long to wait for a circuit test: {@code ACK_DEFAULT_MULTIPLIER} times the
     * acknowledgement timeout, or {@code DEFAULT_CIRCUIT_TTL} when that timeout is 0. The
     * result is capped at {@link Integer#MAX_VALUE} instead of overflowing.
     */
    private int circuitTestTimeout() {
        if (this.acknowledgementTimeout == 0L) {
            return DEFAULT_CIRCUIT_TTL;
        }
        if (this.acknowledgementTimeout > Integer.MAX_VALUE / ACK_DEFAULT_MULTIPLIER) {
            return Integer.MAX_VALUE;
        }
        // Multiply as long first; the original narrowed to int before multiplying.
        return (int)(this.acknowledgementTimeout * ACK_DEFAULT_MULTIPLIER);
    }
}

