/*
 * Decompiled with CFR 0.152.
 */
package org.mule.test.module.extension.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.util.MultiMap;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.tck.probe.PollingProber;
import org.mule.test.module.extension.AbstractExtensionFunctionalTestCase;
import org.mule.test.petstore.extension.WatermarkingPetAdoptionSource;

public class PollingSourceRestartingTestCase
extends AbstractExtensionFunctionalTestCase {
    private static int PROBER_TIMEOUT = 30000;
    private static int CHECK_NOT_PROBER_TIMEOUT = 5000;
    private static int PROBER_FREQUENCY = 500;
    protected static MultiMap<Integer, String> ADOPTIONS = new MultiMap();

    protected String getConfigFile() {
        return "polling-source-restarting-config.xml";
    }

    protected void doTearDown() throws Exception {
        ADOPTIONS.clear();
        WatermarkingPetAdoptionSource.resetSource();
    }

    @Test
    public void unprocessedItemsAreProcessedWhenSourceIsRestartedMidPoll() throws Exception {
        this.assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara", "Colonel Meow", "Daphne", "Elsa"), "unprocessedItemsAreProcessedWhenSourceIsRestartedMidPoll");
    }

    @Test
    public void processedItemsWithSameWatermarkAreNotReprocessedWhenSourceIsRestartedMidPoll() throws Exception {
        this.assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara"), "processedItemsWithSameWatermarkAreNotReprocessedWhenSourceIsRestartedMidPoll");
    }

    @Test
    public void processedItemsWithNewWatermarkAreReprocessedWhenSourceIsRestartedMidPoll() throws Exception {
        this.assertWatermarkingForStopStartScenario(Arrays.asList("Anibal", "Barbara", "ANIBAL", "BARBARA", "Colonel Meow"), "processedItemsWithNewWatermarkAreReprocessedWhenSourceIsRestartedMidPoll");
    }

    private void assertWatermarkingForStopStartScenario(List<String> expectedPets, String flowName) throws Exception {
        this.startFlow(flowName);
        WatermarkingPetAdoptionSource.beginLatch.await();
        this.stopFlow(flowName);
        PollingProber.check((long)PROBER_TIMEOUT, (long)PROBER_FREQUENCY, () -> this.getFlowConstruct(flowName).getLifecycleState().isStopped());
        this.startFlow(flowName);
        this.waitForAllPetsToBeAdopted(expectedPets);
        this.checkNoMorePetsAdopted(expectedPets);
        this.assertAdoptedPets(expectedPets);
    }

    private void waitForAllPetsToBeAdopted(List<String> pets) {
        PollingProber.check((long)PROBER_TIMEOUT, (long)PROBER_FREQUENCY, () -> ADOPTIONS.size() == pets.size());
    }

    private void checkNoMorePetsAdopted(List<String> pets) {
        PollingProber.checkNot((long)CHECK_NOT_PROBER_TIMEOUT, (long)PROBER_FREQUENCY, () -> ADOPTIONS.size() > pets.size());
    }

    private void assertAdoptedPets(List<String> pets) {
        ArrayList adoptedPets = new ArrayList();
        for (Integer key : ADOPTIONS.keySet()) {
            adoptedPets.addAll(ADOPTIONS.getAll((Object)key));
        }
        MatcherAssert.assertThat(adoptedPets, (Matcher)Matchers.contains((Object[])pets.toArray()));
    }

    private void startFlow(String flowName) throws Exception {
        ((Startable)this.getFlowConstruct(flowName)).start();
    }

    private void stopFlow(String flowName) throws Exception {
        ((Stoppable)this.getFlowConstruct(flowName)).stop();
    }

    public static class AdoptionProcessor
    implements Processor {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public CoreEvent process(CoreEvent event) throws MuleException {
            String pet = (String)event.getMessage().getPayload().getValue();
            Integer poll = (Integer)event.getMessage().getAttributes().getValue();
            MultiMap<Integer, String> multiMap = ADOPTIONS;
            synchronized (multiMap) {
                ADOPTIONS.put((Object)poll, (Object)pet);
            }
            return event;
        }
    }
}

