/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mq.restclient.internal.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mulesoft.mq.restclient.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.client.mq.domain.AnypointMQMessage;
import com.mulesoft.mq.restclient.client.mq.domain.Lock;
import com.mulesoft.mq.restclient.client.mq.domain.ReceiveMessageConfiguration;
import com.mulesoft.mq.restclient.client.mq.domain.ReceiveMessageResult;
import com.mulesoft.mq.restclient.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.exception.RestException;
import com.mulesoft.mq.restclient.internal.BufferedQueue;
import com.mulesoft.mq.restclient.internal.CourierObserver;
import com.mulesoft.mq.restclient.internal.Destination;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import com.mulesoft.mq.restclient.internal.Prefetcher;
import com.mulesoft.mq.restclient.internal.impl.SimpleBufferedQueue;
import com.mulesoft.mq.restclient.utils.ClientUtils;
import com.mulesoft.mq.restclient.utils.ExecutorUtils;
import com.mulesoft.mq.restclient.utils.FallbackUtils;
import com.mulesoft.mq.restclient.utils.MessageUtils;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

/**
 * {@link Prefetcher} that periodically receives messages from a {@link Destination} on a dedicated
 * single-thread scheduler, keeps them in a local buffer, and serves them one at a time to the
 * subscribers of {@link #get()}.
 *
 * <p>Every fetch and every dispatch consults the state of the supplied {@link MQCircuitBreaker}:
 * <ul>
 *   <li>{@code CLOSED}: messages are fetched in batches and dispatched to waiting subscribers, or
 *       buffered when nobody is waiting.</li>
 *   <li>{@code OPEN}: nothing is fetched and messages already at hand are NACKed.</li>
 *   <li>{@code HALF_OPEN}: a single message is fetched/dispatched as a test and the outcome decides
 *       whether the rest is dispatched or NACKed.</li>
 * </ul>
 *
 * <p>The number of receive requests that have been issued but not yet fully handled is tracked in
 * {@code inFlightRequests}; each request issued by {@code doFetch} must be counted down exactly once.
 */
