/*
 * Decompiled with CFR 0.152.
 */
package com.mulesoft.mq.restclient.internal;

import com.google.common.collect.ImmutableList;
import com.mulesoft.mq.restclient.api.AnypointMqMessage;
import com.mulesoft.mq.restclient.api.CourierObservable;
import com.mulesoft.mq.restclient.api.CourierObserver;
import com.mulesoft.mq.restclient.api.Destination;
import com.mulesoft.mq.restclient.api.circuit.MQCircuitBreaker;
import com.mulesoft.mq.restclient.internal.MessagePreserver;
import com.mulesoft.mq.restclient.internal.ScheduledPrefetcher;
import com.mulesoft.mq.restclient.internal.TestDestination;
import com.mulesoft.mq.restclient.internal.TestMessage;
import com.mulesoft.mq.restclient.internal.TestTimeSupplier;
import com.mulesoft.mq.restclient.internal.TestUtils;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;

@RunWith(value=MockitoJUnitRunner.class)
public class ScheduledPrefetcherTestCase {
    private static final int POLLING_TIME = 250;
    private static final int BATCH_SIZE = 5;
    private static final long START_TIME = 0L;
    private static final int SCHEDULE_FIXED_FREQUENCY = 500;
    private static final String ID_ONE = "id-1";
    private static final String ID_TWO = "id-2";
    private static final String ONE = "1";
    private static final String TWO = "2";
    private static final String ID_THREE = "id-3";
    private static final String THREE = "3";
    @Mock
    private MessagePreserver preserverMock;
    @Mock
    private MQCircuitBreaker circuitBreakerMock;
    private TestDestination destination;
    private ScheduledPrefetcher prefetcher;
    private ScheduledExecutorService testCallbackExecutor;

    @Before
    public void setup() {
        this.testCallbackExecutor = Executors.newScheduledThreadPool(4);
        this.destination = (TestDestination)Mockito.spy((Object)new TestDestination(new TestTimeSupplier(0L)));
        Mockito.when((Object)this.preserverMock.isExpired(org.mockito.Matchers.anyString())).thenReturn((Object)false);
        Mockito.when((Object)this.preserverMock.remove(org.mockito.Matchers.anyString())).thenReturn((Object)false);
        this.setCircuitState(MQCircuitBreaker.CircuitState.CLOSED);
        this.prefetcher = new ScheduledPrefetcher((Destination)this.destination, 5, 250L, 120000L, 500L, this.preserverMock, this.circuitBreakerMock);
    }

    @After
    public void tearDown() {
        this.prefetcher.stop();
    }

