/*
 * Decompiled with CFR 0.152.
 */
package org.mule.test.module.extension.source;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Test;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.tck.probe.PollingProber;
import org.mule.test.module.extension.AbstractExtensionFunctionalTestCase;
import org.mule.test.petstore.extension.PetAdoptionSource;
import org.mule.test.petstore.extension.WatermarkingPetAdoptionSource;

public class PollingSourceWatermarkTestCase
extends AbstractExtensionFunctionalTestCase {
    private static int TEST_TIMEOUT = 120000;
    private static int SHORT_TIMEOUT = 5000;
    private static int LONG_TIMEOUT = 30000;
    private static int PROBER_FREQUENCY = 100;
    private static final List<CoreEvent> ADOPTION_EVENTS = new LinkedList<CoreEvent>();

    protected int getTimeoutSystemProperty() {
        return TEST_TIMEOUT;
    }

    protected void doTearDown() throws Exception {
        ADOPTION_EVENTS.clear();
        WatermarkingPetAdoptionSource.resetSource();
    }

    protected String getConfigFile() {
        return "polling-source-watermark-config.xml";
    }

    @Test
    public void watermarkPoll() throws Exception {
        this.startFlow("watermark");
        this.assertAllPetsAdopted(PetAdoptionSource.ALL_PETS);
        this.assertIdempotentAdoptions(PetAdoptionSource.ALL_PETS);
    }

    @Test
    public void repeatedItemInNewPollSetsUpdatedWatermark() throws Exception {
        List<String> expectedPets = Arrays.asList("Anibal", "ANIBAL");
        this.startFlow("repeatedItemInNewPollSetsUpdatedWatermark");
        this.assertAllPetsAdopted(expectedPets);
        this.assertIdempotentAdoptions(expectedPets);
    }

    @Test
    public void repeatedItemInNewPollDoesNotSetUpdatedWatermark() throws Exception {
        List<String> expectedPets = Arrays.asList("Anibal", "Barbara", "Colonel Meow", "BARBARA");
        this.startFlow("repeatedItemInNewPollDoesNotSetUpdatedWatermark");
        this.assertAllPetsAdopted(expectedPets);
        this.assertIdempotentAdoptions(expectedPets);
    }

    private void assertIdempotentAdoptions(List<String> pets) {
        PollingProber.checkNot((long)LONG_TIMEOUT, (long)PROBER_FREQUENCY, () -> {
            List<CoreEvent> list = ADOPTION_EVENTS;
            synchronized (list) {
                return ADOPTION_EVENTS.size() > pets.size();
            }
        });
    }

    private void assertAllPetsAdopted(List<String> pets) {
        PollingProber.check((long)SHORT_TIMEOUT, (long)PROBER_FREQUENCY, () -> {
            List<CoreEvent> list = ADOPTION_EVENTS;
            synchronized (list) {
                return ADOPTION_EVENTS.size() >= pets.size() && ADOPTION_EVENTS.stream().map(e -> e.getMessage().getPayload().getValue().toString()).collect(Collectors.toList()).containsAll(pets);
            }
        });
    }

    private void startFlow(String flowName) throws Exception {
        ((Startable)this.getFlowConstruct(flowName)).start();
    }

    public static class AdoptionProcessor
    implements Processor {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public CoreEvent process(CoreEvent event) throws MuleException {
            List list = ADOPTION_EVENTS;
            synchronized (list) {
                ADOPTION_EVENTS.add(event);
            }
            return event;
        }
    }
}

