/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.extension.mq.internal;

import com.mulesoft.extension.mq.api.message.AnypointMQMessageContext;
import com.mulesoft.extension.mq.api.message.MessageContextFactory;
import com.mulesoft.extension.mq.internal.config.SubscriberAckMode;
import com.mulesoft.extension.mq.internal.config.SubscriberConfiguration;
import com.mulesoft.extension.mq.internal.domain.MessageListener;
import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.CourierObserver;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.internal.AbstractCourierRestClient;
import com.mulesoft.mq.restclient.internal.ExecutorUtils;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Subscriber {
    private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);
    private final Destination destination;
    private final MessageListener messageListener;
    private final MessageContextFactory messageContextFactory;
    private final SubscriberConfiguration subscriberConfiguration;
    private final ExecutorService executorService;
    private boolean running;
    private final int batchSize;

    public Subscriber(SubscriberConfiguration subscriberConfiguration, Destination destination, MessageListener messageListener, MessageContextFactory messageContextFactory, int pollingThreads, int batchSize) {
        this.subscriberConfiguration = subscriberConfiguration;
        this.destination = destination;
        this.messageListener = messageListener;
        this.messageContextFactory = messageContextFactory;
        this.batchSize = batchSize;
        if (pollingThreads < 1 || pollingThreads > 5) {
            throw new RuntimeException("Polling threads must be between 1 to 5. Received: " + pollingThreads);
        }
        this.executorService = Executors.newFixedThreadPool(pollingThreads);
    }

    public synchronized void start() {
        this.running = true;
        this.submitWork();
    }

    private void submitWork() {
        if (this.running) {
            this.executorService.submit(() -> {
                try {
                    this.subscribeForMessages();
                }
                catch (Throwable t) {
                    logger.error("Can not subscribe for messages.", t);
                    throw t;
                }
            });
        }
    }

    private void subscribeForMessages() {
        this.destination.receive(this.batchSize, 0L, Optional.ofNullable(this.subscriberConfiguration.getAcknowledgementTimeout()).orElse(120000L).longValue()).subscribe((CourierObserver)new CourierObserver<List<AnypointMqMessage>>(){

            public void onSuccess(List<AnypointMqMessage> messages) {
                Subscriber.this.submitWork();
                if (Subscriber.this.running) {
                    for (AnypointMqMessage message : messages) {
                        AnypointMQMessageContext messageContext = Subscriber.this.messageContextFactory.createMessageContext(message, Subscriber.this.destination.getName());
                        if (Subscriber.this.subscriberConfiguration.getAcknowledgementMode() == SubscriberAckMode.IMMEDIATE) {
                            Subscriber.this.destination.ack(messageContext.getMessage()).fireAndForget();
                        }
                        Subscriber.this.messageListener.onReceive(messageContext);
                    }
                }
            }

            public void onError(Throwable e) {
                Subscriber.this.submitWork();
                if (!AbstractCourierRestClient.isTimeout((Throwable)e)) {
                    Subscriber.this.messageListener.onError(e);
                }
            }
        });
    }

    public synchronized void stop() {
        this.running = false;
        ExecutorUtils.stopExecutorService((ExecutorService)this.executorService);
    }
}

