/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mq.restclient.internal;

import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.CourierObserver;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.exception.ResourceNotFoundException;
import com.mulesoft.mq.restclient.api.exception.RestException;
import com.mulesoft.mq.restclient.internal.AbstractCourierRestClient;
import com.mulesoft.mq.restclient.internal.BufferedQueue;
import com.mulesoft.mq.restclient.internal.ExecutorUtils;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import com.mulesoft.mq.restclient.internal.MessageUtils;
import com.mulesoft.mq.restclient.internal.Prefetcher;
import com.mulesoft.mq.restclient.internal.SimpleBufferedQueue;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

public class ScheduledPrefetcher
implements Prefetcher {
    private static final Logger logger = LoggerFactory.getLogger(ScheduledPrefetcher.class);
    private static final int DEFAULT_RETRIEVE_PERIOD = 5000;
    private final BlockingQueue<Subscriber<? super AnypointMqMessage>> waitingSubscribers = new LinkedBlockingQueue<Subscriber<? super AnypointMqMessage>>();
    private final ScheduledExecutorService retriever;
    private final Destination destination;
    private final int batchSize;
    private final long poolingTime;
    private final long lockTimeToLive;
    private BufferedQueue bufferedQueue;
    private MessagePreserver preserver;
    private long retrievePeriod;
    private RestException invalidPrefetcherException;

    public ScheduledPrefetcher(Destination destination, int batchSize, long poolingTime, long lockTimeToLive) {
        this(destination, batchSize, poolingTime, lockTimeToLive, 5000L, null);
    }

    public ScheduledPrefetcher(Destination destination, int batchSize, long poolingTime, long lockTimeToLive, long retrievePeriod, MessagePreserver preserver) {
        this.destination = destination;
        this.batchSize = batchSize;
        this.poolingTime = poolingTime;
        this.lockTimeToLive = lockTimeToLive;
        this.bufferedQueue = new SimpleBufferedQueue();
        this.retriever = this.createExecutorService();
        this.retrievePeriod = retrievePeriod;
        this.preserver = preserver;
        this.retriever.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                ScheduledPrefetcher.this.retrieveMessages();
            }
        }, 0L, retrievePeriod, TimeUnit.MILLISECONDS);
    }

    protected ScheduledExecutorService createExecutorService() {
        return Executors.newSingleThreadScheduledExecutor();
    }

    private void retrieveMessages() {
        if (this.bufferedQueue.size() < 3 * this.batchSize) {
            logger.debug("Retrieving messages...");
            this.destination.receive(this.batchSize, this.poolingTime, this.lockTimeToLive + this.retrievePeriod).subscribe(new CourierObserver<List<AnypointMqMessage>>(){

                @Override
                public void onSuccess(List<AnypointMqMessage> messages) {
                    if (messages != null && messages.size() > 0) {
                        Subscriber subscriber = null;
                        for (AnypointMqMessage message : messages) {
                            subscriber = (Subscriber)ScheduledPrefetcher.this.waitingSubscribers.poll();
                            if (subscriber != null) {
                                ScheduledPrefetcher.this.dispatchMessageToSubscriber((Subscriber<? super AnypointMqMessage>)subscriber, message);
                                continue;
                            }
                            ScheduledPrefetcher.this.bufferedQueue.add(message);
                            if (ScheduledPrefetcher.this.preserver == null) continue;
                            ScheduledPrefetcher.this.preserver.add(message, ScheduledPrefetcher.this.lockTimeToLive);
                        }
                        if (subscriber != null) {
                            ScheduledPrefetcher.this.retrieveMessages();
                        }
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (AbstractCourierRestClient.isTimeout(e)) {
                        logger.debug("Timeout while retrieving messages.");
                    } else if (e instanceof ResourceNotFoundException) {
                        logger.error("Destination not found: {}. Shutting down scheduler prefetcher...", (Object)ScheduledPrefetcher.this.destination.getName());
                        ScheduledPrefetcher.this.stop();
                        ScheduledPrefetcher.this.raiseErrorOnSubscribers((ResourceNotFoundException)e);
                    } else {
                        logger.error("Can not retrieve messages: {}.", (Object)MessageUtils.getCompleteMessage(e));
                        logger.debug("Can not retrieve messages.", e);
                    }
                }
            });
        }
    }

    private void raiseErrorOnSubscribers(ResourceNotFoundException resourceNotFoundException) {
        this.invalidPrefetcherException = resourceNotFoundException;
        Subscriber subscriber = null;
        while ((subscriber = (Subscriber)this.waitingSubscribers.poll()) != null) {
            subscriber.onError((Throwable)resourceNotFoundException);
        }
    }

    @Override
    public Observable<AnypointMqMessage> get() {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<AnypointMqMessage>(){

            public void call(Subscriber<? super AnypointMqMessage> subscriber) {
                if (ScheduledPrefetcher.this.invalidPrefetcherException != null) {
                    subscriber.onError((Throwable)ScheduledPrefetcher.this.invalidPrefetcherException);
                } else {
                    AnypointMqMessage message;
                    while ((message = ScheduledPrefetcher.this.bufferedQueue.take()) != null && ScheduledPrefetcher.this.isExpired(message)) {
                    }
                    if (message != null) {
                        ScheduledPrefetcher.this.dispatchMessageToSubscriber((Subscriber<? super AnypointMqMessage>)subscriber, message);
                    } else {
                        ScheduledPrefetcher.this.waitingSubscribers.offer(subscriber);
                        ScheduledPrefetcher.this.retrieveMessages();
                    }
                }
            }
        });
    }

    private void dispatchMessageToSubscriber(Subscriber<? super AnypointMqMessage> subscriber, AnypointMqMessage message) {
        try {
            subscriber.onStart();
            subscriber.onNext((Object)message);
            if (this.preserver != null) {
                this.preserver.remove(message.getId());
            }
        }
        finally {
            subscriber.onCompleted();
        }
    }

    private boolean isExpired(AnypointMqMessage message) {
        return false;
    }

    public void stop() {
        if (this.preserver != null) {
            AnypointMqMessage message;
            while ((message = this.bufferedQueue.take()) != null) {
                this.preserver.remove(message.getId());
            }
        }
        this.bufferedQueue.clear();
        ExecutorUtils.stopExecutorService(this.retriever);
    }
}