    @Test
    public void pollSkippedWhenInFlightRequestsLimitReached() {
        this.setDestinationCallBackOnSubscribe(o -> {});
        this.prefetcher.start();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.after((int)2500).atMost(3))).receive(org.mockito.Matchers.anyInt(), org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyLong());
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.atMost((int)3))).acquireCircuitLock();
    }

    @Test
    public void inFlightRequestsReleasedOnSuccess() {
        this.setDestinationCallBackOnSubscribe(o -> o.onSuccess((Object)ImmutableList.of((Object)this.createTestMessage(ONE))));
        this.prefetcher.start();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.timeout((long)2500L).atLeast(4))).receive(org.mockito.Matchers.anyInt(), org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyLong());
    }

    @Test
    public void inFlightRequestsReleasedOnError() {
        this.setDestinationCallBackOnSubscribe(o -> o.onError((Throwable)new RuntimeException()));
        this.prefetcher.start();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.timeout((long)2500L).atLeast(4))).receive(org.mockito.Matchers.anyInt(), org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyLong());
    }

    @Test
    public void retrieveMessagesOnStart() {
        this.destination.send(this.createTestMessage(ONE));
        MatcherAssert.assertThat((Object)this.destination.getReceiveCount(), (Matcher)Matchers.is((Object)0));
        this.prefetcher.start();
        TestUtils.probe(600L, 200L, () -> MatcherAssert.assertThat((Object)this.destination.getReceiveCount(), (Matcher)Matchers.is((Object)1)));
    }

    @Test
    public void retrieveMessagePeriodicallyOnEmptyQueue() throws Exception {
        MatcherAssert.assertThat((Object)this.destination.getReceiveCount(), (Matcher)Matchers.is((Object)0));
        this.prefetcher.start();
        TestUtils.probe(500L, 200L, () -> MatcherAssert.assertThat((Object)this.destination.getReceiveCount(), (Matcher)Matchers.is((Object)1)));
        Thread.sleep(250L);
        MatcherAssert.assertThat((Object)this.destination.getReceiveCount(), (Matcher)Matchers.is((Object)1));
        TestUtils.probe(500L, 300L, () -> MatcherAssert.assertThat((Object)this.destination.getReceiveCount(), (Matcher)Matchers.is((Object)2)));
        Thread.sleep(250L);
        MatcherAssert.assertThat((Object)this.destination.getReceiveCount(), (Matcher)Matchers.is((Object)2));
    }

    @Test
    public void prefetcherActsLikeAQueue() {
        this.destination.send(this.createTestMessage(ONE));
        this.destination.send(this.createTestMessage(TWO));
        this.destination.send(this.createTestMessage(THREE));
        this.prefetcher.start();
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock, (VerificationMode)Mockito.timeout((long)5000L).times(3))).add((AnypointMqMessage)org.mockito.Matchers.any(AnypointMqMessage.class), org.mockito.Matchers.anyLong());
        MatcherAssert.assertThat((Object)((AnypointMqMessage)this.prefetcher.get().toBlocking().first()).getId(), (Matcher)Matchers.is((Object)ID_ONE));
        MatcherAssert.assertThat((Object)((AnypointMqMessage)this.prefetcher.get().toBlocking().first()).getId(), (Matcher)Matchers.is((Object)ID_TWO));
        MatcherAssert.assertThat((Object)((AnypointMqMessage)this.prefetcher.get().toBlocking().first()).getId(), (Matcher)Matchers.is((Object)ID_THREE));
        MatcherAssert.assertThat((Object)this.destination.getReceiveCount(), (Matcher)Matchers.is((Object)1));
    }

    @Test
    public void checkMessageExpiredOnCall() {
        AnypointMqMessage message = this.createTestMessage(ONE);
        this.destination.send(message);
        this.prefetcher.start();
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock, (VerificationMode)Mockito.timeout((long)500L))).add((AnypointMqMessage)org.mockito.Matchers.eq((Object)message), org.mockito.Matchers.anyLong());
        MatcherAssert.assertThat((Object)((AnypointMqMessage)this.prefetcher.get().toBlocking().first()).getId(), (Matcher)Matchers.is((Object)ID_ONE));
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock)).isExpired(ID_ONE);
    }

    @Test
    public void skipExpiredMessage() {
        Mockito.when((Object)this.preserverMock.isExpired((String)Mockito.eq((Object)ID_ONE))).thenReturn((Object)true);
        Mockito.when((Object)this.preserverMock.isExpired((String)Mockito.eq((Object)ID_TWO))).thenReturn((Object)false);
        AnypointMqMessage firstMessage = this.createTestMessage(ONE);
        AnypointMqMessage secondMessage = this.createTestMessage(TWO);
        this.destination.send(firstMessage);
        this.destination.send(secondMessage);
        this.prefetcher.start();
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock, (VerificationMode)Mockito.timeout((long)5000L).times(2))).add((AnypointMqMessage)org.mockito.Matchers.any(AnypointMqMessage.class), org.mockito.Matchers.anyLong());
        MatcherAssert.assertThat((Object)((AnypointMqMessage)this.prefetcher.get().toBlocking().first()).getId(), (Matcher)Matchers.is((Object)ID_TWO));
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock)).isExpired(ID_ONE);
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock)).isExpired(ID_TWO);
    }

    @Test
    public void removeMessageOnCall() {
        AnypointMqMessage message = this.createTestMessage(ONE);
        this.destination.send(message);
        this.prefetcher.start();
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock, (VerificationMode)Mockito.timeout((long)500L))).add((AnypointMqMessage)org.mockito.Matchers.eq((Object)message), org.mockito.Matchers.anyLong());
        MatcherAssert.assertThat((Object)((AnypointMqMessage)this.prefetcher.get().toBlocking().first()).getId(), (Matcher)Matchers.is((Object)ID_ONE));
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock)).remove(ID_ONE);
    }

    @Test
    public void addMessageToPreserverWhenNoConsumers() {
        AnypointMqMessage message = this.createTestMessage(ONE);
        this.destination.send(message);
        this.prefetcher.start();
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock, (VerificationMode)Mockito.timeout((long)500L))).add((AnypointMqMessage)org.mockito.Matchers.eq((Object)message), org.mockito.Matchers.anyLong());
    }

    @Test
    public void retrieveSkippedWhenCircuitIsOpen() throws InterruptedException {
        this.setCircuitState(MQCircuitBreaker.CircuitState.OPEN);
        this.prefetcher.start();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.after((int)1500).never())).receive(org.mockito.Matchers.anyInt(), org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyLong());
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.never())).acquireCircuitLock();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.never())).awaitCircuitLock();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.never())).releaseCircuitLock();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.atMost((int)4))).isOpen();
    }

    @Test
    public void retrieveSkippedWhenTestAlreadyInFlight() throws InterruptedException {
        this.setCircuitState(MQCircuitBreaker.CircuitState.HALF_OPEN);
        this.setDestinationCallBackOnSubscribe(o -> {});
        this.prefetcher.start();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.after((int)1500))).receive(org.mockito.Matchers.anyInt(), org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyLong());
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.atLeast((int)3))).getState();
    }

    @Test
    public void releaseLockRetrieveOnError() throws InterruptedException {
        this.setCircuitState(MQCircuitBreaker.CircuitState.HALF_OPEN);
        AtomicBoolean lock = new AtomicBoolean(false);
        Mockito.when((Object)this.circuitBreakerMock.acquireCircuitLock()).then(i -> lock.compareAndSet(false, true));
        Mockito.when((Object)this.circuitBreakerMock.releaseCircuitLock()).then(i -> lock.compareAndSet(true, false));
        this.setDestinationCallBackOnSubscribe(o -> this.testCallbackExecutor.submit(() -> o.onError((Throwable)new RuntimeException("Mock Error"))));
        this.prefetcher.start();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.after((int)1500).atLeast(3))).receive(org.mockito.Matchers.anyInt(), org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyLong());
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.atLeast((int)3))).releaseCircuitLock();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.never())).acquireCircuitLock();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.never())).awaitCircuitLock();
    }

    @Test
    public void releaseLockOnEmptyResponse() throws InterruptedException {
        this.setCircuitState(MQCircuitBreaker.CircuitState.HALF_OPEN);
        AtomicBoolean lock = new AtomicBoolean(false);
        Mockito.when((Object)this.circuitBreakerMock.acquireCircuitLock()).then(i -> lock.compareAndSet(false, true));
        Mockito.when((Object)this.circuitBreakerMock.releaseCircuitLock()).then(i -> lock.compareAndSet(true, false));
        this.setDestinationCallBackOnSubscribe(o -> this.testCallbackExecutor.submit(() -> o.onSuccess((Object)ImmutableList.of())));
        this.prefetcher.start();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.after((int)1500).atLeast(3))).receive(org.mockito.Matchers.anyInt(), org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyLong());
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.atLeast((int)3))).releaseCircuitLock();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.never())).acquireCircuitLock();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.never())).awaitCircuitLock();
    }

    @Test
    public void messagesAreNackedIfCircuitIsOpen() throws InterruptedException {
        CountDownLatch latch = this.simulateCircuitOpenDuringReceive();
        this.prefetcher.start();
        latch.await();
        this.testCallbackExecutor.schedule(() -> (AnypointMqMessage)this.prefetcher.get().toBlocking().first(), 500L, TimeUnit.MILLISECONDS);
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock, (VerificationMode)Mockito.timeout((long)1000L).times(3))).remove(org.mockito.Matchers.anyString());
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.timeout((long)1000L))).nack((List)org.mockito.Matchers.any(List.class));
    }

    @Test
    public void singleMessageDispatchedWhenWaitingForSuccessResultOnHalfOpen() throws InterruptedException {
        Mockito.when((Object)this.circuitBreakerMock.acquireCircuitLock()).thenReturn((Object)true);
        ((MQCircuitBreaker)Mockito.doAnswer(i -> {
            this.setCircuitState(MQCircuitBreaker.CircuitState.CLOSED);
            return null;
        }).when((Object)this.circuitBreakerMock)).awaitCircuitLock();
        CountDownLatch latch = this.simulateHalfOpenDuringRequest();
        this.prefetcher.start();
        this.testCallbackExecutor.schedule(() -> (AnypointMqMessage)this.prefetcher.get().toBlocking().first(), 500L, TimeUnit.MILLISECONDS);
        latch.await();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.timeout((long)1000000L).atLeast(1))).acquireCircuitLock();
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock, (VerificationMode)Mockito.timeout((long)1000L))).remove(org.mockito.Matchers.anyString());
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.timeout((long)1000L))).awaitCircuitLock();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.after((int)1000).never())).nack((AnypointMqMessage)org.mockito.Matchers.any(AnypointMqMessage.class));
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.never())).nack((List)org.mockito.Matchers.any(List.class));
    }

    @Test
    public void testMessageDispatchedWaitingForErrorResultOnHalfOpen() throws InterruptedException {
        Mockito.when((Object)this.circuitBreakerMock.acquireCircuitLock()).thenReturn((Object)true);
        ((MQCircuitBreaker)Mockito.doAnswer(i -> {
            this.setCircuitState(MQCircuitBreaker.CircuitState.OPEN);
            return null;
        }).when((Object)this.circuitBreakerMock)).awaitCircuitLock();
        CountDownLatch latch = this.simulateHalfOpenDuringRequest();
        this.prefetcher.start();
        this.testCallbackExecutor.schedule(() -> (AnypointMqMessage)this.prefetcher.get().toBlocking().first(), 500L, TimeUnit.MILLISECONDS);
        latch.await();
        ((MessagePreserver)Mockito.verify((Object)this.preserverMock, (VerificationMode)Mockito.timeout((long)1000L).atLeast(3))).remove(org.mockito.Matchers.anyString());
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.timeout((long)1000L))).awaitCircuitLock();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.after((int)1000))).nack((List)org.mockito.Matchers.any(List.class));
    }

    @Test
    public void skipNackWhenSingleMessageUsedForCircuitTestingFails() throws InterruptedException {
        Mockito.when((Object)this.circuitBreakerMock.acquireCircuitLock()).thenReturn((Object)true);
        ((MQCircuitBreaker)Mockito.doAnswer(i -> {
            this.setCircuitState(MQCircuitBreaker.CircuitState.OPEN);
            return null;
        }).when((Object)this.circuitBreakerMock)).awaitCircuitLock();
        AtomicBoolean messageDispatched = new AtomicBoolean(false);
        CountDownLatch latch = new CountDownLatch(1);
        this.setDestinationCallBackOnSubscribe(o -> {
            this.setCircuitState(MQCircuitBreaker.CircuitState.HALF_OPEN);
            this.testCallbackExecutor.submit(() -> {
                if (messageDispatched.compareAndSet(false, true)) {
                    o.onSuccess(this.createTestMessages(1));
                } else {
                    o.onSuccess((Object)ImmutableList.of());
                }
                latch.countDown();
            });
        });
        this.prefetcher.start();
        this.testCallbackExecutor.schedule(() -> (AnypointMqMessage)this.prefetcher.get().toBlocking().first(), 500L, TimeUnit.MILLISECONDS);
        latch.await();
        ((MQCircuitBreaker)Mockito.verify((Object)this.circuitBreakerMock, (VerificationMode)Mockito.timeout((long)10000L))).awaitCircuitLock();
        ((TestDestination)Mockito.verify((Object)this.destination, (VerificationMode)Mockito.never())).nack((List)org.mockito.Matchers.any(List.class));
    }

    private List<AnypointMqMessage> createTestMessages(int numberOfMessages) {
        ImmutableList.Builder list = ImmutableList.builder();
        for (int i = 0; i < numberOfMessages; ++i) {
            list.add((Object)this.createTestMessage(String.valueOf(i)));
        }
        return list.build();
    }

    private AnypointMqMessage createTestMessage(String id) {
        return new TestMessage("id-" + id, "body-" + id);
    }

    private void setCircuitState(MQCircuitBreaker.CircuitState state) {
        Mockito.when((Object)this.circuitBreakerMock.getState()).thenReturn((Object)state);
        Mockito.when((Object)this.circuitBreakerMock.isOpen()).thenReturn((Object)(state == MQCircuitBreaker.CircuitState.OPEN ? 1 : 0));
        Mockito.when((Object)this.circuitBreakerMock.isClosed()).thenReturn((Object)(state == MQCircuitBreaker.CircuitState.CLOSED ? 1 : 0));
        Mockito.when((Object)this.circuitBreakerMock.isHalfOpen()).thenReturn((Object)(state == MQCircuitBreaker.CircuitState.HALF_OPEN ? 1 : 0));
        if (state != MQCircuitBreaker.CircuitState.HALF_OPEN) {
            Mockito.when((Object)this.circuitBreakerMock.acquireCircuitLock()).thenReturn((Object)false);
            Mockito.when((Object)this.circuitBreakerMock.releaseCircuitLock()).thenReturn((Object)false);
        }
    }

    private void setDestinationCallBackOnSubscribe(final Consumer<CourierObserver<List<AnypointMqMessage>>> callback) {
        Mockito.when(this.destination.receive(org.mockito.Matchers.anyInt(), org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyLong())).thenReturn((Object)new CourierObservable<List<AnypointMqMessage>>(){

            public void subscribe(CourierObserver<List<AnypointMqMessage>> observer) {
                callback.accept(observer);
            }

            public List<AnypointMqMessage> getValue() {
                return null;
            }

            public void fireAndForget() {
            }
        });
    }

    private CountDownLatch simulateCircuitStateChangeDuringReceiveTime(MQCircuitBreaker.CircuitState newState) {
        CountDownLatch latch = new CountDownLatch(1);
        this.setDestinationCallBackOnSubscribe(o -> {
            this.setCircuitState(newState);
            this.testCallbackExecutor.submit(() -> {
                o.onSuccess(this.createTestMessages(3));
                latch.countDown();
            });
        });
        return latch;
    }

    private CountDownLatch simulateCircuitOpenDuringReceive() {
        return this.simulateCircuitStateChangeDuringReceiveTime(MQCircuitBreaker.CircuitState.OPEN);
    }

    private CountDownLatch simulateHalfOpenDuringRequest() {
        return this.simulateCircuitStateChangeDuringReceiveTime(MQCircuitBreaker.CircuitState.HALF_OPEN);
    }
}