public class ScheduledPrefetcher
implements Prefetcher {
    /** Upper bound for the number of receive requests that may be outstanding at the same time. */
    public static final int MAX_CONCURRENT_REQUESTS = 3;
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledPrefetcher.class);
    /** Largest number of messages requested in a single receive call. */
    private static final int MAX_BATCH_SIZE = 10;
    /** Lock time-to-live applied when the caller passes a non-positive value. */
    private static final long DEFAULT_LOCK_TIME_TO_LIVE = 120000L;
    /** How long {@link #get()} waits on the buffer per attempt while a fetch is still in flight. */
    private static final long BUFFER_POLL_TIMEOUT_MILLIS = 20000L;

    /** Subscribers that asked for a message while none was available. */
    private final BlockingQueue<Subscriber<? super AnypointMQMessage>> waitingSubscribers = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService retriever;
    private final Destination destination;
    private final int batchSize;
    private final long lockTimeToLive;
    private final int bufferLimit;
    private final MQCircuitBreaker circuitBreaker;
    /** Receive requests issued by {@link #doFetch(int)} whose result has not been fully handled yet. */
    private final AtomicInteger inFlightRequests = new AtomicInteger(0);
    private final int maxConcurrentRequests;
    private final int circuitTtl;
    private final AtomicBoolean running;
    private final long retrievePeriod;
    private final MessagePreserver preserver;
    private final BufferedQueue bufferedQueue;
    /**
     * Set once the destination has been reported as not found. Written from the receive callback and
     * read from subscriber threads in {@link #get()}, hence volatile.
     */
    private volatile RestException invalidPrefetcherException;
    private final int primaryRegionCheckIntervalMilliseconds;
    private final int fallbackMessagesCheckIntervalMilliseconds;
    private final boolean fallbackFeatureSystemPropertyEnabled;

    /**
     * Creates a prefetcher with the fallback feature disabled (both check intervals are {@code 0}).
     *
     * @param destination      destination messages are received from and NACKed to
     * @param maxLocalMessages maximum number of messages kept in the local buffer; also caps the batch size
     * @param lockTimeToLive   lock TTL passed to every receive call; a value {@code <= 0} selects the default (120000)
     * @param retrievePeriod   period in milliseconds between scheduled fetches
     * @param preserver        message preserver, or {@code null} to use a no-op implementation
     * @param circuitBreaker   circuit breaker gating fetch and dispatch; must not be {@code null}
     * @param circuitTtl       value passed to {@code MQCircuitBreaker.awaitCircuitLock} when waiting for a circuit test
     * @throws IllegalArgumentException if {@code circuitBreaker} is {@code null}
     */
    public ScheduledPrefetcher(Destination destination, int maxLocalMessages, long lockTimeToLive, long retrievePeriod, MessagePreserver preserver, MQCircuitBreaker circuitBreaker, int circuitTtl) {
        this(destination, maxLocalMessages, lockTimeToLive, retrievePeriod, preserver, circuitBreaker, circuitTtl, 0, 0, false);
    }

    /**
     * Creates a prefetcher with explicit fallback settings.
     *
     * @param destination                               destination messages are received from and NACKed to
     * @param maxLocalMessages                          maximum number of messages kept in the local buffer; also caps the batch size
     * @param lockTimeToLive                            lock TTL passed to every receive call; a value {@code <= 0} selects the default (120000)
     * @param retrievePeriod                            period in milliseconds between scheduled fetches
     * @param preserver                                 message preserver, or {@code null} to use a no-op implementation
     * @param circuitBreaker                            circuit breaker gating fetch and dispatch; must not be {@code null}
     * @param circuitTtl                                value passed to {@code MQCircuitBreaker.awaitCircuitLock} when waiting for a circuit test
     * @param primaryRegionCheckIntervalMilliseconds    forwarded to {@link FallbackUtils} when deciding which region to call
     * @param fallbackMessagesCheckIntervalMilliseconds forwarded to {@link FallbackUtils} when building the receive configuration
     * @param fallbackFeatureSystemPropertyEnabled      whether the fallback feature may be used at all
     * @throws IllegalArgumentException if {@code circuitBreaker} is {@code null}
     */
    public ScheduledPrefetcher(Destination destination, int maxLocalMessages, long lockTimeToLive, long retrievePeriod, MessagePreserver preserver, MQCircuitBreaker circuitBreaker, int circuitTtl, int primaryRegionCheckIntervalMilliseconds, int fallbackMessagesCheckIntervalMilliseconds, boolean fallbackFeatureSystemPropertyEnabled) {
        Preconditions.checkArgument(circuitBreaker != null, "A valid MQ CircuitBreaker implementation should be provided but 'null' was found");
        this.circuitBreaker = circuitBreaker;
        this.destination = destination;
        this.bufferLimit = maxLocalMessages;
        this.batchSize = Math.min(maxLocalMessages, MAX_BATCH_SIZE);
        this.maxConcurrentRequests = getMaxConcurrentRequests(maxLocalMessages);
        this.retrievePeriod = retrievePeriod;
        this.lockTimeToLive = lockTimeToLive <= 0L ? DEFAULT_LOCK_TIME_TO_LIVE : lockTimeToLive;
        this.bufferedQueue = new SimpleBufferedQueue();
        this.retriever = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("scheduled-prefetcher-retriever-%d").build());
        this.preserver = preserver == null ? new NoOpMessagePreserver() : preserver;
        this.running = new AtomicBoolean(false);
        this.circuitTtl = circuitTtl;
        this.primaryRegionCheckIntervalMilliseconds = primaryRegionCheckIntervalMilliseconds;
        this.fallbackMessagesCheckIntervalMilliseconds = fallbackMessagesCheckIntervalMilliseconds;
        this.fallbackFeatureSystemPropertyEnabled = fallbackFeatureSystemPropertyEnabled;
    }

    /**
     * Starts the periodic fetch. Calling it on an already running prefetcher does nothing.
     */
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.retriever.scheduleAtFixedRate(this::retrieveMessages, 0L, this.retrievePeriod, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops the prefetcher: drops every buffered message from the preserver, stops the preserver,
     * clears the buffer, resets the in-flight counter and shuts the scheduler down. Calling it on a
     * prefetcher that is not running does nothing.
     */
    public void stop() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        AnypointMQMessage message;
        while ((message = this.bufferedQueue.take()) != null) {
            this.preserver.remove(message.getMessageId());
        }
        this.preserver.stop();
        this.bufferedQueue.clear();
        this.inFlightRequests.set(0);
        ExecutorUtils.stopExecutorService(this.retriever);
    }

    /**
     * Scheduler task: decides, based on the circuit state, whether and how many messages to fetch.
     */
    private void retrieveMessages() {
        if (!this.running.get()) {
            LOGGER.debug("Prefetcher already stopped, skipping message fetch");
            return;
        }
        switch (this.circuitBreaker.getState()) {
            case OPEN: {
                LOGGER.debug("Circuit is OPEN, skipping message fetch");
                break;
            }
            case CLOSED: {
                this.doFetch(this.batchSize);
                break;
            }
            case HALF_OPEN: {
                // Only one probing request at a time while the circuit is being tested.
                int pending = this.inFlightRequests.get();
                if (pending > 0) {
                    LOGGER.debug("Already {} request in progress, skipping message fetch for test", pending);
                    break;
                }
                this.doFetch(1);
                break;
            }
            default: {
                LOGGER.error("Invalid circuit breaker value: {}", this.circuitBreaker.getState());
            }
        }
    }

    /**
     * Issues one receive request for {@code fetchSize} messages unless the buffer is full or the
     * concurrent-request limit has been reached. Increments the in-flight counter for the request.
     */
    private void doFetch(int fetchSize) {
        int bufferSize = this.bufferedQueue.size();
        if (bufferSize >= this.bufferLimit) {
            LOGGER.debug("Buffer is full, skipping message fetch");
            return;
        }
        if (this.inFlightRequests.get() >= this.maxConcurrentRequests) {
            LOGGER.debug("Already {} request in progress, skipping message fetch", this.maxConcurrentRequests);
            return;
        }
        LOGGER.debug("Retrieving messages...");
        LOGGER.debug("Buffer contains '{}' messages", bufferSize);
        this.inFlightRequests.incrementAndGet();
        ReceiveMessageConfiguration configuration = FallbackUtils.determineReceiveMessageConfiguration(this.destination, this.primaryRegionCheckIntervalMilliseconds, this.fallbackMessagesCheckIntervalMilliseconds, this.fallbackFeatureSystemPropertyEnabled);
        this.receiveMessages(fetchSize, configuration.getUseFallbackDestination(), configuration.getShortPolling(), configuration.getRetryCount());
    }

    /**
     * Performs the receive call and wires its outcome to the success/error handlers. When the
     * fallback feature is active and {@link FallbackUtils} asks for it, the call is repeated against
     * the primary destination; the repeated call reuses the in-flight slot of the current request.
     */
    private void receiveMessages(final int fetchSize, final boolean useFallbackDestination, boolean shortPolling, int retryCount) {
        // NOTE(review): 20000L is presumably the receive polling time in milliseconds - confirm against Destination#receive.
        this.destination.receive(fetchSize, 20000L, this.lockTimeToLive, useFallbackDestination, shortPolling, retryCount).subscribe(new CourierObserver<List<AnypointMQMessage>>(){

            @Override
            public void onSuccess(List<AnypointMQMessage> messages) {
                if (ScheduledPrefetcher.this.isFallbackActive()) {
                    ReceiveMessageResult nextStep = FallbackUtils.determineReceiveNextStepsOnSuccess(messages, ScheduledPrefetcher.this.destination, useFallbackDestination, ScheduledPrefetcher.this.primaryRegionCheckIntervalMilliseconds);
                    if (nextStep.getMakeCallToPrimary()) {
                        ScheduledPrefetcher.this.receiveMessages(fetchSize, false, false, nextStep.getRetryCount());
                        return;
                    }
                }
                if (messages == null || messages.isEmpty()) {
                    LOGGER.debug("No messages received");
                    ScheduledPrefetcher.this.releaseFetchSlot();
                    return;
                }
                // Remember where each message came from so that a later NACK goes to the same place.
                for (AnypointMQMessage message : messages) {
                    message.setPublishedToFallback(useFallbackDestination);
                }
                ScheduledPrefetcher.this.handleRetrieveSuccess(messages);
            }

            @Override
            public void onError(Throwable e) {
                if (ScheduledPrefetcher.this.isFallbackActive()) {
                    ReceiveMessageResult nextStep = FallbackUtils.determineReceiveNextStepsOnError(e, ScheduledPrefetcher.this.destination, useFallbackDestination, ScheduledPrefetcher.this.primaryRegionCheckIntervalMilliseconds);
                    if (nextStep.getMakeCallToPrimary()) {
                        ScheduledPrefetcher.this.receiveMessages(fetchSize, false, false, nextStep.getRetryCount());
                        return;
                    }
                }
                ScheduledPrefetcher.this.handleRetrieveError(e);
                ScheduledPrefetcher.this.releaseFetchSlot();
            }
        });
    }

    /** Returns whether the fallback feature is enabled and the destination has fallback configured. */
    private boolean isFallbackActive() {
        return this.fallbackFeatureSystemPropertyEnabled && this.destination.getFallbackConfiguration().equals(Destination.FallbackConfiguration.CONFIGURED);
    }

    /** Releases the circuit lock and marks one in-flight receive request as finished. */
    private void releaseFetchSlot() {
        this.circuitBreaker.releaseCircuitLock();
        this.inFlightRequests.decrementAndGet();
    }

    /**
     * Logs a failed receive call. A {@link ResourceNotFoundException} additionally stops the
     * prefetcher and fails every waiting (and future) subscriber with that exception.
     */
    private void handleRetrieveError(Throwable e) {
        if (ClientUtils.isTimeout(e)) {
            LOGGER.debug("Timeout while retrieving messages.");
        } else if (this.requestedToStop()) {
            LOGGER.debug("Prefetcher already disposed, ignoring exception.", e);
        } else if (e instanceof ResourceNotFoundException) {
            LOGGER.error("Destination not found: {}. Shutting down scheduler prefetcher...", this.destination.getName());
            this.stop();
            this.raiseErrorOnSubscribers((ResourceNotFoundException)e);
        } else {
            LOGGER.error("Can not retrieve messages: {}.", MessageUtils.getCompleteMessage(e), e);
        }
    }

    /**
     * Routes a non-empty batch of received messages according to the current circuit state. Every
     * branch counts the originating receive request down exactly once.
     */
    private void handleRetrieveSuccess(List<AnypointMQMessage> messages) {
        LOGGER.debug("Received '{}' messages: ", messages.size());
        switch (this.circuitBreaker.getState()) {
            case CLOSED: {
                this.dispatchAll(messages);
                break;
            }
            case OPEN: {
                this.nackAll(messages);
                this.releaseFetchSlot();
                break;
            }
            case HALF_OPEN: {
                // tryCircuitTest performs the single count-down for this request itself.
                this.tryCircuitTest(messages, this.waitingSubscribers.poll(), true);
                break;
            }
            default: {
                LOGGER.error("Invalid circuit breaker value: {}", this.circuitBreaker.getState());
                this.inFlightRequests.decrementAndGet();
            }
        }
    }

    /**
     * Runs a circuit test with the given messages while the circuit is {@code HALF_OPEN}.
     *
     * <p>Without a subscriber the messages are simply buffered. Otherwise, if the circuit lock can be
     * acquired, the first message is dispatched to the subscriber as the test; if not, the subscriber
     * is put back in the waiting queue. After waiting for the test to finish, the remaining messages
     * are dispatched (circuit closed) or NACKed (circuit still not closed).
     *
     * @param messages       non-empty list of messages to use for the test
     * @param subscriber     subscriber receiving the test message, or {@code null} if none is waiting
     * @param completesFetch {@code true} when {@code messages} are the result of a receive request
     *                       counted in {@code inFlightRequests}; the counter is then decremented
     *                       exactly once. {@code false} when the messages come from the local buffer,
     *                       in which case the counter is left untouched.
     */
    private void tryCircuitTest(List<AnypointMQMessage> messages, Subscriber<? super AnypointMQMessage> subscriber, boolean completesFetch) {
        if (subscriber == null) {
            messages.forEach(this::addToBuffer);
            this.completeFetch(completesFetch);
            return;
        }
        List<AnypointMQMessage> remainingMessages = messages;
        if (this.circuitBreaker.acquireCircuitLock()) {
            LOGGER.debug("Circuit HALF_OPEN, dispatching single message for testing");
            Iterator<AnypointMQMessage> messageIterator = messages.iterator();
            this.dispatchMessageToSubscriber(subscriber, messageIterator.next());
            remainingMessages = ImmutableList.copyOf(messageIterator);
        } else {
            // Somebody else is already testing the circuit; keep this subscriber for later.
            this.waitingSubscribers.offer(subscriber);
        }
        this.waitCircuitTest();
        if (this.circuitBreaker.isClosed()) {
            LOGGER.debug("Circuit recovered, submitting next poll");
            this.dispatchOrBuffer(remainingMessages);
            // Count down before asking for the next poll so that the poll is not rejected by the
            // concurrent-request limit in doFetch.
            this.completeFetch(completesFetch);
            this.submitRetrieve();
        } else {
            LOGGER.debug("Circuit testing failed, returning remaining messages to the Queue and waiting for next poll");
            this.nackAll(remainingMessages);
            this.completeFetch(completesFetch);
        }
    }

    /** Marks one in-flight receive request as finished when {@code completesFetch} is {@code true}. */
    private void completeFetch(boolean completesFetch) {
        if (completesFetch) {
            this.inFlightRequests.decrementAndGet();
        }
    }

    /**
     * Waits for the running circuit test to finish. If the waiting thread is interrupted, its
     * interrupt status is restored so that callers further up the stack can observe it.
     */
    private void waitCircuitTest() {
        try {
            LOGGER.debug("Circuit testing in progress, waiting for circuit test to complete");
            this.circuitBreaker.awaitCircuitLock(this.circuitTtl);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (this.running.get()) {
                LOGGER.debug("Thread interrupted while waiting for the circuit test lock");
            }
        }
    }

    /**
     * Hands the result of a receive request to waiting subscribers (buffering the rest), marks that
     * request as finished and asks for the next poll.
     */
    private void dispatchAll(List<AnypointMQMessage> messages) {
        this.dispatchOrBuffer(messages);
        this.inFlightRequests.decrementAndGet();
        this.submitRetrieve();
    }

    /**
     * Gives each message to a waiting subscriber while the circuit is closed and one is available;
     * otherwise adds the message to the local buffer. Does not touch the in-flight counter.
     */
    private void dispatchOrBuffer(List<AnypointMQMessage> messages) {
        for (AnypointMQMessage message : messages) {
            Subscriber<? super AnypointMQMessage> subscriber = this.circuitBreaker.isClosed() ? this.waitingSubscribers.poll() : null;
            if (subscriber != null) {
                this.dispatchMessageToSubscriber(subscriber, message);
            } else {
                this.addToBuffer(message);
            }
        }
    }

    /** Registers the message with the preserver and appends it to the local buffer. */
    private void addToBuffer(AnypointMQMessage message) {
        this.preserver.add(message, this.lockTimeToLive);
        this.bufferedQueue.add(message);
    }

    /** Submits an immediate fetch to the scheduler unless the prefetcher is stopping. */
    private void submitRetrieve() {
        if (!this.requestedToStop()) {
            this.retriever.submit(this::retrieveMessages);
        }
    }

    /** Returns whether the prefetcher is no longer running or its scheduler has been shut down. */
    private boolean requestedToStop() {
        return !this.running.get() || this.retriever.isShutdown();
    }

    /**
     * Records the exception so that later calls to {@link #get()} fail immediately, and fails every
     * subscriber currently waiting for a message.
     */
    private void raiseErrorOnSubscribers(ResourceNotFoundException resourceNotFoundException) {
        this.invalidPrefetcherException = resourceNotFoundException;
        Subscriber<? super AnypointMQMessage> subscriber;
        while ((subscriber = this.waitingSubscribers.poll()) != null) {
            subscriber.onError(resourceNotFoundException);
        }
    }

    /**
     * Returns an observable that delivers at most one message to its subscriber.
     *
     * <p>On subscription: fails immediately if the destination was reported as not found; otherwise
     * takes the first non-expired message from the buffer. When the buffer is empty it waits on the
     * buffer for as long as a receive request is in flight and the prefetcher is running. If there is
     * still no message, the subscriber is queued and a fetch is requested.
     *
     * @return an observable emitting a single prefetched message
     */
    @Override
    public Observable<AnypointMQMessage> get() {
        return Observable.create(subscriber -> {
            RestException failure = this.invalidPrefetcherException;
            if (failure != null) {
                subscriber.onError(failure);
                return;
            }
            AnypointMQMessage message = this.bufferedQueue.take();
            while (message != null && this.isExpired(message)) {
                LOGGER.debug("Discarding expired message with ID '{}'", message.getMessageId());
                this.preserver.remove(message.getMessageId());
                message = this.bufferedQueue.take();
            }
            if (message != null) {
                this.handleMessageFromBuffer(subscriber, message);
                return;
            }
            LOGGER.debug("No message available in the buffer");
            while (message == null && this.inFlightRequests.get() > 0 && this.running.get()) {
                message = this.bufferedQueue.poll(BUFFER_POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            }
            if (message == null) {
                // One last non-waiting look in case a message arrived right after the loop ended.
                message = this.bufferedQueue.take();
            }
            if (message == null) {
                this.waitingSubscribers.offer(subscriber);
                this.submitRetrieve();
            } else {
                this.handleMessageFromBuffer(subscriber, message);
            }
        });
    }

    /**
     * Delivers a message taken from the local buffer according to the circuit state: dispatch when
     * closed, NACK and queue the subscriber when open, run a circuit test when half-open.
     */
    private void handleMessageFromBuffer(Subscriber<? super AnypointMQMessage> subscriber, AnypointMQMessage message) {
        switch (this.circuitBreaker.getState()) {
            case CLOSED: {
                this.dispatchMessageToSubscriber(subscriber, message);
                break;
            }
            case OPEN: {
                this.nackAll(Collections.singletonList(message));
                this.waitingSubscribers.offer(subscriber);
                break;
            }
            case HALF_OPEN: {
                // The message comes from the buffer, so no receive request has to be counted down.
                this.tryCircuitTest(Collections.singletonList(message), subscriber, false);
                break;
            }
            default: {
                LOGGER.error("Invalid circuit breaker value: {}", this.circuitBreaker.getState());
            }
        }
    }

    /**
     * NACKs the given messages together with everything currently in the local buffer, removing all
     * of them from the preserver. Messages are NACKed against the primary or the fallback
     * destination depending on their {@code publishedToFallback} flag.
     */
    private void nackAll(List<AnypointMQMessage> messages) {
        LOGGER.debug("Returning all messages to the Queue");
        ImmutableList.Builder<AnypointMQMessage> removedMessages = ImmutableList.builder();
        removedMessages.addAll(messages);
        AnypointMQMessage buffered;
        while ((buffered = this.bufferedQueue.take()) != null) {
            this.preserver.remove(buffered.getMessageId());
            removedMessages.add(buffered);
        }
        messages.stream().map(AnypointMQMessage::getMessageId).forEach(this.preserver::remove);
        Map<Boolean, List<AnypointMQMessage>> byFallback = removedMessages.build().stream().collect(Collectors.partitioningBy(AnypointMQMessage::getPublishedToFallback));
        List<Lock> messagesToNackPrimary = byFallback.get(false).stream().map(Lock::new).collect(Collectors.toList());
        List<Lock> messagesToNackFallback = byFallback.get(true).stream().map(Lock::new).collect(Collectors.toList());
        if (!messagesToNackPrimary.isEmpty()) {
            this.destination.nack(messagesToNackPrimary, false).fireAndForget();
        }
        if (!messagesToNackFallback.isEmpty()) {
            this.destination.nack(messagesToNackFallback, true).fireAndForget();
        }
    }

    /** Returns whether the preserver reports the message as expired. */
    private boolean isExpired(AnypointMQMessage message) {
        return this.preserver.isExpired(message.getMessageId());
    }

    /**
     * Emits the message to the subscriber and removes it from the preserver. The subscriber is
     * always completed afterwards, even if emitting the message throws.
     */
    private void dispatchMessageToSubscriber(Subscriber<? super AnypointMQMessage> subscriber, AnypointMQMessage message) {
        try {
            LOGGER.debug("Dispatch to subscriber message with ID '{}'", message.getMessageId());
            subscriber.onStart();
            subscriber.onNext(message);
            this.preserver.remove(message.getMessageId());
        }
        finally {
            subscriber.onCompleted();
        }
    }

    /**
     * Derives the concurrent-request limit from the buffer size: one request per full batch that
     * fits in the buffer, capped at {@link #MAX_CONCURRENT_REQUESTS}, and never less than one.
     */
    private static int getMaxConcurrentRequests(int maxLocalMessages) {
        if (maxLocalMessages <= MAX_BATCH_SIZE) {
            return 1;
        }
        return Math.min(maxLocalMessages / MAX_BATCH_SIZE, MAX_CONCURRENT_REQUESTS);
    }

    /** {@link MessagePreserver} that tracks nothing; used when the caller supplies no preserver. */
    private static final class NoOpMessagePreserver
    implements MessagePreserver {
        private NoOpMessagePreserver() {
        }

        @Override
        public void add(AnypointMQMessage message, long ttl) {
        }

        @Override
        public void add(List<AnypointMQMessage> messages, long ttl) {
        }

        @Override
        public boolean remove(String messageId) {
            return false;
        }

        @Override
        public boolean isExpired(String messageId) {
            return false;
        }

        @Override
        public void stop() {
        }
    }
}

