/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.extension.mq.internal;

import com.mulesoft.extension.mq.api.message.MessageContextFactory;
import com.mulesoft.extension.mq.internal.AbstractSubscriber;
import com.mulesoft.extension.mq.internal.config.SubscriberConfiguration;
import com.mulesoft.extension.mq.internal.domain.MessageListener;
import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.CourierObserver;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.circuit.MQCircuitBreaker;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Subscriber that periodically polls a {@link Destination} for messages and dispatches them
 * according to the current state of the {@link MQCircuitBreaker}.
 *
 * <p>At most one receive request is kept in flight per subscriber; this is tracked by
 * {@link #isRequestInFlight}. While the circuit is CLOSED a full batch is requested; otherwise a
 * single message is requested so that it can be used to test the circuit.
 */
public class PollingSubscriber
extends AbstractSubscriber {
    /** Value passed as the second argument of {@code Destination.receive} for every fetch. */
    public static final int MAX_POLLING_TIMEOUT = 20000;
    private static final Logger LOGGER = LoggerFactory.getLogger(PollingSubscriber.class);
    /** Number of messages requested per fetch while the circuit is CLOSED. */
    private static final int BATCH_SIZE = 10;
    /** Number of messages requested per fetch while the circuit is being tested. */
    private static final int SINGLE_MESSAGE = 1;
    /** Polling period (milliseconds) used when the configuration gives none or a non-positive one. */
    private static final long DEFAULT_POLLING_TIME = 1000L;
    /** Acknowledgement timeout used when the configuration does not provide one. */
    private static final long DEFAULT_ACKNOWLEDGEMENT_TIMEOUT = 120000L;
    /** {@code true} while a receive request issued by this subscriber has not completed yet. */
    private final AtomicBoolean isRequestInFlight = new AtomicBoolean(false);
    /** Period between scheduled polls, in milliseconds. */
    private final long pollingTime;

    /**
     * Creates the subscriber and resolves the polling period from the configuration, falling back
     * to {@link #DEFAULT_POLLING_TIME} when the configured value is {@code null} or not positive.
     *
     * @param subscriberConfiguration configuration providing polling time and acknowledgement timeout
     * @param destination destination the messages are received from
     * @param messageListener forwarded to the superclass
     * @param messageContextFactory forwarded to the superclass
     * @param circuitBreaker circuit breaker consulted before and after every fetch
     */
    public PollingSubscriber(SubscriberConfiguration subscriberConfiguration, Destination destination, MessageListener messageListener, MessageContextFactory messageContextFactory, MQCircuitBreaker circuitBreaker) {
        super(subscriberConfiguration, destination, messageListener, messageContextFactory, circuitBreaker);
        this.pollingTime = subscriberConfiguration.getPollingTime() == null || subscriberConfiguration.getPollingTime() <= 0L ? DEFAULT_POLLING_TIME : subscriberConfiguration.getPollingTime();
    }

    /**
     * Schedules {@link #subscribeForMessages()} at a fixed rate of {@link #pollingTime}
     * milliseconds, starting immediately.
     */
    @Override
    protected void doSubmitWork() {
        this.executorService.scheduleAtFixedRate(() -> {
            try {
                this.subscribeForMessages();
            }
            catch (Throwable t) {
                LOGGER.error("Can not subscribe for messages.", t);
                // NOTE(review): if executorService follows the ScheduledExecutorService contract,
                // rethrowing suppresses all subsequent executions of this task - confirm that
                // stopping the poller on the first failure is intended.
                throw t;
            }
        }, 0L, this.pollingTime, TimeUnit.MILLISECONDS);
    }

    /**
     * Performs one poll: fetches a batch when the circuit is CLOSED, or a single message for
     * circuit testing otherwise. Does nothing when {@link #shouldSkipMessageFetch()} says so.
     */
    private void subscribeForMessages() {
        if (this.shouldSkipMessageFetch()) {
            return;
        }
        // From here on this call owns the in-flight flag; the observer callbacks normally release it.
        try {
            if (this.circuitBreaker.isClosed()) {
                this.doFetch(BATCH_SIZE);
            } else {
                LOGGER.debug("Circuit HALF_OPEN, submitting request for circuit testing");
                this.doFetch(SINGLE_MESSAGE);
            }
        }
        catch (RuntimeException | Error e) {
            // The request failed before/while being issued, so no callback is guaranteed to
            // release the flag; release it here or every later poll would be skipped.
            this.isRequestInFlight.set(false);
            throw e;
        }
    }

    /**
     * Decides whether the current poll must be skipped.
     *
     * <p>Side effect: when this method returns {@code false} it has atomically switched
     * {@link #isRequestInFlight} from {@code false} to {@code true}.
     *
     * @return {@code true} if the subscriber is not running, the circuit is OPEN, or another
     *         request is already in flight
     */
    private boolean shouldSkipMessageFetch() {
        if (!this.running) {
            LOGGER.debug("Skipping message fetch, already requested to Stop");
            return true;
        }
        if (this.circuitBreaker.isOpen()) {
            LOGGER.debug("Circuit is OPEN, skipping message fetch");
            return true;
        }
        if (!this.isRequestInFlight.compareAndSet(false, true)) {
            LOGGER.debug("Skipping message fetch, already one request in flight for this subscriber");
            return true;
        }
        return false;
    }

    /**
     * Requests up to {@code fetchSize} messages from the destination and registers an observer
     * that handles the outcome and releases the in-flight flag.
     *
     * @param fetchSize number of messages to request
     */
    private void doFetch(int fetchSize) {
        LOGGER.debug("Do fetch of '{}' messages", fetchSize);
        this.destination.receive(fetchSize, MAX_POLLING_TIMEOUT, Optional.ofNullable(this.subscriberConfiguration.getAcknowledgementTimeout()).orElse(DEFAULT_ACKNOWLEDGEMENT_TIMEOUT).longValue()).subscribe((CourierObserver)new CourierObserver<List<AnypointMqMessage>>(){

            public void onSuccess(List<AnypointMqMessage> messages) {
                try {
                    PollingSubscriber.this.handleSuccess(messages);
                }
                finally {
                    // Always release the flag, even if message handling throws.
                    PollingSubscriber.this.isRequestInFlight.set(false);
                }
            }

            public void onError(Throwable e) {
                try {
                    PollingSubscriber.this.handleError(e);
                    if (PollingSubscriber.this.circuitBreaker.isHalfOpen()) {
                        LOGGER.debug("An error occurred while obtaining a message to test the circuit, releasing the lock");
                        PollingSubscriber.this.circuitBreaker.releaseCircuitLock();
                    }
                }
                finally {
                    // Always release the flag, even if error handling throws.
                    PollingSubscriber.this.isRequestInFlight.set(false);
                }
            }
        });
    }

    /**
     * Handles a successful receive according to the circuit state: releases the circuit lock when
     * nothing was received, returns everything to the queue when OPEN, processes everything when
     * CLOSED, and runs a circuit test when HALF_OPEN.
     *
     * @param messages messages returned by the receive request; may be {@code null} or empty
     */
    private void handleSuccess(List<AnypointMqMessage> messages) {
        if (messages == null || messages.isEmpty()) {
            LOGGER.debug("No messages received");
            this.circuitBreaker.releaseCircuitLock();
            return;
        }
        switch (this.circuitBreaker.getState()) {
            case OPEN: {
                LOGGER.debug("Circuit is OPEN, returning all the messages to the Queue");
                this.nackAll(messages);
                this.circuitBreaker.releaseCircuitLock();
                break;
            }
            case CLOSED: {
                this.processMessages(messages);
                break;
            }
            case HALF_OPEN: {
                this.tryCircuitTest(messages);
                break;
            }
        }
    }

    /**
     * Tests the circuit with the first received message. If the circuit lock is acquired, the
     * first message is processed and the method waits for the test to finish; the remaining
     * messages are then processed (circuit CLOSED) or returned to the queue (otherwise).
     *
     * @param messages non-null list of received messages
     */
    private void tryCircuitTest(List<AnypointMqMessage> messages) {
        if (!this.circuitBreaker.acquireCircuitLock() || messages.isEmpty()) {
            LOGGER.debug("Test in progress, skipping poll");
            return;
        }
        LOGGER.debug("Circuit HALF_OPEN, dispatching single message for testing");
        Iterator<AnypointMqMessage> messageIterator = messages.iterator();
        this.processMessage(messageIterator.next());
        this.waitCircuitTest();
        if (this.circuitBreaker.isClosed()) {
            // The iterator has already advanced past the test message.
            this.processMessages(messageIterator);
            LOGGER.debug("Circuit recovered, submitting next poll");
            // NOTE(review): when reached from the receive callback the in-flight flag is still
            // set at this point, so this extra poll may be skipped if it runs before the
            // callback releases the flag - confirm whether that is acceptable.
            this.executorService.submit(() -> this.subscribeForMessages());
        } else {
            LOGGER.debug("Circuit testing failed, returning remaining messages to the Queue and waiting for next poll");
            this.nackAll(messageIterator);
        }
    }

    /**
     * Blocks on {@code circuitBreaker.awaitCircuitLock()}. If the wait is interrupted the
     * interrupt status of the thread is restored, and the interruption is logged only while the
     * subscriber is still running.
     */
    private void waitCircuitTest() {
        try {
            LOGGER.debug("Circuit testing in progress, waiting for circuit test to complete");
            this.circuitBreaker.awaitCircuitLock();
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            if (this.running) {
                LOGGER.debug("Thread interrupted while waiting for the circuit test lock");
            }
        }
    }
}

