/*
 * Decompiled with CFR 0.152.
 */
package org.mule.test.module.extension.source;

import io.qameta.allure.Description;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.tck.probe.PollingProber;
import org.mule.test.module.extension.AbstractExtensionFunctionalTestCase;
import org.mule.test.petstore.extension.NumberPetAdoptionSource;
import org.mule.test.petstore.extension.PetAdoptionSource;
import org.mule.test.petstore.extension.PetFailingPollingSource;

public class PollingSourceTestCase
extends AbstractExtensionFunctionalTestCase {
    private static final List<CoreEvent> ADOPTION_EVENTS = new LinkedList<CoreEvent>();

    protected void doTearDown() throws Exception {
        ADOPTION_EVENTS.clear();
    }

    protected String getConfigFile() {
        return "polling-source-config.xml";
    }

    @Before
    public void resetCounters() throws Exception {
        PetFailingPollingSource.STARTED_POLLS = 0;
        PetFailingPollingSource.POLL_INVOCATIONS.clear();
        PetFailingPollingSource.STARTED_SOURCES.clear();
        PetAdoptionSource.STARTED_POLLS = 0;
    }

    @Test
    public void vanillaPoll() throws Exception {
        this.startFlow("vanilla");
        this.assertAllPetsAdopted();
        PollingProber.check((long)5000L, (long)200L, () -> {
            List<CoreEvent> list = ADOPTION_EVENTS;
            synchronized (list) {
                return PetAdoptionSource.COMPLETED_POLLS > 1 && PetAdoptionSource.ADOPTED_PET_COUNT >= ADOPTION_EVENTS.size();
            }
        });
    }

    @Test
    public void idempotentPoll() throws Exception {
        this.startFlow("idempotent");
        PollingProber.check((long)5000L, (long)100L, () -> {
            List<CoreEvent> list = ADOPTION_EVENTS;
            synchronized (list) {
                return PetAdoptionSource.REJECTED_ADOPTIONS >= PetAdoptionSource.ALL_PETS.size() && PetAdoptionSource.ALL_PETS.containsAll(ADOPTION_EVENTS.stream().map(e -> e.getMessage().getPayload().getValue().toString()).collect(Collectors.toList()));
            }
        });
        this.assertIdempotentAdoptions();
    }

    @Test
    public void idempotentLocksAreReleased() throws Exception {
        this.startFlow("idempotentLocksAreReleased");
        this.assertAllPetsAdopted();
        this.doTearDown();
        this.assertAllPetsAdopted();
    }

    @Test
    public void watermarkPoll() throws Exception {
        this.startFlow("watermark");
        this.assertAllPetsAdopted();
        this.assertIdempotentAdoptions();
    }

    @Test
    public void failingPoll() throws Exception {
        this.startFlow("failingPoll");
        PollingProber.check((long)5000L, (long)100L, () -> PetAdoptionSource.FAILED_ADOPTION_COUNT >= PetAdoptionSource.ALL_PETS.size());
    }

    @Test
    public void multiplePhasesOfWatermarkPoll() throws Exception {
        this.startFlow("multiplePhasesOfWaterMark");
        this.assertIdempotentAdoptions();
    }

    @Test
    public void multiplePhasesOfWatermarkWithIncreasingAndDecreasingWatermarksPoll() throws Exception {
        this.startFlow("multiplePhasesOfWatermarkWithIncreasingAndDecreasingWatermarks");
        this.assertAllNumbersAdoptedExactlyOnce();
    }

    @Test
    public void whenReconnectingAfterConnectionExceptionSchedulerRunsWithoutStartDelay() throws Exception {
        this.startFlow("fixedFrequencyReconnectingPoll");
        this.assertAllPetsAdopted();
    }

    @Description(value="This test reflects a behavior that we must preserve, when a polling source is stopped and started the scheduler must be stopped and a new one must be started.")
    @Test
    public void whenSourceIsStopAndStartedSchedulerIsReset() throws Exception {
        this.startFlow("longFrequencyPoll");
        this.assertStartedPolls(1);
        this.stopFlow("longFrequencyPoll");
        Thread.sleep(1000L);
        this.startFlow("longFrequencyPoll");
        this.assertStartedPolls(1);
    }

    @Test
    public void sourceRetriggersImmediatlyOnReconnection() throws Exception {
        this.startFlow("failingLongFrequencyPoll");
        this.assertPetFailingSourcePollsFromDifferentSources(2);
    }

    private void assertStartedPolls(int polls) {
        PollingProber.check((long)5000L, (long)200L, () -> {
            MatcherAssert.assertThat((Object)PetAdoptionSource.STARTED_POLLS, (Matcher)Matchers.is((Object)polls));
            return true;
        });
    }

    private void assertPetFailingSourcePollsFromDifferentSources(int polls) {
        PollingProber.check((long)5000L, (long)200L, () -> {
            MatcherAssert.assertThat((Object)PetFailingPollingSource.STARTED_POLLS, (Matcher)Matchers.is((Object)polls));
            return true;
        });
        MatcherAssert.assertThat((Object)PetFailingPollingSource.POLL_INVOCATIONS.size(), (Matcher)Matchers.is((Object)polls));
        PetFailingPollingSource.POLL_INVOCATIONS.entrySet().forEach(entry -> MatcherAssert.assertThat(entry.getValue(), (Matcher)Matchers.is((Object)1)));
    }

    private void assertIdempotentAdoptions() {
        PollingProber.checkNot((long)5000L, (long)100L, () -> {
            List<CoreEvent> list = ADOPTION_EVENTS;
            synchronized (list) {
                return ADOPTION_EVENTS.size() > PetAdoptionSource.ALL_PETS.size();
            }
        });
    }

    private void assertAllPetsAdopted() {
        PollingProber.check((long)5000L, (long)200L, () -> {
            List<CoreEvent> list = ADOPTION_EVENTS;
            synchronized (list) {
                return ADOPTION_EVENTS.size() >= PetAdoptionSource.ALL_PETS.size() && ADOPTION_EVENTS.stream().map(e -> e.getMessage().getPayload().getValue().toString()).collect(Collectors.toList()).containsAll(PetAdoptionSource.ALL_PETS);
            }
        });
    }

    private void assertAllNumbersAdoptedExactlyOnce() {
        PollingProber.check((long)5000L, (long)200L, () -> {
            List<CoreEvent> list = ADOPTION_EVENTS;
            synchronized (list) {
                return ADOPTION_EVENTS.size() == NumberPetAdoptionSource.ALL_NUMBERS.size() && ADOPTION_EVENTS.stream().map(e -> e.getMessage().getPayload().getValue().toString()).collect(Collectors.toList()).containsAll(NumberPetAdoptionSource.ALL_NUMBERS);
            }
        });
    }

    private void startFlow(String flowName) throws Exception {
        ((Startable)this.getFlowConstruct(flowName)).start();
    }

    private void stopFlow(String flowName) throws Exception {
        ((Stoppable)this.getFlowConstruct(flowName)).stop();
    }

    public static class AdoptionProcessor
    implements Processor {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public CoreEvent process(CoreEvent event) throws MuleException {
            List list = ADOPTION_EVENTS;
            synchronized (list) {
                ADOPTION_EVENTS.add(event);
            }
            return event;
        }
    }
}

