/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.module.extension.internal.runtime.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.resource.spi.work.Work;
import org.assertj.core.api.ThrowableAssert;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.StringContains;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.meta.model.EnrichableModel;
import org.mule.runtime.api.notification.ExceptionNotification;
import org.mule.runtime.api.notification.ExceptionNotificationListener;
import org.mule.runtime.api.notification.NotificationListener;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.Injector;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.retry.async.AsynchronousRetryTemplate;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.core.api.retry.policy.SimpleRetryPolicyTemplate;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.internal.context.MuleContextWithRegistry;
import org.mule.runtime.core.internal.registry.MuleRegistry;
import org.mule.runtime.core.privileged.registry.RegistrationException;
import org.mule.runtime.core.privileged.util.LoggingTestUtils;
import org.mule.runtime.extension.api.runtime.config.ConfigurationProvider;
import org.mule.runtime.extension.api.runtime.exception.ExceptionHandler;
import org.mule.runtime.module.extension.internal.runtime.source.AbstractExtensionMessageSourceTestCase;
import org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSource;
import org.mule.runtime.module.extension.internal.runtime.source.SourceAdapterFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCallbackFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCompletionHandlerFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceConnectionManager;
import org.mule.sdk.api.connectivity.oauth.AccessTokenExpiredException;
import org.mule.sdk.api.runtime.source.Source;
import org.mule.sdk.api.runtime.source.SourceCallback;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.tck.probe.Probe;
import org.mule.test.heisenberg.extension.exception.HeisenbergConnectionExceptionEnricher;
import org.mule.test.module.extension.internal.util.ExtensionsTestUtils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/**
 * Parameterized test case for {@link ExtensionMessageSource}.
 * <p>
 * Each test is executed once per row returned by {@link #data()}, i.e. for every combination of the
 * {@code primaryNodeOnly} flag and a synchronous or asynchronous retry policy template.
 */
