/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.extension.mq.internal;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mulesoft.extension.mq.api.modes.SubscriberAckMode;
import com.mulesoft.extension.mq.internal.AbstractSubscriber;
import com.mulesoft.extension.mq.internal.server.MessageListener;
import com.mulesoft.mq.restclient.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.client.mq.AbstractCourierRestClient;
import com.mulesoft.mq.restclient.client.mq.domain.AnypointMQMessage;
import com.mulesoft.mq.restclient.internal.CourierObserver;
import com.mulesoft.mq.restclient.internal.Destination;
import com.mulesoft.mq.restclient.internal.impl.ScheduledPrefetcher;
import com.mulesoft.mq.restclient.utils.ExecutorUtils;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Subscriber that issues {@code receive} calls against its destination from a dedicated
 * single worker thread and starts/stops a {@link ScheduledPrefetcher} together with itself.
 *
 * <p>Every unit of work performs one {@code receive} call. Whether that call succeeds or
 * fails, the attached observer calls {@code submitWork()} again from a {@code finally}
 * block, so the next unit of work is requested regardless of the outcome.
 */
public class PrefetchSubscriber
extends AbstractSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(PrefetchSubscriber.class);

    /** Prefetcher whose lifecycle is tied to {@link #start()} and {@link #stop()}. */
    private final ScheduledPrefetcher prefetcher;

    /**
     * Single worker thread on which the receive calls are issued.
     * NOTE(review): it is created through {@code newSingleThreadScheduledExecutor} although
     * this class only ever calls {@code submit()} on it — a plain single-thread executor
     * may be sufficient; confirm before changing.
     */
    private final ExecutorService executor;

    /**
     * Creates the subscriber and its worker executor (threads are named
     * {@code prefetch-subscriber-%d}). Nothing is started until {@link #start()} is called.
     *
     * @param acknowledgementMode    forwarded to {@link AbstractSubscriber}
     * @param acknowledgementTimeout forwarded to {@link AbstractSubscriber}
     * @param destination            forwarded to {@link AbstractSubscriber}
     * @param messageListener        forwarded to {@link AbstractSubscriber}
     * @param circuitBreaker         forwarded to {@link AbstractSubscriber}
     * @param prefetcher             prefetcher started and stopped together with this subscriber
     */
    public PrefetchSubscriber(SubscriberAckMode acknowledgementMode, long acknowledgementTimeout, Destination destination, MessageListener messageListener, MQCircuitBreaker circuitBreaker, ScheduledPrefetcher prefetcher) {
        super(acknowledgementMode, acknowledgementTimeout, destination, messageListener, circuitBreaker);
        this.prefetcher = prefetcher;
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("prefetch-subscriber-%d").build());
    }

    /** Starts the superclass first and then the prefetcher. */
    @Override
    public void start() {
        super.start();
        this.prefetcher.start();
    }

    /**
     * Stops the superclass, then the prefetcher, then the worker executor.
     *
     * <p>Each later step runs in a {@code finally} block, so an exception thrown by an
     * earlier step can no longer cause the remaining steps to be skipped. The exception
     * still propagates to the caller; if more than one step throws, the last one thrown
     * is the one the caller sees.
     */
    @Override
    public void stop() {
        try {
            super.stop();
        }
        finally {
            try {
                this.prefetcher.stop();
            }
            finally {
                ExecutorUtils.stopExecutorService(this.executor);
            }
        }
    }

    /**
     * Queues one receive cycle on the worker executor.
     *
     * <p>Any failure raised while issuing the receive call is logged and then rethrown;
     * because the task is run through {@code submit()}, the rethrown failure ends up in
     * the returned {@code Future}, which is not inspected here.
     *
     * <p>NOTE(review): {@code submit()} throws {@code RejectedExecutionException} on an
     * executor that no longer accepts tasks. Presumably the superclass does not call this
     * after {@link #stop()} — confirm against {@code AbstractSubscriber}.
     */
    @Override
    protected void doSubmitWork() {
        this.executor.submit(() -> {
            try {
                this.subscribeForMessages();
            }
            catch (Throwable t) {
                LOGGER.error("Can not subscribe for messages: " + t.getMessage(), t);
                throw t;
            }
        });
    }

    /**
     * Issues a single {@code receive} call on the destination and subscribes an observer
     * to its result.
     *
     * <p>NOTE(review): the meaning of the positional arguments passed to {@code receive}
     * is defined by {@code Destination} and is not visible from this class.
     */
    private void subscribeForMessages() {
        this.destination.receive(1, 0L, 120000L, false, false, AbstractCourierRestClient.MAX_RETRIES).subscribe((CourierObserver)new CourierObserver<List<AnypointMQMessage>>(){

            /** Processes a non-empty batch, then always requests the next unit of work. */
            public void onSuccess(List<AnypointMQMessage> messages) {
                try {
                    // A null or empty result carries nothing to process.
                    if (messages != null && !messages.isEmpty()) {
                        PrefetchSubscriber.this.processMessages(messages);
                    }
                }
                finally {
                    // Runs even if processing throws, so the receive cycle keeps going.
                    PrefetchSubscriber.this.submitWork();
                }
            }

            /** Hands the error to the subscriber, then always requests the next unit of work. */
            public void onError(Throwable e) {
                try {
                    PrefetchSubscriber.this.handleError(e);
                }
                finally {
                    // Runs even if error handling throws, so the receive cycle keeps going.
                    PrefetchSubscriber.this.submitWork();
                }
            }
        });
    }
}

