/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mq.restclient.internal;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.CourierObserver;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.api.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.api.exception.RestException;
import com.mulesoft.mq.restclient.internal.AbstractCourierRestClient;
import com.mulesoft.mq.restclient.internal.BufferedQueue;
import com.mulesoft.mq.restclient.internal.ExecutorUtils;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import com.mulesoft.mq.restclient.internal.MessageUtils;
import com.mulesoft.mq.restclient.internal.Prefetcher;
import com.mulesoft.mq.restclient.internal.SimpleBufferedQueue;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

public class ScheduledPrefetcher
implements Prefetcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledPrefetcher.class);
    private static final int CONCURRENT_BATCHES = 3;
    private static final int SINGLE_MESSAGE = 1;
    private final BlockingQueue<Subscriber<? super AnypointMqMessage>> waitingSubscribers = new LinkedBlockingQueue<Subscriber<? super AnypointMqMessage>>();
    private final ScheduledExecutorService retriever;
    private final Destination destination;
    private final int batchSize;
    private final long lockTimeToLive;
    private final long pollingTime;
    private final int bufferLimit;
    private final MQCircuitBreaker circuitBreaker;
    private final AtomicInteger inFlightRequests = new AtomicInteger(0);
    private boolean running;
    private long retrievePeriod;
    private MessagePreserver preserver;
    private BufferedQueue bufferedQueue;
    private RestException invalidPrefetcherException;

    public ScheduledPrefetcher(Destination destination, int batchSize, long pollingTime, long lockTimeToLive, long retrievePeriod, MessagePreserver preserver, MQCircuitBreaker circuitBreaker) {
        Preconditions.checkArgument((circuitBreaker != null ? 1 : 0) != 0, (Object)"A valid MQ CircuitBreaker implementation should be provided but 'null' was found");
        this.circuitBreaker = circuitBreaker;
        this.destination = destination;
        this.batchSize = batchSize;
        this.bufferLimit = 3 * batchSize;
        this.pollingTime = pollingTime;
        this.lockTimeToLive = lockTimeToLive <= 0L ? 120000L : lockTimeToLive;
        this.bufferedQueue = new SimpleBufferedQueue();
        this.retriever = Executors.newSingleThreadScheduledExecutor();
        this.retrievePeriod = retrievePeriod;
        this.preserver = preserver == null ? new NoOpMessagePreserver() : preserver;
        this.running = false;
    }

    public void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        this.retriever.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                ScheduledPrefetcher.this.retrieveMessages();
            }
        }, 0L, this.retrievePeriod, TimeUnit.MILLISECONDS);
    }

    private void retrieveMessages() {
        switch (this.circuitBreaker.getState()) {
            case OPEN: {
                LOGGER.debug("Circuit is OPEN, skipping message fetch");
                break;
            }
            case CLOSED: {
                this.doFetch(this.batchSize);
                break;
            }
            case HALF_OPEN: {
                if (this.inFlightRequests.get() > 0) {
                    LOGGER.debug("Already {} request in progress, skipping message fetch for test", (Object)this.inFlightRequests.get());
                    break;
                }
                this.doFetch(1);
            }
        }
    }

    private void doFetch(int fetchSize) {
        int bufferSize = this.bufferedQueue.size();
        if (bufferSize >= this.bufferLimit) {
            LOGGER.debug("Buffer is full, skipping message fetch");
            return;
        }
        if (this.inFlightRequests.get() >= 3) {
            LOGGER.debug("Already {} request in progress, skipping message fetch", (Object)3);
            return;
        }
        LOGGER.debug("Retrieving messages...");
        if (LOGGER.isTraceEnabled()) {
            LOGGER.debug("Buffer contains '{}' messages", (Object)bufferSize);
        }
        this.inFlightRequests.incrementAndGet();
        this.destination.receive(fetchSize, this.pollingTime, this.lockTimeToLive + this.retrievePeriod).subscribe(new CourierObserver<List<AnypointMqMessage>>(){

            @Override
            public void onSuccess(List<AnypointMqMessage> messages) {
                if (messages != null && messages.size() > 0) {
                    ScheduledPrefetcher.this.handleRetrieveSuccess(messages);
                } else {
                    LOGGER.debug("No messages received");
                    ScheduledPrefetcher.this.circuitBreaker.releaseCircuitLock();
                }
                ScheduledPrefetcher.this.inFlightRequests.decrementAndGet();
            }

            @Override
            public void onError(Throwable e) {
                if (ScheduledPrefetcher.this.requestedToStop()) {
                    return;
                }
                ScheduledPrefetcher.this.handleRetrieveError(e);
                ScheduledPrefetcher.this.circuitBreaker.releaseCircuitLock();
                ScheduledPrefetcher.this.inFlightRequests.decrementAndGet();
            }
        });
    }

    private void handleRetrieveError(Throwable e) {
        if (AbstractCourierRestClient.isTimeout(e)) {
            LOGGER.debug("Timeout while retrieving messages.");
        } else if (e instanceof ResourceNotFoundException) {
            LOGGER.error("Destination not found: {}. Shutting down scheduler prefetcher...", (Object)this.destination.getName());
            this.stop();
            this.raiseErrorOnSubscribers((ResourceNotFoundException)e);
        } else {
            LOGGER.error("Can not retrieve messages: {}.", (Object)MessageUtils.getCompleteMessage(e));
            LOGGER.debug("Can not retrieve messages.", e);
        }
    }

    private void handleRetrieveSuccess(List<AnypointMqMessage> messages) {
        LOGGER.debug("Received '{}' messages: ", (Object)messages.size());
        switch (this.circuitBreaker.getState()) {
            case CLOSED: {
                this.dispatchAll(messages);
                break;
            }
            case OPEN: {
                this.nackAll(messages);
                this.circuitBreaker.releaseCircuitLock();
                break;
            }
            case HALF_OPEN: {
                this.tryCircuitTest(messages, (Subscriber)this.waitingSubscribers.poll());
            }
        }
    }

    private void tryCircuitTest(List<AnypointMqMessage> messages, Subscriber subscriber) {
        if (subscriber == null) {
            messages.forEach(this::addToBuffer);
            return;
        }
        if (this.circuitBreaker.acquireCircuitLock() && !messages.isEmpty()) {
            LOGGER.debug("Circuit HALF_OPEN, dispatching single message for testing");
            Iterator<AnypointMqMessage> messageIterator = messages.iterator();
            this.dispatchMessageToSubscriber((Subscriber<? super AnypointMqMessage>)subscriber, messageIterator.next());
            this.waitCircuitTest();
            ImmutableList remainingMessages = ImmutableList.copyOf(messageIterator);
            if (this.circuitBreaker.isClosed()) {
                LOGGER.debug("Circuit recovered, submitting next poll");
                this.dispatchAll((List<AnypointMqMessage>)remainingMessages);
            } else {
                LOGGER.debug("Circuit testing failed, returning remaining messages to the Queue and waiting for next poll");
                this.nackAll((List<AnypointMqMessage>)remainingMessages);
            }
        }
    }

    private void waitCircuitTest() {
        block2: {
            try {
                LOGGER.debug("Circuit testing in progress, waiting for circuit test to complete");
                this.circuitBreaker.awaitCircuitLock();
            }
            catch (InterruptedException e) {
                if (!this.running) break block2;
                LOGGER.debug("Thread interrupted while waiting for the circuit test lock");
            }
        }
    }

    private void dispatchAll(List<AnypointMqMessage> messages) {
        for (AnypointMqMessage message : messages) {
            Subscriber subscriber;
            if (this.circuitBreaker.isClosed() && (subscriber = (Subscriber)this.waitingSubscribers.poll()) != null) {
                this.dispatchMessageToSubscriber((Subscriber<? super AnypointMqMessage>)subscriber, message);
                continue;
            }
            this.addToBuffer(message);
        }
        this.submitRetrieve();
    }

    private void addToBuffer(AnypointMqMessage message) {
        this.preserver.add(message, this.lockTimeToLive);
        this.bufferedQueue.add(message);
    }

    private void submitRetrieve() {
        if (!this.requestedToStop()) {
            this.retriever.submit(this::retrieveMessages);
        }
    }

    private boolean requestedToStop() {
        return !this.running || this.retriever.isShutdown();
    }

    private void raiseErrorOnSubscribers(ResourceNotFoundException resourceNotFoundException) {
        Subscriber subscriber;
        this.invalidPrefetcherException = resourceNotFoundException;
        while ((subscriber = (Subscriber)this.waitingSubscribers.poll()) != null) {
            subscriber.onError((Throwable)resourceNotFoundException);
        }
    }

    @Override
    public Observable<AnypointMqMessage> get() {
        return Observable.create(subscriber -> {
            if (this.invalidPrefetcherException != null) {
                subscriber.onError((Throwable)this.invalidPrefetcherException);
                return;
            }
            AnypointMqMessage message = this.bufferedQueue.take();
            while (message != null && this.isExpired(message)) {
                LOGGER.debug("Discarding expired message - {}", (Object)message.getId());
                this.preserver.remove(message.getId());
                message = this.bufferedQueue.take();
            }
            if (message == null) {
                this.waitingSubscribers.offer((Subscriber<? super AnypointMqMessage>)subscriber);
                this.submitRetrieve();
                return;
            }
            this.handleMessageFromBuffer((Subscriber<? super AnypointMqMessage>)subscriber, message);
        });
    }

    private void handleMessageFromBuffer(Subscriber<? super AnypointMqMessage> subscriber, AnypointMqMessage message) {
        switch (this.circuitBreaker.getState()) {
            case CLOSED: {
                this.dispatchMessageToSubscriber(subscriber, message);
                return;
            }
            case OPEN: {
                this.nackAll(Collections.singletonList(message));
                this.waitingSubscribers.offer(subscriber);
                return;
            }
            case HALF_OPEN: {
                this.tryCircuitTest(Collections.singletonList(message), subscriber);
            }
        }
    }

    private void nackAll(List<AnypointMqMessage> messages) {
        AnypointMqMessage message;
        LOGGER.debug("Returning all messages to the Queue");
        ImmutableList.Builder removedMessages = ImmutableList.builder();
        removedMessages.addAll(messages);
        while ((message = this.bufferedQueue.take()) != null) {
            this.preserver.remove(message.getId());
            removedMessages.add((Object)message);
        }
        messages.stream().map(AnypointMqMessage::getId).forEach(this.preserver::remove);
        ImmutableList messagesToNack = removedMessages.build();
        if (!messagesToNack.isEmpty()) {
            this.destination.nack((List<AnypointMqMessage>)messagesToNack).fireAndForget();
        }
    }

    private boolean isExpired(AnypointMqMessage message) {
        return this.preserver.isExpired(message.getId());
    }

    private void dispatchMessageToSubscriber(Subscriber<? super AnypointMqMessage> subscriber, AnypointMqMessage message) {
        try {
            LOGGER.debug("Dispatch to subscriber - {}", (Object)message.getId());
            subscriber.onStart();
            subscriber.onNext((Object)message);
            this.preserver.remove(message.getId());
        }
        finally {
            subscriber.onCompleted();
        }
    }

    public void stop() {
        AnypointMqMessage message;
        this.running = false;
        while ((message = this.bufferedQueue.take()) != null) {
            this.preserver.remove(message.getId());
        }
        this.bufferedQueue.clear();
        ExecutorUtils.stopExecutorService(this.retriever);
    }

    private static final class NoOpMessagePreserver
    implements MessagePreserver {
        private NoOpMessagePreserver() {
        }

        @Override
        public void add(AnypointMqMessage message, long ttl) {
        }

        @Override
        public void add(List<AnypointMqMessage> messages, long ttl) {
        }

        @Override
        public boolean remove(String messageId) {
            return false;
        }

        @Override
        public boolean isExpired(String messageId) {
            return false;
        }
    }
}