@RunWith(value=Parameterized.class)
public class ExtensionMessageSourceTestCase
extends AbstractExtensionMessageSourceTestCase {
    // NOTE(review): presumably the timeout in milliseconds for the polling probers; the tests below
    // pass the same value (3000) as a literal - confirm.
    protected static final int TEST_TIMEOUT = 3000;
    // NOTE(review): presumably the polling delay in milliseconds for the polling probers; the tests
    // below pass the same value (1000) as a literal - confirm.
    protected static final int TEST_POLL_DELAY = 1000;
    // Value returned by System.setProperty in doSetUpBeforeMuleContextCreation, i.e. the previous value of the
    // "mule.compute.connection.errors.in.stats" system property (null when it was not set).
    protected String property;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"primary node only sync", true, false}, {"primary node only async", true, true}, {"all nodes sync", false, false}, {"all nodes async", false, true});
    }

    public ExtensionMessageSourceTestCase(String name, boolean primaryNodeOnly, boolean isAsync) {
        this.primaryNodeOnly = primaryNodeOnly;
        if (isAsync) {
            this.retryPolicyTemplate = new AsynchronousRetryTemplate((RetryPolicyTemplate)new SimpleRetryPolicyTemplate(0L, 2));
        } else {
            SimpleRetryPolicyTemplate template = new SimpleRetryPolicyTemplate(0L, 2);
            template.setNotificationFirer(this.notificationDispatcher);
            this.retryPolicyTemplate = template;
        }
    }

    /**
     * Sets the {@code mule.compute.connection.errors.in.stats} system property to {@code "true"} and remembers
     * the value it had before in {@link #property}.
     */
    protected void doSetUpBeforeMuleContextCreation() {
        // System.setProperty returns the previous value, or null when the property was not set.
        String previousValue = System.setProperty("mule.compute.connection.errors.in.stats", "true");
        this.property = previousValue;
    }

    /**
     * Puts the {@code mule.compute.connection.errors.in.stats} system property back to the state captured in
     * {@link #property}: the previous value is re-applied when there was one, otherwise the property is cleared.
     */
    @After
    public void restoreProperty() {
        if (this.property == null) {
            // The property was unset before the test (or set-up never ran): leave it unset again.
            System.clearProperty("mule.compute.connection.errors.in.stats");
        } else {
            System.setProperty("mule.compute.connection.errors.in.stats", this.property);
        }
    }

    /**
     * Checks that a result pushed through the source callback from {@code onStart} is handled once the
     * message source has been started.
     */
    @Test
    public void handleMessage() throws Exception {
        Mockito.reset((Object[])new SourceCallbackFactory[]{this.sourceCallbackFactory});
        Mockito.when((Object)this.sourceCallbackFactory.createSourceCallback((SourceCompletionHandlerFactory)ArgumentMatchers.any())).thenReturn((Object)this.sourceCallback);
        AtomicBoolean handled = new AtomicBoolean(false);
        Latch latch = new Latch();
        ((Source)Mockito.doAnswer(invocationOnMock -> {
            this.sourceCallback.handle(this.result);
            handled.set(true);
            latch.release();
            return null;
        }).when((Object)this.source)).onStart(this.sourceCallback);
        ((Scheduler)Mockito.doAnswer(invocation -> {
            // execute(Runnable) only guarantees a Runnable; casting to a narrower type could fail with a
            // ClassCastException for tasks that do not implement it.
            ((Runnable)invocation.getArguments()[0]).run();
            return null;
        }).when((Object)this.cpuLightScheduler)).execute((Runnable)ArgumentMatchers.any());
        this.start();
        // Bounded wait: if onStart is never invoked, the assertion below fails instead of the test hanging.
        latch.await(TEST_TIMEOUT, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat((Object)handled.get(), (Matcher)CoreMatchers.is((Object)true));
    }

    /**
     * After a {@link ConnectionException} is reported, the source must be stopped and started again while
     * neither scheduler is stopped.
     */
    @Test
    public void handleExceptionAndRestart() throws Exception {
        this.start();
        this.messageSource.onException(new ConnectionException("ERROR"));

        // First wait until the source has been stopped.
        Probe sourceStopped = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source)).onStop();
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(sourceStopped);

        // Neither scheduler may have been stopped by the restart.
        ((Scheduler)Mockito.verify((Object)this.ioScheduler, (VerificationMode)Mockito.never())).stop();
        ((Scheduler)Mockito.verify((Object)this.cpuLightScheduler, (VerificationMode)Mockito.never())).stop();

        // Initial start + restart = two onStart invocations.
        Probe sourceRestarted = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(2))).onStart(this.sourceCallback);
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(sourceRestarted);
    }

    /**
     * Initialises the message source (unless it already is initialised) and verifies that the source was
     * injected and initialised but not started. Also used as a set-up step by other tests.
     */
    @Test
    public void initialise() throws Exception {
        if (this.messageSource.getLifecycleState().isInitialised()) {
            // Already initialised by a previous call: nothing to do and nothing to verify.
            return;
        }

        this.messageSource.initialise();
        ((Injector)Mockito.verify((Object)muleContext.getInjector())).inject((Object)this.source);
        ((Initialisable)Mockito.verify((Object)((Initialisable)this.source))).initialise();
        ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.never())).onStart(this.sourceCallback);
    }

    /**
     * Initialising and starting the message source must ask the adapter factory for an adapter exactly once.
     */
    @Test
    public void sourceIsInstantiatedOnce() throws Exception {
        this.initialise();
        this.start();

        SourceAdapterFactory verifiedFactory = (SourceAdapterFactory)Mockito.verify((Object)this.sourceAdapterFactory, (VerificationMode)Mockito.times(1));
        verifiedFactory.createAdapter((Optional)ArgumentMatchers.any(), (SourceCallbackFactory)ArgumentMatchers.any(), (Component)ArgumentMatchers.any(), (SourceConnectionManager)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean());
    }

    /**
     * When {@code onStart} always fails with a wrapped {@link ConnectionException}, a synchronous retry policy
     * must surface a {@link RetryPolicyExhaustedException} caused by that connection exception. With an
     * asynchronous policy no expectation is registered, so {@code start()} must return normally.
     */
    @Test
    public void failToStart() throws Exception {
        ConnectionException connectionException = new ConnectionException("ERROR");
        DefaultMuleException e = new DefaultMuleException((Throwable)connectionException);
        ((Source)Mockito.doThrow((Throwable[])new Throwable[]{e}).when((Object)this.source)).onStart((SourceCallback)ArgumentMatchers.any());
        if (!this.retryPolicyTemplate.isAsync()) {
            this.expectedException.expect(CoreMatchers.is((Matcher)CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            this.expectedException.expectCause(CoreMatchers.is((Object)connectionException));
        }
        // No else branch: the former "expectedException.none()" call only created and discarded a new rule
        // instance (none() is a static factory), so it never affected this test's rule.
        this.messageSource.initialise();
        this.messageSource.start();
    }

    /**
     * When the source cannot be started, an {@link ExceptionNotification} carrying a
     * {@link RetryPolicyExhaustedException} must be dispatched, whether or not {@code start()} itself throws.
     */
    @Test
    public void dispatchNotificationWhenFailToStart() throws Exception {
        Latch latch = new Latch();
        List<Object> notifications = new ArrayList<>();
        ExceptionNotificationListener listener = notification -> {
            notifications.add(notification);
            latch.release();
        };
        this.registerNotificationListener(listener);
        ConnectionException connectionException = new ConnectionException("ERROR");
        DefaultMuleException e = new DefaultMuleException((Throwable)connectionException);
        ((Source)Mockito.doThrow((Throwable[])new Throwable[]{e}).when((Object)this.source)).onStart((SourceCallback)ArgumentMatchers.any());
        this.messageSource.initialise();
        try {
            this.messageSource.start();
        }
        catch (Exception ignored) {
            // start() may or may not propagate the failure depending on the retry policy of this run;
            // either way the notification checked below is the subject of this test.
        }
        // Fail with a clear message instead of an IndexOutOfBoundsException when nothing was dispatched in time.
        MatcherAssert.assertThat("No exception notification was dispatched within 5 seconds", latch.await(5L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(notifications.get(0), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(ExceptionNotification.class)));
        MatcherAssert.assertThat((Object)((ExceptionNotification)notifications.get(0)).getException(), (Matcher)CoreMatchers.instanceOf(RetryPolicyExhaustedException.class));
    }

    /**
     * Both {@code onStart} and {@code onStop} fail. A synchronous retry policy must surface a
     * {@link RetryPolicyExhaustedException} caused by the connection exception; with an asynchronous policy the
     * same exception must arrive through an {@link ExceptionNotification}.
     */
    @Test
    public void failToStartAndStopFails() throws Exception {
        Latch latch = new Latch();
        List<Object> notifications = new ArrayList<>();
        ExceptionNotificationListener listener = notification -> {
            notifications.add(notification);
            latch.release();
        };
        this.registerNotificationListener(listener);
        ConnectionException connectionException = new ConnectionException("ERROR");
        DefaultMuleException e = new DefaultMuleException((Throwable)connectionException);
        ((Source)Mockito.doThrow((Throwable[])new Throwable[]{e}).when((Object)this.source)).onStart((SourceCallback)ArgumentMatchers.any());
        ((Source)Mockito.doThrow((Throwable[])new Throwable[]{new NullPointerException()}).when((Object)this.source)).onStop();
        if (!this.retryPolicyTemplate.isAsync()) {
            this.expectedException.expect(CoreMatchers.is((Matcher)CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            this.expectedException.expectCause(CoreMatchers.is((Object)connectionException));
        }
        this.messageSource.initialise();
        this.messageSource.start();
        if (this.retryPolicyTemplate.isAsync()) {
            // The timeout is expressed in milliseconds; waiting 3000 *seconds* would block the build for
            // 50 minutes whenever the notification is missing.
            MatcherAssert.assertThat("No exception notification was dispatched in time", latch.await(TEST_TIMEOUT, TimeUnit.MILLISECONDS));
            MatcherAssert.assertThat((Object)((ExceptionNotification)notifications.get(0)).getException(), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            MatcherAssert.assertThat((Object)((ExceptionNotification)notifications.get(0)).getException().getCause(), (Matcher)CoreMatchers.is((Object)connectionException));
        }
    }

    /**
     * {@code onStart} keeps failing with a wrapped {@link ConnectionException}: a synchronous policy must end in
     * a {@link RetryPolicyExhaustedException} caused by it, an asynchronous one must not throw from
     * {@code start()}. In both cases {@code onStart} is attempted three times.
     */
    @Test
    public void failWithConnectionExceptionWhenStartingAndGetRetryPolicyExhausted() throws Exception {
        this.messageSource.initialise();
        ConnectionException connectionException = new ConnectionException("ERROR");
        RuntimeException startFailure = new RuntimeException((Throwable)connectionException);
        ((Source)Mockito.doThrow(new Throwable[]{startFailure}).when((Object)this.source)).onStart(this.sourceCallback);

        Throwable throwable = ThrowableAssert.catchThrowable(() -> ((ExtensionMessageSource)this.messageSource).start());

        if (this.retryPolicyTemplate.isAsync()) {
            Probe nothingThrown = new JUnitLambdaProbe(() -> {
                Assert.assertNull((Object)throwable);
                return true;
            });
            new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(nothingThrown);
        } else {
            MatcherAssert.assertThat((Object)throwable, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            MatcherAssert.assertThat((Object)throwable, (Matcher)CoreMatchers.is(this.exhaustedBecauseOf((Throwable)connectionException)));
        }

        // One initial attempt plus the retries of the policy.
        Probe threeStartAttempts = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(3))).onStart(this.sourceCallback);
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(threeStartAttempts);
    }

    /**
     * Same scenario as the connection-exception variant, but {@code onStart} fails with a wrapped
     * {@link IOException}: the synchronous failure must contain an {@code IOException} in its cause chain and
     * {@code onStart} is attempted three times.
     */
    @Test
    public void failWithNonConnectionExceptionWhenStartingAndGetRetryPolicyExhausted() throws Exception {
        DefaultMuleException startFailure = new DefaultMuleException((Throwable)new IOException("ERROR"));
        ((Source)Mockito.doThrow(new Throwable[]{startFailure}).when((Object)this.source)).onStart(this.sourceCallback);
        this.messageSource.initialise();

        Throwable throwable = ThrowableAssert.catchThrowable(() -> ((ExtensionMessageSource)this.messageSource).start());

        if (this.retryPolicyTemplate.isAsync()) {
            Probe nothingThrown = new JUnitLambdaProbe(() -> {
                Assert.assertNull((Object)throwable);
                return true;
            });
            new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(nothingThrown);
        } else {
            MatcherAssert.assertThat((Object)throwable, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            Throwable[] causeChain = org.apache.commons.lang3.exception.ExceptionUtils.getThrowables((Throwable)throwable);
            MatcherAssert.assertThat((Object)causeChain, (Matcher)Matchers.hasItemInArray((Matcher)CoreMatchers.instanceOf(IOException.class)));
        }

        Probe threeStartAttempts = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(3))).onStart(this.sourceCallback);
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(threeStartAttempts);
    }

    /**
     * {@code onStart} fails twice with a wrapped {@link ConnectionException} and then succeeds: the source
     * must have been started three times and stopped twice.
     */
    @Test
    public void failWithConnectionExceptionWhenStartingAndGetsReconnected() throws Exception {
        RuntimeException firstFailure = new RuntimeException((Throwable)new ConnectionException("ERROR"));
        RuntimeException secondFailure = new RuntimeException((Throwable)new ConnectionException("ERROR"));
        // Two failing attempts followed by a successful one.
        ((Source)Mockito.doThrow(new Throwable[]{firstFailure})
            .doThrow(new Throwable[]{secondFailure})
            .doNothing()
            .when((Object)this.source)).onStart(this.sourceCallback);

        this.messageSource.initialise();
        this.messageSource.start();

        Probe reconnected = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(3))).onStart(this.sourceCallback);
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(2))).onStop();
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(reconnected);
    }

    /**
     * The message source under test must report the {@code FAIL} back-pressure strategy.
     */
    @Test
    public void getBackPressureStrategy() {
        Object actualStrategy = this.messageSource.getBackPressureStrategy();
        MatcherAssert.assertThat(actualStrategy, (Matcher)CoreMatchers.is((Object)MessageSource.BackPressureStrategy.FAIL));
    }

    /**
     * Reporting a {@link ConnectionException} on a started source must stop it once and start it a second time.
     */
    @Test
    public void failOnExceptionWithConnectionExceptionAndGetsReconnected() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();

        this.messageSource.onException(new ConnectionException("ERROR"));

        Probe reconnected = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(2))).onStart(this.sourceCallback);
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(1))).onStop();
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(reconnected);
    }

    /**
     * Reporting a {@link ConnectionException} caused by an {@link AccessTokenExpiredException}, with a
     * connection provider available on the configuration instance, must stop the source once and start it a
     * second time.
     */
    @Test
    public void failOnExceptionWithAccessTokenExpiredExceptionInConnectionExceptionAndGetsReconnected() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();
        Mockito.when((Object)this.configurationInstance.getConnectionProvider()).thenReturn(Optional.of(Mockito.mock(ConnectionProvider.class)));

        AccessTokenExpiredException tokenExpired = new AccessTokenExpiredException("ERROR");
        this.messageSource.onException(new ConnectionException((Throwable)tokenExpired));

        Probe reconnected = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(2))).onStart(this.sourceCallback);
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(1))).onStop();
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(reconnected);
    }

    /**
     * {@code onStart} fails with a plain {@link RuntimeException}. With a synchronous retry policy the thrown
     * exception's cause must itself be caused by that very instance; with an asynchronous policy no expectation
     * is registered and the test verifies three start and three stop attempts.
     */
    @Test
    public void startFailsWithRandomException() throws Exception {
        final RuntimeException e = new RuntimeException();
        ((Source)Mockito.doThrow((Throwable[])new Throwable[]{e}).when((Object)this.source)).onStart(this.sourceCallback);
        if (!this.retryPolicyTemplate.isAsync()) {
            this.expectedException.expect(this.exhaustedBecauseOf((Matcher<Throwable>)new BaseMatcher<Throwable>(){
                private final Matcher<Exception> exceptionMatcher = ThrowableCauseMatcher.hasCause((Matcher)CoreMatchers.sameInstance((Object)e));

                @Override
                public boolean matches(Object item) {
                    return this.exceptionMatcher.matches(item);
                }

                @Override
                public void describeTo(Description description) {
                    this.exceptionMatcher.describeTo(description);
                }
            }));
        }
        // No else branch: the former "expectedException.none()" call only created and discarded a new rule
        // instance (none() is a static factory), so it never affected this test's rule.
        this.initialise();
        this.messageSource.start();
        new PollingProber(3000L, 1000L).check((Probe)new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times((int)3))).onStart(this.sourceCallback);
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times((int)3))).onStop();
            return true;
        }));
    }

    /**
     * Starts the message source (unless it already is started) and verifies that the source was injected,
     * initialised and started in that order, that the "successfully started" message was logged and that the
     * "successfully reconnected" message was not. Also used as a set-up step by other tests.
     */
    @Test
    public void start() throws Exception {
        this.initialise();
        ArrayList infoMessages = new ArrayList();
        Logger oldLogger = LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)LoggingTestUtils.createMockLogger(infoMessages, (Level)Level.INFO));
        try {
            if (!this.messageSource.getLifecycleState().isStarted()) {
                this.messageSource.start();
            }
            Injector injector = muleContext.getInjector();
            new PollingProber(3000L, 1000L).check((Probe)new JUnitLambdaProbe(() -> {
                InOrder inOrder = Mockito.inOrder((Object[])new Object[]{injector, this.source});
                ((Injector)inOrder.verify((Object)injector)).inject((Object)this.source);
                ((Initialisable)inOrder.verify((Object)((Initialisable)this.source))).initialise();
                ((Source)inOrder.verify((Object)this.source)).onStart(this.sourceCallback);
                return true;
            }));
            new PollingProber(3000L, 1000L).check((Probe)new JUnitLambdaProbe(() -> {
                LoggingTestUtils.verifyLogMessage((List)infoMessages, (String)"Message source 'source' on flow 'appleFlow' successfully started", (Object[])new Object[0]);
                return true;
            }));
            // NOTE(review): the numeric arguments here are (1000, 3000) while every other prober in this class
            // uses (3000, 1000) - confirm the parameter order of PollingProber.checkNot.
            PollingProber.checkNot((long)1000L, (long)3000L, () -> {
                LoggingTestUtils.verifyLogMessage((List)infoMessages, (String)"Message source 'source' on flow 'appleFlow' successfully reconnected", (Object[])new Object[0]);
                return true;
            });
        }
        finally {
            // Restore the original logger even when a verification above fails, so the mock logger does not
            // outlive this call.
            LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)oldLogger);
        }
    }

    /**
     * When the scheduler service fails to provide an IO scheduler, {@code start()} must fail with an exception
     * whose cause is that very failure.
     */
    @Test
    public void failedToCreateRetryScheduler() throws Exception {
        this.messageSource.initialise();

        RuntimeException schedulerFailure = new RuntimeException();
        SchedulerService schedulerService = muleContext.getSchedulerService();
        ((SchedulerService)Mockito.doThrow(new Throwable[]{schedulerFailure}).when((Object)schedulerService)).ioScheduler();

        Throwable thrown = ThrowableAssert.catchThrowable(() -> ((ExtensionMessageSource)this.messageSource).start());

        Throwable actualCause = thrown.getCause();
        MatcherAssert.assertThat((Object)actualCause, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.sameInstance((Object)schedulerFailure)));
    }

    /**
     * Stopping a started message source must invoke {@code onStop} on the underlying source.
     */
    @Test
    public void stop() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();
        this.messageSource.stop();

        Source verifiedSource = (Source)Mockito.verify((Object)this.source);
        verifiedSource.onStop();
    }

    /**
     * Disposing the message source after a full initialise/start/stop cycle must dispose the underlying source.
     */
    @Test
    public void dispose() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();
        this.messageSource.stop();
        this.messageSource.dispose();

        Disposable verifiedSource = (Disposable)Mockito.verify((Object)((Disposable)this.source));
        verifiedSource.dispose();
    }

    /**
     * With a {@link HeisenbergConnectionExceptionEnricher} registered on the source model, a failure in
     * {@code onStart} must surface (synchronous policy) as an exception that contains a
     * {@link ConnectionException} and the enriched message. With an asynchronous policy {@code start()} must
     * not throw.
     */
    @Test
    public void enrichExceptionWithSourceExceptionEnricher() throws Exception {
        Mockito.when((Object)this.enricherFactory.createHandler()).thenReturn((Object)new HeisenbergConnectionExceptionEnricher());
        // Registered once; the same call with identical arguments used to be issued twice in a row.
        ExtensionsTestUtils.mockExceptionEnricher((EnrichableModel)this.sourceModel, this.enricherFactory);
        ExtensionMessageSource messageSource = this.getNewExtensionMessageSourceInstance();
        messageSource.initialise();
        ((Source)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("ERROR")}).when((Object)this.source)).onStart(this.sourceCallback);
        Throwable t = ThrowableAssert.catchThrowable(() -> ((ExtensionMessageSource)messageSource).start());
        if (!this.retryPolicyTemplate.isAsync()) {
            MatcherAssert.assertThat((Object)ExceptionUtils.containsType((Throwable)t, ConnectionException.class), (Matcher)CoreMatchers.is((Object)true));
            MatcherAssert.assertThat((Object)t.getMessage(), (Matcher)StringContains.containsString((String)"Enriched Connection Exception: ERROR"));
        } else {
            new PollingProber(3000L, 1000L).check((Probe)new JUnitLambdaProbe(() -> {
                Assert.assertNull((Object)t);
                return true;
            }));
        }
        messageSource.stop();
    }

    /**
     * With an exception handler registered on the extension model, a failure in {@code onStart} must surface
     * (synchronous policy) as an exception whose message contains the enriched text. With an asynchronous
     * policy {@code start()} must not throw.
     */
    @Test
    public void enrichExceptionWithExtensionEnricher() throws Exception {
        // Single definition of the enriched text, shared by the stubbed handler and the assertion.
        String enrichedErrorMessage = "Enriched: ERROR";
        ExceptionHandler exceptionEnricher = (ExceptionHandler)Mockito.mock(ExceptionHandler.class);
        Mockito.when((Object)exceptionEnricher.enrichException((Exception)ArgumentMatchers.any(Exception.class))).thenReturn((Object)new Exception(enrichedErrorMessage));
        Mockito.when((Object)this.enricherFactory.createHandler()).thenReturn((Object)exceptionEnricher);
        ExtensionsTestUtils.mockExceptionEnricher((EnrichableModel)this.extensionModel, this.enricherFactory);
        ExtensionMessageSource messageSource = this.getNewExtensionMessageSourceInstance();
        messageSource.initialise();
        ((Source)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("ERROR")}).when((Object)this.source)).onStart(this.sourceCallback);
        Throwable t = ThrowableAssert.catchThrowable(() -> ((ExtensionMessageSource)messageSource).start());
        if (!this.retryPolicyTemplate.isAsync()) {
            MatcherAssert.assertThat((Object)t.getMessage(), (Matcher)StringContains.containsString(enrichedErrorMessage));
        } else {
            new PollingProber(3000L, 1000L).check((Probe)new JUnitLambdaProbe(() -> {
                Assert.assertNull((Object)t);
                return true;
            }));
        }
        messageSource.stop();
    }

    /**
     * Makes {@code onStop} fail on a started source and registers the expectation that the resulting exception
     * has a {@link MuleException} cause which in turn is caused by the original failure.
     * <p>
     * NOTE(review): this method never calls {@code messageSource.stop()} itself; presumably the stop happens
     * in the superclass tear-down, inside the scope of the expected-exception rule - confirm.
     */
    @Test
    public void workManagerDisposedIfSourceFailsToStop() throws Exception {
        this.start();

        final RuntimeException stopFailure = new RuntimeException();
        ((Source)Mockito.doThrow(new Throwable[]{stopFailure}).when((Object)this.source)).onStop();

        this.expectedException.expect((Matcher)new BaseMatcher<Throwable>(){

            @Override
            public boolean matches(Object item) {
                Throwable wrapper = ((Exception)item).getCause();
                if (!(wrapper instanceof MuleException)) {
                    return false;
                }
                return wrapper.getCause() == stopFailure;
            }

            @Override
            public void describeTo(Description description) {
                description.appendText("Exception was not wrapped as expected");
            }
        });
    }

    /**
     * With a dynamic configuration provider whose {@code get} fails, {@code stop()} must fail with that
     * failure as its cause and the underlying source must still have been stopped.
     */
    @Test
    public void actualSourceStoppedIfMessageSourceFailsToStop() throws Exception {
        Mockito.when((Object)this.configurationProvider.isDynamic()).thenReturn((Object)true);
        this.start();

        RuntimeException providerFailure = new RuntimeException();
        ((ConfigurationProvider)Mockito.doThrow(new Throwable[]{providerFailure}).when((Object)this.configurationProvider)).get((Event)ArgumentMatchers.any(CoreEvent.class));
        this.expectedException.expectCause(CoreMatchers.sameInstance((Object)providerFailure));

        try {
            this.messageSource.stop();
        }
        finally {
            // Verified whether or not stop() throws.
            Source verifiedSource = (Source)Mockito.verify((Object)this.source);
            verifiedSource.onStop();
        }
    }

    /**
     * Two consecutive connection failures must each trigger one stop and one restart of the source.
     */
    @Test
    public void reconnectTwice() throws Exception {
        this.start();

        // First failure: wait for the reconnection to finish, then expect start x2 / stop x1.
        this.messageSource.onException(new ConnectionException("ERROR"));
        Probe firstReconnectionDone = new JUnitLambdaProbe(() -> !this.messageSource.isReconnecting());
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(firstReconnectionDone);
        ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(2))).onStart(this.sourceCallback);
        ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(1))).onStop();

        // Second failure: expect start x3 / stop x2.
        this.messageSource.onException(new ConnectionException("ERROR"));
        Probe secondReconnectionDone = new JUnitLambdaProbe(() -> !this.messageSource.isReconnecting());
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(secondReconnectionDone);
        Probe restartedAgain = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(3))).onStart(this.sourceCallback);
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(2))).onStop();
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(restartedAgain);
    }

    /**
     * After a successful start, every further {@code onStart} fails with a {@link ConnectionException}: once
     * the reconnection attempt is over, {@code onStart} and {@code onStop} must each have been invoked four
     * times.
     */
    @Test
    public void failToReconnect() throws Exception {
        this.start();

        ConnectionException connectionException = new ConnectionException("ERROR");
        ((Source)Mockito.doThrow(new Throwable[]{connectionException}).when((Object)this.source)).onStart((SourceCallback)ArgumentMatchers.any());
        this.messageSource.onException(connectionException);

        Probe reconnectionOver = new JUnitLambdaProbe(() -> !this.messageSource.isReconnecting());
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(reconnectionOver);

        Probe allAttemptsMade = new JUnitLambdaProbe(() -> {
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(4))).onStart(this.sourceCallback);
            ((Source)Mockito.verify((Object)this.source, (VerificationMode)Mockito.times(4))).onStop();
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(allAttemptsMade);
    }

    /**
     * Builds a matcher that accepts a throwable whose direct cause is exactly the given instance.
     *
     * @param cause the instance the matched throwable's cause must be identical to
     * @return a matcher delegating to {@link #exhaustedBecauseOf(Matcher)} with a same-instance matcher
     */
    private BaseMatcher<Throwable> exhaustedBecauseOf(Throwable cause) {
        Matcher<Throwable> sameCause = (Matcher<Throwable>)CoreMatchers.sameInstance((Object)cause);
        return this.exhaustedBecauseOf(sameCause);
    }

    /**
     * Builds a matcher that applies {@code causeMatcher} to the direct cause of the examined throwable.
     *
     * @param causeMatcher matcher evaluated against {@code item.getCause()}; also provides the description
     * @return a matcher for the wrapping throwable
     */
    private BaseMatcher<Throwable> exhaustedBecauseOf(final Matcher<Throwable> causeMatcher) {
        return new BaseMatcher<Throwable>(){

            @Override
            public boolean matches(Object item) {
                // Only the direct cause is inspected, not the whole cause chain.
                return causeMatcher.matches((Object)((Throwable)item).getCause());
            }

            @Override
            public void describeTo(Description description) {
                causeMatcher.describeTo(description);
            }
        };
    }

    /**
     * Replaces the source with a {@code DummySource} built with a metadata key and checks that the message
     * source's parameter value resolver returns that key for the {@code metadataKey} parameter.
     */
    @Test
    public void getMetadataKeyIdObjectValue() throws Exception {
        // Single definition of the key, shared by the source under test and the assertion.
        String person = "person";
        this.source = new DummySource(person);
        this.sourceAdapter = this.createSourceAdapter();
        Mockito.when((Object)this.sourceAdapterFactory.createAdapter((Optional)ArgumentMatchers.any(), (SourceCallbackFactory)ArgumentMatchers.any(), (Component)ArgumentMatchers.any(), (SourceConnectionManager)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn((Object)this.sourceAdapter);
        this.messageSource = this.getNewExtensionMessageSourceInstance();
        this.messageSource.initialise();
        this.messageSource.start();
        Object metadataKeyValue = this.messageSource.getParameterValueResolver().getParameterValue("metadataKey");
        MatcherAssert.assertThat((Object)metadataKeyValue, (Matcher)CoreMatchers.is((Object)person));
    }

    /**
     * With statistics enabled and {@code onStart} always failing with a wrapped {@link ConnectionException},
     * the application statistics must eventually report two connection errors and two execution errors.
     */
    @Test
    public void getRetryPolicyExhaustedAndConnectionErrorsAreComputed() throws Exception {
        muleContext.getStatistics().setEnabled(true);
        this.messageSource.initialise();

        ConnectionException connectionException = new ConnectionException("ERROR");
        RuntimeException startFailure = new RuntimeException((Throwable)connectionException);
        ((Source)Mockito.doThrow(new Throwable[]{startFailure}).when((Object)this.source)).onStart(this.sourceCallback);

        // Whatever start() throws is irrelevant here; only the statistics are checked.
        ThrowableAssert.catchThrowable(() -> ((ExtensionMessageSource)this.messageSource).start());

        Probe errorsCounted = new JUnitLambdaProbe(() -> {
            MatcherAssert.assertThat((Object)muleContext.getStatistics().getApplicationStatistics().getConnectionErrors(), (Matcher)CoreMatchers.equalTo((Object)2L));
            MatcherAssert.assertThat((Object)muleContext.getStatistics().getApplicationStatistics().getExecutionErrors(), (Matcher)CoreMatchers.equalTo((Object)2L));
            return true;
        });
        new PollingProber(TEST_TIMEOUT, TEST_POLL_DELAY).check(errorsCounted);
    }

    /**
     * Initialising the message source must log, at DEBUG level, the initialisation message that corresponds
     * to the {@code primaryNodeOnly} setting of this run.
     */
    @Test
    public void sourceInitializedLogMessage() throws Exception {
        ArrayList debugMessages = new ArrayList();
        Logger oldLogger = LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)LoggingTestUtils.createMockLogger(debugMessages, (Level)Level.DEBUG));
        try {
            this.messageSource.initialise();
            if (this.primaryNodeOnly) {
                LoggingTestUtils.verifyLogMessage(debugMessages, (String)"Message source 'source' on flow 'appleFlow' running on the primary node is initializing. Note that this Message source must run on the primary node only.", (Object[])new Object[0]);
            } else {
                LoggingTestUtils.verifyLogMessage(debugMessages, (String)"Message source 'source' on flow 'appleFlow' is initializing. This is the primary node of the cluster.", (Object[])new Object[0]);
            }
        }
        finally {
            // Restore the original logger even when the verification fails.
            LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)oldLogger);
        }
    }

    /**
     * Starting the message source must log the "is starting" message at DEBUG level.
     */
    @Test
    public void sourceStartedLogMessage() throws Exception {
        ArrayList debugMessages = new ArrayList();
        Logger oldLogger = LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)LoggingTestUtils.createMockLogger(debugMessages, (Level)Level.DEBUG));
        try {
            this.messageSource.initialise();
            this.messageSource.start();
            LoggingTestUtils.verifyLogMessage(debugMessages, (String)"Message source 'source' on flow 'appleFlow' is starting", (Object[])new Object[0]);
        }
        finally {
            // Restore the original logger even when start() or the verification fails.
            LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)oldLogger);
        }
    }

    /**
     * Stopping the message source must log the "is stopping" message at DEBUG level.
     */
    @Test
    public void sourceStoppedLogMessage() throws Exception {
        ArrayList debugMessages = new ArrayList();
        Logger oldLogger = LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)LoggingTestUtils.createMockLogger(debugMessages, (Level)Level.DEBUG));
        try {
            this.messageSource.initialise();
            this.messageSource.start();
            this.messageSource.stop();
            LoggingTestUtils.verifyLogMessage(debugMessages, (String)"Message source 'source' on flow 'appleFlow' is stopping", (Object[])new Object[0]);
        }
        finally {
            // Restore the original logger even when a lifecycle call or the verification fails.
            LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)oldLogger);
        }
    }

    /**
     * When reconnection keeps failing after a {@link ConnectionException}, the "could not be reconnected. Will
     * be shutdown." message must be logged at ERROR level.
     */
    @Test
    public void getRetryPolicyExhaustedAndLogShutdownMessage() throws Exception {
        ArrayList errorMessages = new ArrayList();
        Logger oldLogger = LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)LoggingTestUtils.createMockLogger(errorMessages, (Level)Level.ERROR));
        try {
            this.start();
            ConnectionException e = new ConnectionException("ERROR");
            ((Source)Mockito.doThrow((Throwable[])new Throwable[]{e}).when((Object)this.source)).onStart((SourceCallback)ArgumentMatchers.any());
            this.messageSource.onException(e);
            new PollingProber(3000L, 1000L).check((Probe)new JUnitLambdaProbe(() -> {
                LoggingTestUtils.verifyLogRegex((List)errorMessages, (String)"Message source 'source' on flow 'appleFlow' could not be reconnected. Will be shutdown. (.*)", (Object[])new Object[0]);
                return true;
            }));
        }
        finally {
            // Restore the original logger even when the probe fails.
            LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)oldLogger);
        }
    }

    /**
     * A successful reconnection after a {@link ConnectionException} must log the "successfully reconnected"
     * message at INFO level.
     */
    @Test
    public void reconnectAndLogSuccessMessage() throws Exception {
        this.start();
        ArrayList infoMessages = new ArrayList();
        Logger oldLogger = LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)LoggingTestUtils.createMockLogger(infoMessages, (Level)Level.INFO));
        try {
            ConnectionException e = new ConnectionException("ERROR");
            this.messageSource.onException(e);
            new PollingProber(3000L, 1000L).check((Probe)new JUnitLambdaProbe(() -> {
                LoggingTestUtils.verifyLogMessage((List)infoMessages, (String)"Message source 'source' on flow 'appleFlow' successfully reconnected", (Object[])new Object[0]);
                return true;
            }));
        }
        finally {
            // Restore the original logger even when the probe fails.
            LoggingTestUtils.setLogger((Object)this.messageSource, (String)"LOGGER", (Logger)oldLogger);
        }
    }

    /**
     * Registers the given listener in the {@link NotificationListenerRegistry} looked up from the Mule
     * context's registry.
     *
     * @param exceptionNotificationListener listener to register
     * @throws RegistrationException if the registry lookup fails
     */
    private void registerNotificationListener(ExceptionNotificationListener exceptionNotificationListener) throws RegistrationException {
        NotificationListenerRegistry listenerRegistry = (NotificationListenerRegistry)((MuleContextWithRegistry)muleContext)
            .getRegistry()
            .lookupObject(NotificationListenerRegistry.class);
        listenerRegistry.registerListener((NotificationListener)exceptionNotificationListener);
    }

    private class DummySource
    extends Source {
        private final String metadataKey;

        DummySource(String metadataKey) {
            this.metadataKey = metadataKey;
        }

        public void onStart(SourceCallback sourceCallback) throws MuleException {
        }

        public void onStop() {
        }
    }
}

