/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.module.extension.internal.runtime.source;

import com.google.common.collect.ImmutableMap;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.cluster.ClusterService;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.config.ArtifactEncoding;
import org.mule.runtime.api.config.Feature;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.config.MuleRuntimeFeature;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Lifecycle;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.meta.model.ComponentModel;
import org.mule.runtime.api.meta.model.ExtensionModel;
import org.mule.runtime.api.meta.model.config.ConfigurationModel;
import org.mule.runtime.api.meta.model.parameter.ParameterizedModel;
import org.mule.runtime.api.meta.model.source.SourceModel;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.profiling.ProfilingService;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.tx.TransactionType;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.api.util.NameUtils;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.connector.ConnectionManager;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.SystemExceptionHandler;
import org.mule.runtime.core.api.extension.ExtensionManager;
import org.mule.runtime.core.api.lifecycle.LifecycleState;
import org.mule.runtime.core.api.lifecycle.LifecycleStateEnabled;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.runtime.core.api.management.stats.AllStatistics;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.retry.ReconnectionConfig;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.streaming.CursorProviderFactory;
import org.mule.runtime.core.api.util.ClassUtils;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.internal.el.TemplateParser;
import org.mule.runtime.core.internal.exception.MessagingExceptionResolver;
import org.mule.runtime.core.internal.execution.ExceptionCallback;
import org.mule.runtime.core.internal.execution.MessageProcessContext;
import org.mule.runtime.core.internal.execution.MessageProcessingManager;
import org.mule.runtime.core.internal.lifecycle.DefaultLifecycleManager;
import org.mule.runtime.core.internal.transaction.MuleTransactionConfig;
import org.mule.runtime.core.internal.transaction.TransactionFactoryLocator;
import org.mule.runtime.core.privileged.PrivilegedMuleContext;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.exception.ErrorTypeLocator;
import org.mule.runtime.core.privileged.transaction.TransactionConfig;
import org.mule.runtime.core.privileged.transaction.TransactionFactory;
import org.mule.runtime.extension.api.runtime.config.ConfigurationInstance;
import org.mule.runtime.extension.api.runtime.config.ConfigurationProvider;
import org.mule.runtime.extension.api.runtime.config.ConfigurationStats;
import org.mule.runtime.extension.api.runtime.config.ConfiguredComponent;
import org.mule.runtime.extension.api.runtime.source.ParameterizedSource;
import org.mule.runtime.module.extension.api.runtime.resolver.ParameterValueResolver;
import org.mule.runtime.module.extension.api.runtime.resolver.ResolverSet;
import org.mule.runtime.module.extension.api.runtime.resolver.ValueResolvingContext;
import org.mule.runtime.module.extension.internal.runtime.ExtensionComponent;
import org.mule.runtime.module.extension.internal.runtime.config.MutableConfigurationStats;
import org.mule.runtime.module.extension.internal.runtime.connectivity.oauth.ExtensionsOAuthUtils;
import org.mule.runtime.module.extension.internal.runtime.exception.ExceptionHandlerManager;
import org.mule.runtime.module.extension.internal.runtime.operation.IllegalSourceException;
import org.mule.runtime.module.extension.internal.runtime.resolver.ObjectBasedParameterValueResolver;
import org.mule.runtime.module.extension.internal.runtime.source.DefaultSourceCallback;
import org.mule.runtime.module.extension.internal.runtime.source.SourceAdapter;
import org.mule.runtime.module.extension.internal.runtime.source.SourceAdapterFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCallbackFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceConnectionManager;
import org.mule.runtime.module.extension.internal.runtime.source.poll.RestartContext;
import org.mule.runtime.module.extension.internal.util.MuleExtensionUtils;
import org.mule.runtime.module.extension.internal.util.ReconnectionUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class ExtensionMessageSource
extends ExtensionComponent<SourceModel>
implements MessageSource,
ExceptionCallback<ConnectionException>,
ParameterizedSource,
ConfiguredComponent,
LifecycleStateEnabled {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExtensionMessageSource.class);

    // ---- Collaborators supplied through field injection (@Inject) ----
    @Inject
    private SchedulerService schedulerService;
    @Inject
    private NotificationListenerRegistry notificationListenerRegistry;
    @Inject
    private TransactionFactoryLocator transactionFactoryLocator;
    @Inject
    private ClusterService clusterService;
    @Inject
    private FeatureFlaggingService featureFlaggingService;
    @Inject
    private ProfilingService profilingService;
    @Inject
    private ArtifactEncoding artifactEncoding;
    // Supplied through the @Inject-annotated setter setMessageProcessingManager.
    private MessageProcessingManager messageProcessingManager;

    // ---- State fixed at construction time ----
    private final SourceModel sourceModel;
    private final SourceAdapterFactory sourceAdapterFactory;
    private final boolean primaryNodeOnly;
    // Retry policy handed to the constructor; used as input when the effective template is computed.
    private final RetryPolicyTemplate customRetryPolicyTemplate;
    private final MessageSource.BackPressureStrategy backPressureStrategy;
    private final ExceptionHandlerManager exceptionEnricherManager;
    // Set to true by onException when a reconnection begins; reset by the success/failure callbacks.
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    private final DefaultLifecycleManager<ExtensionMessageSource> lifecycleManager;
    // NOTE(review): not referenced in the visible part of this class - confirm whether it is still used.
    private final TemplateParser expressionParser = TemplateParser.createMuleStyleParser();
    // The configurationProvider passed to the constructor.
    private final ConfigurationProvider explicitConfigProvider;

    // ---- State assigned during the lifecycle ----
    // Created in the initialise phase (see reallyDoInitialise).
    private SourceConnectionManager sourceConnectionManager;
    // Assigned through setListener.
    private Processor messageProcessor;
    // Built on first access through buildTransactionConfig.
    private final LazyValue<TransactionConfig> transactionConfig = new LazyValue(this::buildTransactionConfig);
    // Created by createSource and reset to null by disposeSource.
    private SourceAdapter sourceAdapter;
    // Effective retry policy, computed in createSource from customRetryPolicyTemplate.
    private RetryPolicyTemplate retryPolicyTemplate;
    // Obtained from schedulerService.ioScheduler() on start; stopped and nulled by stopSchedulers.
    private Scheduler retryScheduler;
    // Assigned in doInitialise; resolves the root container through the component locator.
    private LazyValue<FlowConstruct> flowConstruct;
    // Assigned in doInitialise from createProcessingContext.
    private MessageProcessContext messageProcessContext;
    private final NotificationDispatcher notificationDispatcher;
    private final String applicationName;
    // Tracks whether the source has been started; also used as the monitor guarding start/stop/restart.
    private final AtomicBoolean started = new AtomicBoolean(false);

    public ExtensionMessageSource(ExtensionModel extensionModel, SourceModel sourceModel, SourceAdapterFactory sourceAdapterFactory, ConfigurationProvider configurationProvider, boolean primaryNodeOnly, RetryPolicyTemplate retryPolicyTemplate, CursorProviderFactory cursorProviderFactory, MessageSource.BackPressureStrategy backPressureStrategy, ExtensionManager managerAdapter, NotificationDispatcher notificationDispatcher, String applicationName) {
        super(extensionModel, sourceModel, configurationProvider, cursorProviderFactory, managerAdapter);
        this.sourceModel = sourceModel;
        this.sourceAdapterFactory = sourceAdapterFactory;
        this.customRetryPolicyTemplate = retryPolicyTemplate;
        this.primaryNodeOnly = primaryNodeOnly;
        this.backPressureStrategy = backPressureStrategy;
        this.notificationDispatcher = notificationDispatcher;
        this.applicationName = applicationName;
        this.exceptionEnricherManager = new ExceptionHandlerManager(extensionModel, (ComponentModel)sourceModel);
        this.lifecycleManager = new DefaultLifecycleManager(sourceModel.getName(), (Lifecycle)this);
        this.explicitConfigProvider = configurationProvider;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates the {@link SourceAdapter} if none exists yet, injects its dependencies and
     * computes and initialises the effective retry policy template. Does nothing when an
     * adapter is already present.
     *
     * @param restarting forwarded to the adapter factory
     * @throws Exception if any step of the creation fails
     */
    private synchronized void createSource(boolean restarting) throws Exception {
        if (this.sourceAdapter != null) {
            return;
        }
        CoreEvent event = null;
        try {
            event = org.mule.runtime.module.extension.api.util.MuleExtensionUtils.getInitialiserEvent(this.muleContext);
            Optional<ConfigurationInstance> config = this.startUsingConfiguration(event);
            SourceCallbackFactory callbackFactory = this.createSourceCallbackFactory();
            this.sourceAdapter = this.sourceAdapterFactory.createAdapter(config, callbackFactory, (Component)this, this.sourceConnectionManager, restarting);
            this.muleContext.getInjector().inject((Object)this.sourceAdapter);
            this.retryPolicyTemplate = this.createRetryPolicyTemplate(this.customRetryPolicyTemplate);
            LifecycleUtils.initialiseIfNeeded((Object)this.retryPolicyTemplate, (boolean)true, (MuleContext)this.muleContext);
        }
        finally {
            // The initialiser event is only a vehicle for configuration resolution; always complete it.
            if (event != null) {
                BaseEventContext eventContext = (BaseEventContext)event.getContext();
                eventContext.success();
            }
        }
    }

    /**
     * Starts the source through the retry policy template.
     * <p>
     * The actual work ({@link #doWork}) is submitted to the retry scheduler. With an asynchronous
     * retry policy this method returns without waiting and completion is reported through the
     * success/failure callbacks; with a synchronous policy it blocks until the start attempt
     * finishes.
     *
     * @param restarting     whether this start is part of a restart (reconnection)
     * @param restartContext context produced when the source was stopped for restart; may be {@code null}
     * @throws MuleException if a synchronous start exhausts the retry policy
     */
    private void startSource(boolean restarting, RestartContext restartContext) throws MuleException {
        Consumer<Throwable> onFailure;
        Runnable onSuccess;
        SystemExceptionHandler systemExceptionHandler = this.muleContext.getExceptionListener();
        if (this.retryPolicyTemplate.isAsync()) {
            onSuccess = this.getSuccessRunnable(restarting);
            onFailure = t -> {
                RetryPolicyExhaustedException exception = t instanceof RetryPolicyExhaustedException ? (RetryPolicyExhaustedException)t : new RetryPolicyExhaustedException(t, (Object)this);
                systemExceptionHandler.handleException((Exception)exception);
                this.onReconnectionFailed((Throwable)exception);
            };
        } else {
            // In synchronous mode the outcome is handled below, after waiting on the future.
            onSuccess = () -> {};
            onFailure = t -> {};
        }
        Supplier<CompletableFuture> futureSupplier = () -> {
            CompletableFuture future = new CompletableFuture();
            this.retryScheduler.execute(() -> {
                if (!this.retryPolicyTemplate.isAsync()) {
                    this.doWork(restarting, restartContext, future);
                    return;
                }
                // Async mode: the source may have been stopped before this task runs, so check
                // the started flag under the same monitors used by the lifecycle methods.
                synchronized (this.lifecycleManager) {
                    synchronized (this.started) {
                        if (!this.started.get()) {
                            future.complete(null);
                            return;
                        }
                        // Run the work exactly once; previously control fell through and
                        // doWork was invoked a second time after leaving the locks.
                        this.doWork(restarting, restartContext, future);
                    }
                }
            });
            return future;
        };
        CompletionStage future = this.retryPolicyTemplate.applyPolicy(futureSupplier, t -> true, t -> this.computeStats(), ReconnectionUtils.NULL_THROWABLE_CONSUMER, Function.identity(), this.retryScheduler).whenComplete((v, e) -> {
            if (e != null) {
                onFailure.accept((Throwable)e);
            } else {
                onSuccess.run();
            }
        });
        try {
            if (!this.retryPolicyTemplate.isAsync()) {
                ((CompletableFuture)future).get();
                this.getSuccessRunnable(restarting).run();
            }
        }
        catch (ExecutionException exception) {
            RetryPolicyExhaustedException retryPolicyExhaustedException = new RetryPolicyExhaustedException(exception.getCause(), (Object)this);
            systemExceptionHandler.handleException((Exception)retryPolicyExhaustedException);
            throw retryPolicyExhaustedException;
        }
        catch (InterruptedException e2) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            MuleRuntimeException muleRuntimeException = new MuleRuntimeException(I18nMessageFactory.createStaticMessage((String)String.format("Found exception starting source '%s' on flow '%s'", this.sourceModel.getName(), this.getLocation().getRootContainerName())), (Throwable)e2);
            systemExceptionHandler.handleException((Exception)((Object)muleRuntimeException));
            throw muleRuntimeException;
        }
    }

    /**
     * Starts the source as a regular (non-restart) start, with no restart context.
     *
     * @throws MuleException if the start fails
     */
    private void startSource() throws MuleException {
        this.startSource(false, null);
    }

    /**
     * Resolves the retry policy template to use for this source.
     * <p>
     * When the current configuration has a connection provider, the template comes from that
     * provider's reconnection config (given {@code customTemplate}). Otherwise
     * {@code customTemplate} is used if it is not {@code null}, falling back to the default
     * reconnection config's template.
     *
     * @param customTemplate the custom template, possibly {@code null}
     * @return the template to apply
     */
    private RetryPolicyTemplate createRetryPolicyTemplate(RetryPolicyTemplate customTemplate) {
        return this.getConfigurationInstance()
            .map(config -> config.getConnectionProvider().orElse(null))
            .map(provider -> this.connectionManager.getReconnectionConfigFor(provider).getRetryPolicyTemplate(customTemplate))
            .orElseGet(() -> {
                if (customTemplate != null) {
                    return customTemplate;
                }
                return ReconnectionConfig.defaultReconnectionConfig().getRetryPolicyTemplate();
            });
    }

    /**
     * Stops the source as a regular (non-restart) stop; the returned restart context is ignored.
     *
     * @throws MuleException if stopping fails
     */
    private void stopSource() throws MuleException {
        this.stopSource(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the current source adapter, if there is one.
     * <p>
     * The configuration usage is released first and, when {@code restarting}, a restart context is
     * requested from the adapter. The adapter is then stopped, and disposed as well when a dynamic
     * configuration is in use. The stop/dispose cleanup runs on both the success and failure paths.
     *
     * @param restarting whether the stop is part of a restart
     * @return the restart context when {@code restarting} is {@code true}; {@code null} otherwise
     *         or when there is no adapter
     * @throws MuleException if stopping fails
     */
    private RestartContext stopSource(boolean restarting) throws MuleException {
        if (this.sourceAdapter != null) {
            // Captured up front: disposeSource() may null the adapter before the name is needed.
            String sourceName = this.sourceAdapter.getName();
            CoreEvent initialiserEvent = null;
            try {
                RestartContext restartContext;
                initialiserEvent = org.mule.runtime.module.extension.api.util.MuleExtensionUtils.getInitialiserEvent(this.muleContext);
                try {
                    this.stopUsingConfiguration(initialiserEvent);
                    restartContext = restarting ? this.sourceAdapter.beginRestart() : null;
                }
                catch (Throwable throwable) {
                    // Failure path: still stop (and possibly dispose) the adapter before propagating.
                    try {
                        this.sourceAdapter.stop();
                        if (this.usesDynamicConfiguration()) {
                            this.disposeSource();
                        }
                        // Thrown inside the inner try: an Exception is caught below and wrapped,
                        // anything else propagates as is.
                        throw throwable;
                    }
                    catch (Exception e) {
                        throw new DefaultMuleException(String.format("Found exception stopping source '%s' of root component '%s'", sourceName, this.getLocation().getRootContainerName()), (Throwable)e);
                    }
                }
                // Success path: same cleanup as above.
                // NOTE(review): failures here are not wrapped in DefaultMuleException, unlike the
                // failure path - likely a decompilation artifact of an original try/finally; confirm.
                this.sourceAdapter.stop();
                if (this.usesDynamicConfiguration()) {
                    this.disposeSource();
                }
                return restartContext;
            }
            finally {
                // Always complete the initialiser event's context once it has been created.
                if (initialiserEvent != null) {
                    ((BaseEventContext)initialiserEvent.getContext()).success();
                }
            }
        }
        return null;
    }

    /**
     * Returns the configuration instance reported by the current source adapter.
     *
     * @return the adapter's configuration instance, if any
     */
    public Optional<ConfigurationInstance> getConfigurationInstance() {
        SourceAdapter adapter = this.sourceAdapter;
        return adapter.getConfigurationInstance();
    }

    /**
     * Creates the factory that builds a {@link DefaultSourceCallback} for a given completion
     * handler factory. All builder inputs are read from this source when the factory is invoked.
     *
     * @return the callback factory
     */
    private SourceCallbackFactory createSourceCallbackFactory() {
        return handlerFactory -> DefaultSourceCallback.builder(this.profilingService)
            .setExceptionCallback(this)
            .setSourceModel(this.sourceModel)
            .setConfigurationInstance(this.getConfigurationInstance().orElse(null))
            .setTransactionConfig((TransactionConfig)this.transactionConfig.get())
            .setSource(this)
            .setDefaultEncoding(this.artifactEncoding.getDefaultEncoding())
            .setListener(this.messageProcessor)
            .setProcessingManager(this.messageProcessingManager)
            .setProcessContext(this.messageProcessContext)
            .setApplicationName(this.applicationName)
            .setNotificationDispatcher(this.notificationDispatcher)
            .setCursorStreamProviderFactory(this.getCursorProviderFactory())
            .setCompletionHandlerFactory(handlerFactory)
            .setErrorAfterTimeout(this.featureFlaggingService.isEnabled((Feature)MuleRuntimeFeature.ERROR_AND_ROLLBACK_TX_WHEN_TIMEOUT))
            .build();
    }

    /**
     * Handles a connection error raised by the source and triggers a reconnection.
     * <p>
     * Only one reconnection runs at a time: if one is already in progress the error is logged and
     * ignored. Otherwise the error is reported to the exception listener, an OAuth token refresh
     * is attempted, and then either the adapter-provided reconnection action is applied through
     * the retry policy or, when the adapter provides none, the connection is invalidated and the
     * source is restarted. Both alternatives run on the retry scheduler.
     *
     * @param exception the connection error that was detected
     */
    public void onException(ConnectionException exception) {
        if (!this.reconnecting.compareAndSet(false, true)) {
            LOGGER.error(String.format("Message source '%s' on flow '%s' found connection error but reconnection is already in progress. Error was: %s", this.sourceModel.getName(), this.getLocation().getRootContainerName(), exception.getMessage()), (Throwable)exception);
            return;
        }
        this.muleContext.getExceptionListener().handleException((Exception)((Object)exception), this.getLocation());
        if (LOGGER.isWarnEnabled()) {
            LOGGER.warn(String.format("Message source '%s' on flow '%s' threw exception. Attempting to reconnect...", this.sourceAdapter.getName(), this.getLocation().getRootContainerName()), (Throwable)exception);
        }
        try {
            ExtensionsOAuthUtils.refreshTokenIfNecessary(this.getConfigurationInstance().flatMap(ConfigurationInstance::getConnectionProvider).orElse(null), (Throwable)exception);
        }
        catch (Exception refreshException) {
            // A failed token refresh is reported but does not abort the reconnection attempt.
            Exception reported = refreshException instanceof MuleException ? refreshException : new DefaultMuleException((Throwable)refreshException);
            if (LOGGER.isErrorEnabled()) {
                // Log the refresh failure itself, not the connection error that triggered the refresh.
                LOGGER.error(String.format("Message source '%s' on flow '%s' threw exception while trying to refresh OAuth access token: %s", this.sourceAdapter.getName(), this.getLocation().getRootContainerName(), refreshException.getMessage()), (Throwable)refreshException);
            }
            this.muleContext.getExceptionListener().handleException(reported, this.getLocation());
        }
        Optional<Publisher<Void>> action = this.sourceAdapter.getReconnectionAction(exception);
        if (!action.isPresent()) {
            // No custom reconnection action: drop the broken connection and restart the source.
            this.invalidateConnection(exception);
            this.retryScheduler.execute(() -> {
                try {
                    this.restart();
                }
                catch (MuleException e) {
                    this.onReconnectionFailed(e);
                }
            });
        } else {
            this.retryScheduler.execute(() -> {
                Mono reconnectionAction = action.map(p -> Mono.from((Publisher)this.retryPolicyTemplate.applyPolicy(p, this.retryScheduler))).get();
                reconnectionAction.doOnSuccess(v -> this.onReconnectionSuccessful()).doOnError(this::onReconnectionFailed).subscribe();
            });
        }
    }

    /**
     * Picks the callback to run after the source has been started successfully.
     *
     * @param restarting whether the start was part of a restart
     * @return the reconnection-success callback when restarting, the start-success callback otherwise
     */
    private Runnable getSuccessRunnable(boolean restarting) {
        if (restarting) {
            return this::onReconnectionSuccessful;
        }
        return this::onStartSuccessful;
    }

    /**
     * Logs that the source reconnected and clears the {@code reconnecting} flag.
     */
    private void onReconnectionSuccessful() {
        if (LOGGER.isInfoEnabled()) {
            String sourceName = this.sourceModel.getName();
            String flowName = this.getLocation().getRootContainerName();
            LOGGER.info("Message source '{}' on flow '{}' successfully reconnected", (Object)sourceName, (Object)flowName);
        }
        this.reconnecting.set(false);
    }

    /**
     * Logs that the source started and clears the {@code reconnecting} flag.
     */
    private void onStartSuccessful() {
        if (LOGGER.isInfoEnabled()) {
            String sourceName = this.sourceModel.getName();
            String flowName = this.getLocation().getRootContainerName();
            LOGGER.info("Message source '{}' on flow '{}' successfully started", (Object)sourceName, (Object)flowName);
        }
        this.reconnecting.set(false);
    }

    /**
     * Logs the reconnection failure, shuts the source down and clears the {@code reconnecting} flag.
     *
     * @param exception the error that made the reconnection fail
     */
    private void onReconnectionFailed(Throwable exception) {
        String message = String.format("Message source '%s' on flow '%s' could not be reconnected. Will be shutdown. %s", this.sourceModel.getName(), this.getLocation().getRootContainerName(), exception.getMessage());
        LOGGER.error(message, exception);
        this.shutdown();
        this.reconnecting.set(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Restarts the source: stops it keeping a restart context, disposes the adapter and starts it
     * again. If the source is not currently started, only a warning is logged.
     * <p>
     * Runs while holding the {@code started} monitor, which the start and stop phases also take.
     *
     * @throws MuleException if stopping or starting the source fails
     */
    private void restart() throws MuleException {
        synchronized (this.started) {
            if (this.started.get()) {
                RestartContext restartContext = this.stopSource(true);
                this.disposeSource();
                this.startSource(true, restartContext);
            } else {
                // First placeholder is the source name; it used to be filled with the flow name twice.
                LOGGER.warn(String.format("Message source '%s' on flow '%s' is stopped. Not doing restart", this.sourceModel.getName(), this.getLocation().getRootContainerName()));
            }
        }
    }

    /**
     * Starts the source, but only when {@code shouldRunOnThisNode()} allows it.
     *
     * @throws MuleException if the start fails
     */
    @Override
    public void doStart() throws MuleException {
        if (!this.shouldRunOnThisNode()) {
            return;
        }
        this.reallyDoStart();
    }

    /**
     * Fires the start phase on the lifecycle manager: starts the retry policy template, obtains
     * an IO scheduler if none is held, and starts the source while holding the {@code started}
     * monitor, marking it as started afterwards.
     *
     * @throws MuleException if the start phase fails
     */
    private void reallyDoStart() throws MuleException {
        LOGGER.debug("Message source '{}' on flow '{}' is starting", (Object)this.sourceModel.getName(), (Object)this.getLocation().getRootContainerName());
        this.lifecycle(() -> this.lifecycleManager.fireStartPhase((phase, target) -> {
            LifecycleUtils.startIfNeeded((Object)this.retryPolicyTemplate);
            if (this.retryScheduler == null) {
                this.retryScheduler = this.schedulerService.ioScheduler();
            }
            synchronized (this.started) {
                this.startSource();
                this.started.set(true);
            }
        }));
    }

    /**
     * Fires the stop phase on the lifecycle manager: stops the source if it was started (flipping
     * the {@code started} flag under its monitor) and then stops the retry scheduler.
     *
     * @throws MuleException if the stop phase fails
     */
    @Override
    public void doStop() throws MuleException {
        LOGGER.debug("Message source '{}' on flow '{}' is stopping", (Object)this.sourceModel.getName(), (Object)this.getLocation().getRootContainerName());
        this.safeLifecycle(() -> this.lifecycleManager.fireStopPhase((phase, target) -> {
            synchronized (this.started) {
                boolean wasStarted = this.started.compareAndSet(true, false);
                if (wasStarted) {
                    this.stopSource();
                }
            }
            this.stopSchedulers();
        }));
    }

    /**
     * Fires the dispose phase on the lifecycle manager: disposes the source adapter, stops and
     * disposes the retry policy template and stops the retry scheduler. Failures are logged as
     * warnings and not propagated.
     */
    @Override
    public void doDispose() {
        try {
            this.safeLifecycle(() -> this.lifecycleManager.fireDisposePhase((phase, target) -> {
                this.disposeSource();
                LifecycleUtils.stopIfNeeded((Object)this.retryPolicyTemplate);
                LifecycleUtils.disposeIfNeeded((Object)this.retryPolicyTemplate, (Logger)LOGGER);
                this.stopSchedulers();
            }));
        }
        catch (MuleException disposeFailure) {
            String message = String.format("Failed to dispose message source at root element '%s'. %s", this.getLocation().getRootContainerName(), disposeFailure.getMessage());
            LOGGER.warn(message, (Throwable)disposeFailure);
        }
    }

    /**
     * Runs a lifecycle action, routing any failure through
     * {@link #handleLifecycleException(Throwable, boolean)} without unwrapping
     * {@code LifecycleException}s.
     *
     * @param runnable the lifecycle action to run
     * @throws MuleException if the action fails with anything other than an {@code IllegalStateException}
     */
    private void lifecycle(CheckedRunnable runnable) throws MuleException {
        try {
            runnable.run();
        }
        catch (Throwable failure) {
            final boolean unwrapLifecycleException = false;
            this.handleLifecycleException(failure, unwrapLifecycleException);
        }
    }

    /**
     * Runs a lifecycle action, routing any failure through
     * {@link #handleLifecycleException(Throwable, boolean)} with {@code LifecycleException}s
     * unwrapped to their cause.
     *
     * @param runnable the lifecycle action to run
     * @throws MuleException if the action fails with anything other than an {@code IllegalStateException}
     */
    private void safeLifecycle(CheckedRunnable runnable) throws MuleException {
        try {
            runnable.run();
        }
        catch (Throwable failure) {
            final boolean unwrapLifecycleException = true;
            this.handleLifecycleException(failure, unwrapLifecycleException);
        }
    }

    /**
     * Translates a failure raised by a lifecycle action.
     * <p>
     * The error is first unwrapped; optionally a {@code LifecycleException} with a cause is
     * replaced by that cause. An {@code IllegalStateException} is only logged at debug level,
     * a {@code MuleException} is rethrown as is and anything else is wrapped in a
     * {@code DefaultMuleException}.
     *
     * @param e                        the failure to handle
     * @param unwrapLifecycleException whether a {@code LifecycleException} should be replaced by its cause
     * @throws MuleException for every failure other than an {@code IllegalStateException}
     */
    private void handleLifecycleException(Throwable e, boolean unwrapLifecycleException) throws MuleException {
        Throwable failure = Exceptions.unwrap((Throwable)e);
        if (unwrapLifecycleException && failure instanceof LifecycleException && failure.getCause() != null) {
            failure = failure.getCause();
        }

        if (failure instanceof IllegalStateException) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Skipping lifecycle phase: " + failure.getMessage(), failure);
            }
            return;
        }
        if (failure instanceof MuleException) {
            throw (MuleException)failure;
        }
        throw new DefaultMuleException(failure);
    }

    /**
     * Stops and then disposes this message source. A failure while stopping is logged and does
     * not prevent the dispose step.
     */
    private void shutdown() {
        try {
            LifecycleUtils.stopIfNeeded((Object)((Object)this));
        }
        catch (Exception e) {
            // The adapter may already have been disposed (and nulled) by the time stopping fails,
            // e.g. after a failed start; fall back to the model name instead of throwing an NPE here.
            SourceAdapter adapter = this.sourceAdapter;
            String sourceName = adapter != null ? adapter.getName() : this.sourceModel.getName();
            LOGGER.error(String.format("Failed to stop source '%s' on flow '%s'", sourceName, this.getLocation().getRootContainerName()), (Throwable)e);
        }
        LifecycleUtils.disposeIfNeeded((Object)((Object)this), (Logger)LOGGER);
    }

    /**
     * Stops the retry scheduler, if one is held, and always clears the reference afterwards.
     */
    private void stopSchedulers() {
        if (this.retryScheduler == null) {
            return;
        }
        try {
            this.retryScheduler.stop();
        }
        finally {
            // Cleared even when stop() throws, so a later start obtains a fresh scheduler.
            this.retryScheduler = null;
        }
    }

    /**
     * Disposes the current source adapter (if disposal applies) and clears the reference, so that
     * the next {@code createSource} call builds a new adapter.
     */
    private void disposeSource() {
        LifecycleUtils.disposeIfNeeded((Object)this.sourceAdapter, (Logger)LOGGER);
        this.sourceAdapter = null;
    }

    /**
     * Builds the transaction configuration from the source adapter's transactional action and type.
     *
     * @return the new transaction configuration
     * @throws IllegalStateException if no transaction factory is available for the adapter's transaction type
     */
    private TransactionConfig buildTransactionConfig() {
        MuleTransactionConfig config = new MuleTransactionConfig();
        config.setAction(MuleExtensionUtils.toActionCode(this.sourceAdapter.getTransactionalAction()));
        config.setMuleContext(this.muleContext);

        TransactionType type = this.sourceAdapter.getTransactionalType();
        TransactionFactory factory = (TransactionFactory)this.transactionFactoryLocator.lookUpTransactionFactory(type).orElseThrow(() -> new IllegalStateException(String.format("Unable to create Source with Transactions of Type: [%s]. No factory available for this transaction type", type)));
        config.setFactory(factory);
        return config;
    }

    /**
     * Returns the connection manager of this source.
     *
     * @return the {@link SourceConnectionManager}; {@code null} until the initialise phase has created it
     */
    SourceConnectionManager getSourceConnectionManager() {
        return this.sourceConnectionManager;
    }

    /**
     * Creates the {@link MessageProcessContext} through which message processing reaches this
     * source's transaction config, class loader, error type locator, exception resolver and
     * flow construct.
     *
     * @return a new processing context bound to this source
     */
    private MessageProcessContext createProcessingContext() {
        return new MessageProcessContext(){
            // Created once per context and bound to the enclosing message source.
            private final MessagingExceptionResolver messagingExceptionResolver = new MessagingExceptionResolver((Component)ExtensionMessageSource.this);

            public MessageSource getMessageSource() {
                return ExtensionMessageSource.this;
            }

            public Optional<TransactionConfig> getTransactionConfig() {
                if (!ExtensionMessageSource.this.sourceModel.isTransactional()) {
                    return Optional.empty();
                }
                TransactionConfig config = (TransactionConfig)ExtensionMessageSource.this.transactionConfig.get();
                return Optional.of(config);
            }

            public ClassLoader getExecutionClassLoader() {
                return ExtensionMessageSource.this.muleContext.getExecutionClassLoader();
            }

            public ErrorTypeLocator getErrorTypeLocator() {
                PrivilegedMuleContext privilegedContext = (PrivilegedMuleContext)ExtensionMessageSource.this.muleContext;
                return privilegedContext.getErrorTypeLocator();
            }

            public MessagingExceptionResolver getMessagingExceptionResolver() {
                return this.messagingExceptionResolver;
            }

            public FlowConstruct getFlowConstruct() {
                return (FlowConstruct)ExtensionMessageSource.this.flowConstruct.get();
            }
        };
    }

    /**
     * Performs one attempt to create, initialise and start the source adapter, reporting the
     * outcome through {@code future} rather than by throwing.
     *
     * @param restarting     whether the attempt is part of a restart; if so the adapter's restart is
     *                       finished with {@code restartContext} before starting
     * @param restartContext context obtained when the source was stopped for restart
     * @param future         completed with {@code null} on success, or exceptionally on failure
     */
    private void doWork(boolean restarting, RestartContext restartContext, CompletableFuture<Void> future) {
        try {
            this.createSource(restarting);
            LifecycleUtils.initialiseIfNeeded((Object)this.sourceAdapter);
            if (restarting) {
                this.sourceAdapter.finishRestart(restartContext);
            }
            this.sourceAdapter.start();
            future.complete(null);
        }
        catch (Exception e) {
            // Best-effort cleanup: errors raised while cleaning up are attached to the original
            // failure as suppressed exceptions instead of replacing it.
            try {
                ExceptionUtils.extractConnectionException((Throwable)e).ifPresent(this::invalidateConnection);
                this.stopSource();
            }
            catch (Exception eStop) {
                e.addSuppressed(eStop);
            }
            try {
                this.disposeSource();
            }
            catch (Exception eDispose) {
                e.addSuppressed(eDispose);
            }
            // Let the extension's exception handlers process the error; if the result carries a
            // connection exception, that one is what gets reported.
            Throwable throwable = this.exceptionEnricherManager.process(e);
            Optional connectionException = ExceptionUtils.extractConnectionException((Throwable)throwable);
            if (connectionException.isPresent()) {
                throwable = (Throwable)connectionException.get();
            }
            // Anything that is not an Exception is wrapped in a MuleRuntimeException.
            throwable = throwable instanceof Exception ? (Exception)throwable : new MuleRuntimeException(throwable);
            future.completeExceptionally(throwable);
        }
    }

    /**
     * Increments the application's connection error counter, provided statistics are available
     * and enabled and the corresponding feature flag is on.
     */
    private void computeStats() {
        AllStatistics stats = this.muleContext.getStatistics();
        if (stats == null || !stats.isEnabled() || !this.computeConnectionErrorsInStats()) {
            return;
        }
        stats.getApplicationStatistics().incConnectionErrors();
    }

    /**
     * Tells whether the {@code COMPUTE_CONNECTION_ERRORS_IN_STATS} feature is enabled.
     *
     * @return {@code true} if the feature flag is enabled
     */
    private boolean computeConnectionErrorsInStats() {
        Feature feature = (Feature)MuleRuntimeFeature.COMPUTE_CONNECTION_ERRORS_IN_STATS;
        return this.featureFlaggingService.isEnabled(feature);
    }

    /**
     * Sets the processor that is handed to the source callbacks as their listener.
     *
     * @param listener the processor to use
     */
    public void setListener(Processor listener) {
        this.messageProcessor = listener;
    }

    /**
     * Injection point for the {@link MessageProcessingManager} that is handed to the source callbacks.
     *
     * @param messageProcessingManager the manager to use
     */
    @Inject
    public void setMessageProcessingManager(MessageProcessingManager messageProcessingManager) {
        this.messageProcessingManager = messageProcessingManager;
    }

    /**
     * Verifies that this source is declared either by the provider's configuration model or by
     * its extension model.
     *
     * @param configurationProvider the provider of the configuration the source points to
     * @throws IllegalSourceException if neither model declares the source
     */
    @Override
    protected void validateOperationConfiguration(ConfigurationProvider configurationProvider) {
        ConfigurationModel configModel = configurationProvider.getConfigurationModel();
        if (configModel.getSourceModel(this.sourceModel.getName()).isPresent()) {
            return;
        }
        if (configurationProvider.getExtensionModel().getSourceModel(this.sourceModel.getName()).isPresent()) {
            return;
        }
        String message = String.format("Root component '%s' defines an usage of operation '%s' which points to configuration '%s'. The selected config does not support that operation.", this.getLocation().getRootContainerName(), this.sourceModel.getName(), configurationProvider.getName());
        throw new IllegalSourceException(message);
    }

    /**
     * Creates a parameter value resolver over the source adapter's delegate object, described by
     * this source's model.
     *
     * @return a new {@link ObjectBasedParameterValueResolver}
     */
    @Override
    protected ParameterValueResolver getParameterValueResolver() {
        return new ObjectBasedParameterValueResolver(this.sourceAdapter.getDelegate(), (ParameterizedModel)this.sourceModel, this.reflectionCache);
    }

    /**
     * Initialises the message source.
     * <p>
     * When {@code shouldRunOnThisNode()} is {@code true} the source is initialised right away under
     * this component's class loader. Otherwise a {@link PrimaryNodeLifecycleNotificationListener}
     * is registered whose callback initialises and starts the source.
     *
     * @throws InitialisationException if the initialisation fails
     */
    @Override
    protected void doInitialise() throws InitialisationException {
        this.validateConfigurationProviderIsNotExpression();
        // Resolved lazily: looks up the root container of this component on first access.
        this.flowConstruct = new LazyValue(() -> this.componentLocator.find(this.getRootContainerLocation()).orElse(null));
        this.messageProcessContext = this.createProcessingContext();
        if (this.shouldRunOnThisNode()) {
            if (LOGGER.isDebugEnabled()) {
                boolean isPrimaryPollingInstance = this.clusterService.isPrimaryPollingInstance();
                if (this.primaryNodeOnly) {
                    LOGGER.debug("Message source '{}' on flow '{}' running on the primary node is initializing. Note that this Message source must run on the primary node only.", (Object)this.sourceModel.getName(), (Object)this.getLocation().getRootContainerName());
                } else {
                    LOGGER.debug("Message source '{}' on flow '{}' is initializing. This {} the primary node of the cluster.", new Object[]{this.sourceModel.getName(), this.getLocation().getRootContainerName(), isPrimaryPollingInstance ? "is" : "is not"});
                }
            }
            ClassUtils.withContextClassLoader((ClassLoader)this.classLoader, () -> {
                this.reallyDoInitialise();
                return null;
            });
        } else {
            LOGGER.debug("Message source '{}' on flow '{}' cannot initialize. This Message source can only run on the primary node of the cluster", (Object)this.sourceModel.getName(), (Object)this.getLocation().getRootContainerName());
            // Deferred path: the callback both initialises and starts the source, since the regular
            // start (doStart) is skipped while shouldRunOnThisNode() is false.
            new PrimaryNodeLifecycleNotificationListener(() -> {
                LOGGER.debug("Message source '{}' on flow '{}' is initializing because the node became cluster's primary.", (Object)this.sourceModel.getName(), (Object)this.getLocation().getRootContainerName());
                ClassUtils.withContextClassLoader((ClassLoader)this.classLoader, () -> {
                    this.reallyDoInitialise();
                    this.reallyDoStart();
                    return null;
                });
            }, this.notificationListenerRegistry).register();
        }
    }

    /**
     * Runs the actual initialise phase through the lifecycle manager.
     * <p>
     * Inside the phase a fresh {@link SourceConnectionManager} is created, the source is created and the source adapter is
     * initialised if needed. Any exception raised by those last two steps is wrapped in an {@link InitialisationException}.
     *
     * @throws InitialisationException as raised by the phase, or wrapping any other {@link MuleException} it fails with
     */
    protected void reallyDoInitialise() throws InitialisationException {
        try {
            this.lifecycle(() -> this.lifecycleManager.fireInitialisePhase((phase, o) -> {
                this.sourceConnectionManager = new SourceConnectionManager((ConnectionManager)this.connectionManager);
                try {
                    this.createSource(false);
                    LifecycleUtils.initialiseIfNeeded((Object)this.sourceAdapter);
                }
                catch (Exception cause) {
                    throw new InitialisationException(cause, this);
                }
            }));
        }
        catch (InitialisationException alreadyTyped) {
            // Already the type callers expect: let it through untouched.
            throw alreadyTyped;
        }
        catch (MuleException other) {
            throw new InitialisationException(other, this);
        }
    }

    /**
     * Resolves the parameters of this source against an initialiser event and returns them as an immutable map.
     * <p>
     * The {@link ValueResolvingContext} used for the resolution is closed before returning, and the initialiser event's
     * context is always marked as successful once the method exits, whatever the outcome.
     *
     * @return an immutable copy of the resolved source parameters
     * @throws MuleRuntimeException if resolving the parameters or closing the resolving context fails; the original
     *         failure is attached as the cause
     */
    public Map<String, Object> getInitialisationParameters() {
        CoreEvent initialiserEvent = null;
        try {
            initialiserEvent = org.mule.runtime.module.extension.api.util.MuleExtensionUtils.getInitialiserEvent();
            ResolverSet sourceParameters = this.sourceAdapterFactory.getSourceParameters();
            // Built outside the guarded block so that only resolution/close failures get wrapped, as before.
            ValueResolvingContext resolvingContext = ValueResolvingContext.builder(initialiserEvent).withExpressionManager(this.expressionManager).withConfig(this.getConfigurationInstance()).build();
            try (ValueResolvingContext context = resolvingContext) {
                return ImmutableMap.copyOf(MuleExtensionUtils.toMap(sourceParameters, context));
            }
            catch (Exception e) {
                // The exception goes in as the cause; it used to be passed as a message argument and was lost.
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Could not resolve parameters message source at location '%s'", this.getLocation().toString())), e);
            }
        }
        finally {
            if (initialiserEvent != null) {
                ((BaseEventContext)initialiserEvent.getContext()).success();
            }
        }
    }

    /**
     * @return the value of this source's {@code backPressureStrategy} field
     */
    public MessageSource.BackPressureStrategy getBackPressureStrategy() {
        return this.backPressureStrategy;
    }

    /**
     * @return the state reported by this source's lifecycle manager
     */
    public LifecycleState getLifecycleState() {
        return this.lifecycleManager.getState();
    }

    /**
     * Tells whether this source is allowed to run on the current node.
     *
     * @return {@code true} when the source is not restricted to the primary node; otherwise whatever the cluster service
     *         reports for {@code isPrimaryPollingInstance()}
     */
    private boolean shouldRunOnThisNode() {
        if (this.primaryNodeOnly) {
            return this.clusterService.isPrimaryPollingInstance();
        }
        return true;
    }

    /**
     * Looks up the configuration for the given event and, when its statistics are mutable, records one more running
     * source on them through {@code MutableConfigurationStats::addRunningSource}.
     *
     * @param event the event used to look up the configuration
     * @return the configuration found for the event, if any
     */
    private Optional<ConfigurationInstance> startUsingConfiguration(CoreEvent event) {
        return this.getConfigurationAndTryToMutateStats(event, MutableConfigurationStats::addRunningSource);
    }

    /**
     * Looks up the configuration for the given event and, when its statistics are mutable, applies
     * {@code MutableConfigurationStats::discountRunningSource} to them. The configuration that was found is not returned.
     *
     * @param event the event used to look up the configuration
     */
    private void stopUsingConfiguration(CoreEvent event) {
        this.getConfigurationAndTryToMutateStats(event, MutableConfigurationStats::discountRunningSource);
    }

    /**
     * Obtains the configuration for the given event and, if one exists and its statistics are an instance of
     * {@link MutableConfigurationStats}, passes those statistics to the supplied consumer.
     *
     * @param event the event used to look up the configuration
     * @param mutableConfigurationStatsConsumer the action to apply to the mutable statistics, when available
     * @return the configuration found for the event, untouched, whether or not the consumer was invoked
     */
    private Optional<ConfigurationInstance> getConfigurationAndTryToMutateStats(CoreEvent event, Consumer<MutableConfigurationStats> mutableConfigurationStatsConsumer) {
        Optional<ConfigurationInstance> configuration = this.getConfiguration(event);
        configuration
                .map(ConfigurationInstance::getStatistics)
                // Statistics of any other kind are left alone.
                .filter(MutableConfigurationStats.class::isInstance)
                .map(MutableConfigurationStats.class::cast)
                .ifPresent(mutableConfigurationStatsConsumer);
        return configuration;
    }

    /**
     * @return this object's simple class name, followed by {@code ": "} and the string form of the source adapter
     *         ({@code "null"} when there is none)
     */
    @Override
    public String toString() {
        String typeName = this.getClass().getSimpleName();
        return typeName + ": " + String.valueOf(this.sourceAdapter);
    }

    /**
     * @return the current value of the {@code reconnecting} flag
     */
    boolean isReconnecting() {
        return this.reconnecting.get();
    }

    /**
     * Passes the connection carried by the given exception, when there is one, to the source connection manager's
     * {@code invalidate} method.
     *
     * @param exception the connection failure whose connection, if present, should be invalidated
     */
    private void invalidateConnection(ConnectionException exception) {
        // Bound method reference: sourceConnectionManager is dereferenced here even if no connection is present.
        exception.getConnection().ifPresent(this.sourceConnectionManager::invalidate);
    }

    /**
     * Rejects an explicit config reference whose name contains an expression template.
     * <p>
     * Nothing is checked when no explicit configuration provider was set.
     *
     * @throws InitialisationException if the explicit configuration provider's name contains an expression template; the
     *         message reports that name as the offending expression
     */
    private void validateConfigurationProviderIsNotExpression() throws InitialisationException {
        if (this.explicitConfigProvider == null) {
            return;
        }
        String configRef = this.explicitConfigProvider.getName();
        if (this.expressionParser.isContainsTemplate(configRef)) {
            // Report the very string that was checked rather than the provider object's toString().
            throw new InitialisationException(I18nMessageFactory.createStaticMessage((String)String.format("Root component '%s' defines component '%s' which specifies the expression '%s' as a config-ref. Expressions are not allowed as config references", this.getLocation().getRootContainerName(), NameUtils.hyphenize((String)((SourceModel)this.componentModel).getName()), configRef)), (Initialisable)this);
        }
    }
}

