/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.module.extension.internal.runtime.source;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.inject.Inject;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.meta.model.ExtensionModel;
import org.mule.runtime.api.meta.model.config.ConfigurationModel;
import org.mule.runtime.api.meta.model.source.SourceModel;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.tx.TransactionType;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.extension.ExtensionManager;
import org.mule.runtime.core.api.lifecycle.LifecycleState;
import org.mule.runtime.core.api.lifecycle.LifecycleStateEnabled;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.retry.RetryCallback;
import org.mule.runtime.core.api.retry.RetryContext;
import org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.streaming.CursorProviderFactory;
import org.mule.runtime.core.api.transaction.MuleTransactionConfig;
import org.mule.runtime.core.api.transaction.TransactionConfig;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.internal.execution.ExceptionCallback;
import org.mule.runtime.core.internal.lifecycle.DefaultLifecycleManager;
import org.mule.runtime.core.internal.util.MessagingExceptionResolver;
import org.mule.runtime.core.privileged.PrivilegedMuleContext;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.exception.ErrorTypeLocator;
import org.mule.runtime.core.privileged.execution.MessageProcessContext;
import org.mule.runtime.core.privileged.execution.MessageProcessingManager;
import org.mule.runtime.extension.api.runtime.config.ConfigurationInstance;
import org.mule.runtime.extension.api.runtime.config.ConfigurationProvider;
import org.mule.runtime.extension.api.runtime.config.ConfiguredComponent;
import org.mule.runtime.extension.api.runtime.source.ParameterizedSource;
import org.mule.runtime.module.extension.internal.runtime.ExtensionComponent;
import org.mule.runtime.module.extension.internal.runtime.exception.ExceptionHandlerManager;
import org.mule.runtime.module.extension.internal.runtime.operation.IllegalSourceException;
import org.mule.runtime.module.extension.internal.runtime.resolver.ObjectBasedParameterValueResolver;
import org.mule.runtime.module.extension.internal.runtime.resolver.ParameterValueResolver;
import org.mule.runtime.module.extension.internal.runtime.source.DefaultSourceCallback;
import org.mule.runtime.module.extension.internal.runtime.source.SourceAdapter;
import org.mule.runtime.module.extension.internal.runtime.source.SourceAdapterFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceCallbackFactory;
import org.mule.runtime.module.extension.internal.runtime.source.SourceConnectionManager;
import org.mule.runtime.module.extension.internal.util.MuleExtensionUtils;
import org.mule.runtime.module.extension.internal.util.ReflectionCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class ExtensionMessageSource
extends ExtensionComponent<SourceModel>
implements MessageSource,
ExceptionCallback<ConnectionException>,
ParameterizedSource,
ConfiguredComponent,
LifecycleStateEnabled {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExtensionMessageSource.class);

    // --- Collaborators populated through dependency injection (@Inject) ---
    @Inject
    private MessageProcessingManager messageProcessingManager;
    @Inject
    private SchedulerService schedulerService;
    @Inject
    private NotificationListenerRegistry notificationListenerRegistry;
    @Inject
    private ReflectionCache reflectionCache;

    // --- Immutable state assigned by the constructor ---
    private final SourceModel sourceModel;
    private final SourceAdapterFactory sourceAdapterFactory;
    private final boolean primaryNodeOnly;
    private final RetryPolicyTemplate retryPolicyTemplate;
    private final MessageSource.BackPressureStrategy backPressureStrategy;
    private final ExceptionHandlerManager exceptionEnricherManager;
    // Flipped to true by onException() while a reconnection attempt is in flight.
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    private final DefaultLifecycleManager<ExtensionMessageSource> lifecycleManager;

    // --- Mutable runtime state ---
    private SourceConnectionManager sourceConnectionManager;
    private Processor messageProcessor;
    // Built on first use from the current source adapter (see buildTransactionConfig()).
    private final LazyValue<TransactionConfig> transactionConfig = new LazyValue<>(this::buildTransactionConfig);
    // Null until createSource() runs and again after disposeSource().
    private SourceAdapter sourceAdapter;
    private Scheduler retryScheduler;
    private Scheduler flowTriggerScheduler;
    // Doubles as the monitor guarding start/stop/restart, hence it must never be reassigned.
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Creates a message source for the given {@code sourceModel}.
     *
     * @param extensionModel        model of the owning extension; forwarded to the superclass and used to
     *                              build the exception handler manager
     * @param sourceModel           model of the source; its name identifies the lifecycle manager
     * @param sourceAdapterFactory  factory used by {@code createSource()} to build the source adapter
     * @param configurationProvider forwarded to the superclass
     * @param primaryNodeOnly       when {@code true}, the source only runs if the context reports being
     *                              the primary polling instance
     * @param retryPolicyTemplate   policy used when starting and reconnecting the source
     * @param cursorProviderFactory forwarded to the superclass
     * @param backPressureStrategy  value returned by {@link #getBackPressureStrategy()}
     * @param managerAdapter        forwarded to the superclass
     */
    public ExtensionMessageSource(ExtensionModel extensionModel,
                                  SourceModel sourceModel,
                                  SourceAdapterFactory sourceAdapterFactory,
                                  ConfigurationProvider configurationProvider,
                                  boolean primaryNodeOnly,
                                  RetryPolicyTemplate retryPolicyTemplate,
                                  CursorProviderFactory cursorProviderFactory,
                                  MessageSource.BackPressureStrategy backPressureStrategy,
                                  ExtensionManager managerAdapter) {
        super(extensionModel, sourceModel, configurationProvider, cursorProviderFactory, managerAdapter);

        // Plain collaborators handed in by the caller.
        this.backPressureStrategy = backPressureStrategy;
        this.primaryNodeOnly = primaryNodeOnly;
        this.retryPolicyTemplate = retryPolicyTemplate;
        this.sourceAdapterFactory = sourceAdapterFactory;
        this.sourceModel = sourceModel;

        // Helpers derived from the models.
        this.exceptionEnricherManager = new ExceptionHandlerManager(extensionModel, sourceModel);
        this.lifecycleManager = new DefaultLifecycleManager<>(sourceModel.getName(), this);
    }

    /**
     * Builds the source adapter if none exists yet and runs dependency injection on it.
     * Does nothing when an adapter is already present.
     *
     * @throws Exception if obtaining the configuration, creating the adapter or injecting it fails
     */
    private synchronized void createSource() throws Exception {
        if (sourceAdapter != null) {
            return;
        }

        CoreEvent event = null;
        try {
            event = org.mule.runtime.module.extension.api.util.MuleExtensionUtils.getInitialiserEvent(muleContext);
            sourceAdapter = sourceAdapterFactory.createAdapter(getConfiguration(event),
                                                               createSourceCallbackFactory(),
                                                               this,
                                                               sourceConnectionManager,
                                                               new MessagingExceptionResolver(this));
            muleContext.getInjector().inject(sourceAdapter);
        } finally {
            // The initialiser event is only needed while building the adapter; always complete its context.
            if (event != null) {
                ((BaseEventContext) event.getContext()).success();
            }
        }
    }

    /**
     * Starts the source by running a {@link StartSourceCallback} through the retry policy,
     * using the retry scheduler.
     *
     * @throws MuleException        if the retry policy fails with a {@link MuleException}
     * @throws MuleRuntimeException wrapping any other failure
     */
    private void startSource() throws MuleException {
        try {
            retryPolicyTemplate.execute(new StartSourceCallback(), retryScheduler);
        } catch (MuleException muleFailure) {
            // Already the declared exception type: let it through untouched.
            throw muleFailure;
        } catch (Throwable otherFailure) {
            throw new MuleRuntimeException(otherFailure);
        }
    }

    /**
     * Stops the current source adapter. Does nothing when there is no adapter.
     *
     * @throws MuleException wrapping any exception thrown while stopping the adapter
     */
    private void stopSource() throws MuleException {
        if (sourceAdapter == null) {
            return;
        }

        try {
            sourceAdapter.stop();
        } catch (Exception stopFailure) {
            String description = String.format("Found exception stopping source '%s' of root component '%s'",
                                               sourceAdapter.getName(),
                                               getLocation().getRootContainerName());
            throw new DefaultMuleException(description, stopFailure);
        }
    }

    /**
     * Returns the configuration instance reported by the current source adapter.
     * Requires the adapter to exist; no null check is performed here.
     *
     * @return the adapter's configuration instance, if any
     */
    @Override
    public Optional<ConfigurationInstance> getConfigurationInstance() {
        Optional<ConfigurationInstance> adapterConfiguration = sourceAdapter.getConfigurationInstance();
        return adapterConfiguration;
    }

    /**
     * Creates the factory that builds a {@link DefaultSourceCallback} wired to this source
     * for a given completion handler factory.
     *
     * @return a factory of source callbacks bound to this message source
     */
    private SourceCallbackFactory createSourceCallbackFactory() {
        return completionHandlerFactory -> DefaultSourceCallback.builder()
            .setExceptionCallback(this)
            .setSourceModel(sourceModel)
            .setConfigurationInstance(getConfigurationInstance().orElse(null))
            .setTransactionConfig(transactionConfig.get())
            .setSource(this)
            .setListener(messageProcessor)
            .setProcessingManager(messageProcessingManager)
            .setMuleContext(muleContext)
            .setProcessContextSupplier(this::createProcessingContext)
            .setCursorStreamProviderFactory(getCursorProviderFactory())
            .setCompletionHandlerFactory(completionHandlerFactory)
            .build();
    }

    /**
     * Reacts to a connection failure reported by the source by attempting to reconnect.
     * <p>
     * Only one reconnection may be in flight: if one is already running, a
     * {@link MuleRuntimeException} is thrown. Otherwise the adapter's own reconnection action
     * (wrapped by the retry policy) is used when available; if not, the failed connection is
     * invalidated and the source is restarted. The {@code reconnecting} flag is cleared once the
     * attempt terminates.
     *
     * @param exception the connection failure
     * @throws MuleRuntimeException if a reconnection is already in progress
     */
    @Override
    public void onException(ConnectionException exception) {
        boolean ownsReconnection = reconnecting.compareAndSet(false, true);
        if (!ownsReconnection) {
            String description = String.format("Message source '%s' on root component '%s' failed to reconnect. Error was: %s",
                                               sourceModel.getName(),
                                               getLocation().getRootContainerName(),
                                               exception.getMessage());
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(description), exception);
        }

        LOGGER.warn(String.format("Message source '%s' on root component '%s' threw exception. Attempting to reconnect...",
                                  sourceAdapter.getName(),
                                  getLocation().getRootContainerName()),
                    exception);

        sourceAdapter.getReconnectionAction(exception)
            .map(action -> Mono.from(retryPolicyTemplate.applyPolicy(action)))
            // No adapter-provided action: drop the broken connection and restart the source.
            .orElseGet(() -> Mono.create(sink -> {
                try {
                    exception.getConnection().ifPresent(sourceConnectionManager::invalidate);
                    restart();
                    sink.success();
                } catch (Exception restartFailure) {
                    sink.error(restartFailure);
                }
            }))
            .doOnSuccess(ignored -> onReconnectionSuccessful())
            .doOnError(this::onReconnectionFailed)
            .doAfterTerminate(() -> reconnecting.set(false))
            .subscribe();
    }

    /**
     * Logs, at WARN level, that the source reconnected.
     * <p>
     * The level matches the "Attempting to reconnect..." warning so that both halves of the
     * reconnection story show up under the same logger configuration.
     */
    private void onReconnectionSuccessful() {
        // Guard on the level actually used for the message (the previous guard checked DEBUG).
        if (LOGGER.isWarnEnabled()) {
            // restart() reports success without recreating the adapter when the source is stopped,
            // so the adapter may be null here; fall back to the model name in that case.
            SourceAdapter adapter = this.sourceAdapter;
            String sourceName = adapter != null ? adapter.getName() : this.sourceModel.getName();
            LOGGER.warn("Message source '{}' on root component '{}' successfully reconnected", sourceName, this.getLocation().getRootContainerName());
        }
    }

    /**
     * Logs a failed reconnection attempt and shuts this source down.
     *
     * @param exception the failure that ended the reconnection attempt
     */
    private void onReconnectionFailed(Throwable exception) {
        // A failed restart disposes the adapter and sets the field to null. Dereferencing it
        // unconditionally would throw a NullPointerException here and skip shutdown() below.
        SourceAdapter adapter = this.sourceAdapter;
        String sourceName = adapter != null ? adapter.getName() : this.sourceModel.getName();
        LOGGER.error(String.format("Message source '%s' on root component '%s' could not be reconnected. Will be shutdown. %s", sourceName, this.getLocation().getRootContainerName(), exception.getMessage()), exception);
        this.shutdown();
    }

    /**
     * Stops, disposes and starts the source again, provided it is currently started.
     * When the source is not started, only a warning is logged.
     * <p>
     * Runs while holding the {@code started} monitor, the same one used by the start and stop phases.
     *
     * @throws MuleException if stopping or starting the source fails
     */
    private void restart() throws MuleException {
        synchronized (this.started) {
            if (this.started.get()) {
                this.stopSource();
                this.disposeSource();
                this.startSource();
            } else {
                // The adapter may already have been disposed (null) when the source is stopped;
                // avoid turning this harmless case into a NullPointerException.
                SourceAdapter adapter = this.sourceAdapter;
                String sourceName = adapter != null ? adapter.getName() : this.sourceModel.getName();
                LOGGER.warn(String.format("Message source '%s' on root component '%s' is stopped. Not doing restart", sourceName, this.getLocation().getRootContainerName()));
            }
        }
    }

    /**
     * Starts the source, but only when it is meant to run on this node.
     *
     * @throws MuleException if the start phase fails
     */
    @Override
    public void doStart() throws MuleException {
        if (!shouldRunOnThisNode()) {
            return;
        }
        reallyDoStart();
    }

    /**
     * Fires the start phase: starts the retry policy, obtains the schedulers if they are missing,
     * starts the source and marks it as started.
     *
     * @throws MuleException if the start phase fails
     */
    private void reallyDoStart() throws MuleException {
        lifecycle(() -> lifecycleManager.fireStartPhase((phase, object) -> {
            LifecycleUtils.startIfNeeded(retryPolicyTemplate);

            // Schedulers are released on stop/dispose, so they are obtained again on demand.
            if (retryScheduler == null) {
                retryScheduler = schedulerService.ioScheduler();
            }
            if (flowTriggerScheduler == null) {
                flowTriggerScheduler = schedulerService.cpuLightScheduler();
            }

            synchronized (started) {
                startSource();
                started.set(true);
            }
        }));
    }

    /**
     * Fires the stop phase: marks the source as not started, stops it and then stops the schedulers.
     *
     * @throws MuleException if the stop phase fails
     */
    @Override
    public void doStop() throws MuleException {
        safeLifecycle(() -> lifecycleManager.fireStopPhase((phase, object) -> {
            // The flag is cleared first so a concurrent restart() sees the source as stopped.
            synchronized (started) {
                started.set(false);
                stopSource();
            }
            stopSchedulers();
        }));
    }

    /**
     * Fires the dispose phase: disposes the source, stops and disposes the retry policy and stops
     * the schedulers. A {@link MuleException} raised on the way is logged as a warning, not rethrown.
     */
    @Override
    public void doDispose() {
        try {
            safeLifecycle(() -> lifecycleManager.fireDisposePhase((phase, object) -> {
                disposeSource();
                LifecycleUtils.stopIfNeeded(retryPolicyTemplate);
                LifecycleUtils.disposeIfNeeded(retryPolicyTemplate, LOGGER);
                stopSchedulers();
            }));
        } catch (MuleException disposeFailure) {
            String description = String.format("Failed to dispose message source at root element '%s'. %s",
                                               getLocation().getRootContainerName(),
                                               disposeFailure.getMessage());
            LOGGER.warn(description, disposeFailure);
        }
    }

    /**
     * Runs a lifecycle action and routes any failure through
     * {@code handleLifecycleException} without unwrapping {@link LifecycleException}s.
     *
     * @param runnable the lifecycle action to run
     * @throws MuleException if the action fails with anything but an {@link IllegalStateException}
     */
    private void lifecycle(CheckedRunnable runnable) throws MuleException {
        final boolean unwrapLifecycleException = false;
        try {
            runnable.run();
        } catch (Throwable failure) {
            handleLifecycleException(failure, unwrapLifecycleException);
        }
    }

    /**
     * Runs a lifecycle action and routes any failure through
     * {@code handleLifecycleException}, unwrapping {@link LifecycleException}s to their cause.
     *
     * @param runnable the lifecycle action to run
     * @throws MuleException if the action fails with anything but an {@link IllegalStateException}
     */
    private void safeLifecycle(CheckedRunnable runnable) throws MuleException {
        final boolean unwrapLifecycleException = true;
        try {
            runnable.run();
        } catch (Throwable failure) {
            handleLifecycleException(failure, unwrapLifecycleException);
        }
    }

    /**
     * Translates a failure raised by a lifecycle action.
     * <p>
     * The failure is first unwrapped via {@code Exceptions.unwrap}. An
     * {@link IllegalStateException} is only logged at debug level and otherwise ignored; a
     * {@link MuleException} is rethrown as is; anything else is wrapped in a
     * {@link DefaultMuleException}.
     *
     * @param e                        the failure to handle
     * @param unwrapLifecycleException whether a {@link LifecycleException} with a cause should be
     *                                 replaced by that cause before classification
     * @throws MuleException unless the failure resolves to an {@link IllegalStateException}
     */
    private void handleLifecycleException(Throwable e, boolean unwrapLifecycleException) throws MuleException {
        Throwable failure = Exceptions.unwrap(e);
        if (unwrapLifecycleException && failure instanceof LifecycleException && failure.getCause() != null) {
            failure = failure.getCause();
        }

        if (failure instanceof IllegalStateException) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Skipping lifecycle phase: " + failure.getMessage(), failure);
            }
            return;
        }

        if (failure instanceof MuleException) {
            throw (MuleException) failure;
        }
        throw new DefaultMuleException(failure);
    }

    /**
     * Stops and then disposes this message source. A failure while stopping is logged and does
     * not prevent the dispose step.
     */
    private void shutdown() {
        try {
            LifecycleUtils.stopIfNeeded(this);
        }
        catch (Exception e) {
            // The adapter may already be disposed (null). Dereferencing it blindly would throw
            // from this handler and skip the dispose below.
            SourceAdapter adapter = this.sourceAdapter;
            String sourceName = adapter != null ? adapter.getName() : this.sourceModel.getName();
            LOGGER.error(String.format("Failed to stop source '%s' on flow '%s'", sourceName, this.getLocation().getRootContainerName()), e);
        }
        LifecycleUtils.disposeIfNeeded(this, LOGGER);
    }

    /**
     * Stops the retry and flow-trigger schedulers, if present, and clears both references.
     * <p>
     * The flow-trigger scheduler is stopped even when stopping the retry scheduler throws, so a
     * failure on the first one cannot leave the second one running. The exception still
     * propagates; if both stops throw, the second exception is the one that propagates.
     */
    private void stopSchedulers() {
        try {
            if (this.retryScheduler != null) {
                try {
                    this.retryScheduler.stop();
                }
                finally {
                    this.retryScheduler = null;
                }
            }
        }
        finally {
            if (this.flowTriggerScheduler != null) {
                try {
                    this.flowTriggerScheduler.stop();
                }
                finally {
                    this.flowTriggerScheduler = null;
                }
            }
        }
    }

    /**
     * Disposes the current source adapter (when it needs it) and forgets the reference,
     * so that the next {@code createSource()} call builds a fresh adapter.
     */
    private void disposeSource() {
        final SourceAdapter current = sourceAdapter;
        LifecycleUtils.disposeIfNeeded(current, LOGGER);
        sourceAdapter = null;
    }

    /**
     * Builds the transaction configuration from the current source adapter's transactional
     * action and transaction type.
     *
     * @return the new transaction configuration
     * @throws IllegalStateException if no transaction factory is available for the adapter's
     *                               transaction type
     */
    private TransactionConfig buildTransactionConfig() {
        MuleTransactionConfig config = new MuleTransactionConfig();
        config.setAction(MuleExtensionUtils.toActionCode(sourceAdapter.getTransactionalAction()));
        config.setMuleContext(muleContext);

        TransactionType type = sourceAdapter.getTransactionalType();
        config.setFactory(transactionFactoryLocator.lookUpTransactionFactory(type)
            .orElseThrow(() -> new IllegalStateException(
                String.format("Unable to create Source with Transactions of Type: [%s]. No factory available for this transaction type", type))));
        return config;
    }

    /**
     * @return the connection manager created during initialisation, or {@code null} before that
     */
    SourceConnectionManager getSourceConnectionManager() {
        return sourceConnectionManager;
    }

    /**
     * Creates a {@link MessageProcessContext} view over this source.
     *
     * @return a context that reports this source, its flow-trigger scheduler, its transaction
     *         configuration (only for transactional source models) and the context's execution
     *         class loader and error type locator
     */
    private MessageProcessContext createProcessingContext() {
        final ExtensionMessageSource owner = this;
        MessageProcessContext context = new MessageProcessContext() {

            @Override
            public boolean supportsAsynchronousProcessing() {
                return true;
            }

            @Override
            public MessageSource getMessageSource() {
                return owner;
            }

            @Override
            public Scheduler getFlowExecutionExecutor() {
                return owner.flowTriggerScheduler;
            }

            @Override
            public Optional<TransactionConfig> getTransactionConfig() {
                // Only transactional sources expose (and lazily build) a transaction configuration.
                if (owner.sourceModel.isTransactional()) {
                    return Optional.of(owner.transactionConfig.get());
                }
                return Optional.empty();
            }

            @Override
            public ClassLoader getExecutionClassLoader() {
                return owner.muleContext.getExecutionClassLoader();
            }

            @Override
            public ErrorTypeLocator getErrorTypeLocator() {
                PrivilegedMuleContext privilegedContext = (PrivilegedMuleContext) owner.muleContext;
                return privilegedContext.getErrorTypeLocator();
            }
        };
        return context;
    }

    /**
     * Registers the processor handed to the source callbacks as their listener.
     *
     * @param listener the processor to use as listener
     */
    @Override
    public void setListener(Processor listener) {
        messageProcessor = listener;
    }

    /**
     * Verifies that this source is declared either by the provider's configuration model or by
     * the provider's extension model.
     *
     * @param configurationProvider the provider the source points to
     * @throws IllegalSourceException if neither model declares a source with this source's name
     */
    @Override
    protected void validateOperationConfiguration(ConfigurationProvider configurationProvider) {
        ConfigurationModel configurationModel = configurationProvider.getConfigurationModel();

        // Declared at configuration level: nothing else to check.
        if (configurationModel.getSourceModel(sourceModel.getName()).isPresent()) {
            return;
        }
        // Declared at extension level: also acceptable.
        if (configurationProvider.getExtensionModel().getSourceModel(sourceModel.getName()).isPresent()) {
            return;
        }

        throw new IllegalSourceException(String.format("Root component '%s' defines an usage of operation '%s' which points to configuration '%s'. The selected config does not support that operation.",
                                                       getLocation().getRootContainerName(),
                                                       sourceModel.getName(),
                                                       configurationProvider.getName()));
    }

    /**
     * Creates a resolver that reads parameter values from the current adapter's delegate object.
     * Requires the source adapter to exist.
     *
     * @return a new object-based parameter value resolver
     */
    @Override
    protected ParameterValueResolver getParameterValueResolver() {
        Object delegate = sourceAdapter.getDelegate();
        return new ObjectBasedParameterValueResolver(delegate, sourceModel, reflectionCache);
    }

    /**
     * Initialises the source right away when it should run on this node. Otherwise registers a
     * {@link PrimaryNodeLifecycleNotificationListener} whose callback initialises and starts the
     * source.
     *
     * @throws InitialisationException if immediate initialisation fails
     */
    @Override
    protected void doInitialise() throws InitialisationException {
        if (!shouldRunOnThisNode()) {
            PrimaryNodeLifecycleNotificationListener primaryNodeListener =
                new PrimaryNodeLifecycleNotificationListener(() -> {
                    reallyDoInitialise();
                    reallyDoStart();
                }, notificationListenerRegistry);
            primaryNodeListener.register();
            return;
        }
        reallyDoInitialise();
    }

    /**
     * Fires the initialise phase: initialises the retry policy, creates the source connection
     * manager and builds the source adapter.
     *
     * @throws InitialisationException if any step fails; other {@link MuleException}s are wrapped
     */
    private void reallyDoInitialise() throws InitialisationException {
        try {
            lifecycle(() -> lifecycleManager.fireInitialisePhase((phase, object) -> {
                LifecycleUtils.initialiseIfNeeded(retryPolicyTemplate, true, muleContext);
                sourceConnectionManager = new SourceConnectionManager(connectionManager);
                try {
                    createSource();
                } catch (Exception creationFailure) {
                    throw new InitialisationException(creationFailure, this);
                }
            }));
        } catch (InitialisationException initialisationFailure) {
            // Already the right type: propagate untouched.
            throw initialisationFailure;
        } catch (MuleException otherFailure) {
            throw new InitialisationException(otherFailure, this);
        }
    }

    /**
     * Resolves the source parameters exposed by the adapter factory against an initialiser event
     * and returns them as an immutable map.
     *
     * @return an immutable copy of the resolved parameters
     * @throws MuleRuntimeException if the parameters cannot be resolved; the original failure is
     *                              attached as the cause
     */
    @Override
    public Map<String, Object> getInitialisationParameters() {
        CoreEvent initialiserEvent = null;
        try {
            initialiserEvent = org.mule.runtime.module.extension.api.util.MuleExtensionUtils.getInitialiserEvent();
            return ImmutableMap.copyOf(MuleExtensionUtils.toMap(this.sourceAdapterFactory.getSourceParameters(), initialiserEvent));
        }
        catch (Exception e) {
            // The failure must be the exception's cause. It used to be passed to
            // createStaticMessage as a message argument, which dropped it from the stack trace.
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Could not resolve parameters message source at location '%s'", this.getLocation().toString())), e);
        }
        finally {
            // The initialiser event is only needed for resolution; always complete its context.
            if (initialiserEvent != null) {
                ((BaseEventContext)initialiserEvent.getContext()).success();
            }
        }
    }

    /**
     * @return the back pressure strategy given at construction time
     */
    @Override
    public MessageSource.BackPressureStrategy getBackPressureStrategy() {
        return backPressureStrategy;
    }

    /**
     * @return {@code true} unless the source is restricted to the primary node and the context
     *         does not report being the primary polling instance
     */
    private boolean shouldRunOnThisNode() {
        if (!primaryNodeOnly) {
            return true;
        }
        return muleContext.isPrimaryPollingInstance();
    }

    /**
     * @return the state tracked by this source's lifecycle manager
     */
    @Override
    public LifecycleState getLifecycleState() {
        LifecycleState currentState = lifecycleManager.getState();
        return currentState;
    }

    /**
     * Unit of work run through the retry policy to (re)start the source: it creates the adapter if
     * needed, initialises it, starts it and clears the {@code reconnecting} flag.
     * <p>
     * On failure the source is stopped and disposed, and the failure is passed through the
     * exception handler manager. If a {@link ConnectionException} can be extracted from the
     * result, that exception is the one thrown.
     */
    private class StartSourceCallback
    implements RetryCallback {

        @Override
        public void doWork(RetryContext context) throws Exception {
            try {
                ExtensionMessageSource.this.createSource();
                LifecycleUtils.initialiseIfNeeded(ExtensionMessageSource.this.sourceAdapter, ExtensionMessageSource.this.muleContext);
                ExtensionMessageSource.this.sourceAdapter.start();
                ExtensionMessageSource.this.reconnecting.set(false);
            }
            catch (Exception e) {
                this.releaseFailedSource(e);
                Throwable throwable = ExtensionMessageSource.this.exceptionEnricherManager.process(e);
                Optional<ConnectionException> connectionException = ExceptionUtils.extractConnectionException(throwable);
                if (connectionException.isPresent()) {
                    throwable = connectionException.get();
                }
                throw throwable instanceof Exception ? (Exception)throwable : new MuleRuntimeException(throwable);
            }
        }

        /**
         * Stops and disposes the source after a failed start. Cleanup failures are attached to
         * {@code startFailure} as suppressed exceptions rather than thrown, so they cannot mask
         * the failure that triggered the cleanup.
         */
        private void releaseFailedSource(Exception startFailure) {
            try {
                ExtensionMessageSource.this.stopSource();
            }
            catch (Exception stopFailure) {
                // addSuppressed rejects self-suppression, hence the identity check.
                if (stopFailure != startFailure) {
                    startFailure.addSuppressed(stopFailure);
                }
            }
            try {
                ExtensionMessageSource.this.disposeSource();
            }
            catch (Exception disposeFailure) {
                if (disposeFailure != startFailure) {
                    startFailure.addSuppressed(disposeFailure);
                }
            }
        }

        @Override
        public String getWorkDescription() {
            return "Message Source Reconnection";
        }

        @Override
        public Object getWorkOwner() {
            return ExtensionMessageSource.this;
        }
    }
}

