/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mq.restclient.internal;

import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.CourierObserver;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.api.exception.RestException;
import com.mulesoft.mq.restclient.internal.AbstractCourierRestClient;
import com.mulesoft.mq.restclient.internal.BufferedQueue;
import com.mulesoft.mq.restclient.internal.ExecutorUtils;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import com.mulesoft.mq.restclient.internal.MessageUtils;
import com.mulesoft.mq.restclient.internal.Prefetcher;
import com.mulesoft.mq.restclient.internal.SimpleBufferedQueue;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

public class ScheduledPrefetcher
implements Prefetcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledPrefetcher.class);
    private static final int DEFAULT_RETRIEVE_PERIOD = 5000;
    private static final int CONCURRENT_BATCHES = 3;
    private final BlockingQueue<Subscriber<? super AnypointMqMessage>> waitingSubscribers = new LinkedBlockingQueue<Subscriber<? super AnypointMqMessage>>();
    private final ScheduledExecutorService retriever;
    private final Destination destination;
    private final int batchSize;
    private final long lockTimeToLive;
    private final long poolingTime;
    private final int bufferLimit;
    private boolean running;
    private AtomicInteger inFlightRequests = new AtomicInteger(0);
    private BufferedQueue bufferedQueue;
    private MessagePreserver preserver;
    private long retrievePeriod;
    private RestException invalidPrefetcherException;

    public ScheduledPrefetcher(Destination destination, int batchSize, long poolingTime, long lockTimeToLive) {
        this(destination, batchSize, poolingTime, lockTimeToLive, 5000L, new NoOpMessagePreserver());
    }

    public ScheduledPrefetcher(Destination destination, int batchSize, long poolingTime, long lockTimeToLive, long retrievePeriod, MessagePreserver preserver) {
        this.destination = destination;
        this.batchSize = batchSize;
        this.bufferLimit = 3 * batchSize;
        this.poolingTime = poolingTime;
        this.lockTimeToLive = lockTimeToLive <= 0L ? 120000L : lockTimeToLive;
        this.bufferedQueue = new SimpleBufferedQueue();
        this.retriever = this.createExecutorService();
        this.retrievePeriod = retrievePeriod;
        this.preserver = preserver == null ? new NoOpMessagePreserver() : preserver;
        this.running = false;
    }

    public void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        this.retriever.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                ScheduledPrefetcher.this.retrieveMessages();
            }
        }, 0L, this.retrievePeriod, TimeUnit.MILLISECONDS);
    }

    protected ScheduledExecutorService createExecutorService() {
        return Executors.newSingleThreadScheduledExecutor();
    }

    private void retrieveMessages() {
        int bufferSize = this.bufferedQueue.size();
        if (bufferSize >= this.bufferLimit) {
            LOGGER.debug("Buffer is full, skipping message fetch");
            return;
        }
        if (this.inFlightRequests.get() >= 3) {
            LOGGER.debug("Already {} request in progress, skipping message fetch", (Object)3);
            return;
        }
        LOGGER.debug("Retrieving messages...");
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Buffer contains '{}' messages", (Object)bufferSize);
        }
        this.inFlightRequests.incrementAndGet();
        this.destination.receive(this.batchSize, this.poolingTime, this.lockTimeToLive + this.retrievePeriod).subscribe(new CourierObserver<List<AnypointMqMessage>>(){

            @Override
            public void onSuccess(List<AnypointMqMessage> messages) {
                if (messages != null && messages.size() > 0) {
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace("Received '{}' messages: ", (Object)messages.size());
                    }
                    for (AnypointMqMessage message : messages) {
                        Subscriber subscriber = (Subscriber)ScheduledPrefetcher.this.waitingSubscribers.poll();
                        if (subscriber != null) {
                            ScheduledPrefetcher.this.dispatchMessageToSubscriber((Subscriber<? super AnypointMqMessage>)subscriber, message);
                            continue;
                        }
                        ScheduledPrefetcher.this.preserver.add(message, ScheduledPrefetcher.this.lockTimeToLive);
                        ScheduledPrefetcher.this.bufferedQueue.add(message);
                    }
                    ScheduledPrefetcher.this.submitRetrieve();
                }
                ScheduledPrefetcher.this.inFlightRequests.decrementAndGet();
            }

            @Override
            public void onError(Throwable e) {
                if (ScheduledPrefetcher.this.requestedToStop()) {
                    return;
                }
                if (AbstractCourierRestClient.isTimeout(e)) {
                    LOGGER.debug("Timeout while retrieving messages.");
                } else if (e instanceof ResourceNotFoundException) {
                    LOGGER.error("Destination not found: {}. Shutting down scheduler prefetcher...", (Object)ScheduledPrefetcher.this.destination.getName());
                    ScheduledPrefetcher.this.stop();
                    ScheduledPrefetcher.this.raiseErrorOnSubscribers((ResourceNotFoundException)e);
                } else {
                    LOGGER.error("Can not retrieve messages: {}.", (Object)MessageUtils.getCompleteMessage(e));
                    LOGGER.debug("Can not retrieve messages.", e);
                }
                ScheduledPrefetcher.this.inFlightRequests.decrementAndGet();
            }
        });
    }

    private void submitRetrieve() {
        if (!this.requestedToStop()) {
            this.retriever.submit(this::retrieveMessages);
        }
    }

    private boolean requestedToStop() {
        return !this.running || this.retriever.isShutdown();
    }

    private void raiseErrorOnSubscribers(ResourceNotFoundException resourceNotFoundException) {
        this.invalidPrefetcherException = resourceNotFoundException;
        Subscriber subscriber = null;
        while ((subscriber = (Subscriber)this.waitingSubscribers.poll()) != null) {
            subscriber.onError((Throwable)resourceNotFoundException);
        }
    }

    @Override
    public Observable<AnypointMqMessage> get() {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<AnypointMqMessage>(){

            public void call(Subscriber<? super AnypointMqMessage> subscriber) {
                if (ScheduledPrefetcher.this.invalidPrefetcherException != null) {
                    subscriber.onError((Throwable)ScheduledPrefetcher.this.invalidPrefetcherException);
                    return;
                }
                AnypointMqMessage message = ScheduledPrefetcher.this.bufferedQueue.take();
                while (message != null && ScheduledPrefetcher.this.isExpired(message)) {
                    LOGGER.trace("Discarding expired message - {}", (Object)message.getId());
                    ScheduledPrefetcher.this.preserver.remove(message.getId());
                    message = ScheduledPrefetcher.this.bufferedQueue.take();
                }
                if (message != null) {
                    ScheduledPrefetcher.this.dispatchMessageToSubscriber((Subscriber<? super AnypointMqMessage>)subscriber, message);
                } else {
                    ScheduledPrefetcher.this.waitingSubscribers.offer(subscriber);
                    ScheduledPrefetcher.this.submitRetrieve();
                }
            }
        });
    }

    private boolean isExpired(AnypointMqMessage message) {
        return this.preserver.isExpired(message.getId());
    }

    private void dispatchMessageToSubscriber(Subscriber<? super AnypointMqMessage> subscriber, AnypointMqMessage message) {
        try {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Dispatch to subscriber - {}", (Object)message.getId());
            }
            subscriber.onStart();
            subscriber.onNext((Object)message);
            this.preserver.remove(message.getId());
        }
        finally {
            subscriber.onCompleted();
        }
    }

    public void stop() {
        AnypointMqMessage message;
        this.running = false;
        while ((message = this.bufferedQueue.take()) != null) {
            this.preserver.remove(message.getId());
        }
        this.bufferedQueue.clear();
        ExecutorUtils.stopExecutorService(this.retriever);
    }

    private static final class NoOpMessagePreserver
    implements MessagePreserver {
        private NoOpMessagePreserver() {
        }

        @Override
        public void add(AnypointMqMessage message, long ttl) {
        }

        @Override
        public void add(List<AnypointMqMessage> messages, long ttl) {
        }

        @Override
        public boolean remove(String messageId) {
            return false;
        }

        @Override
        public boolean isExpired(String messageId) {
            return false;
        }
    }
}

