/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategyFactory;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.WorkQueueProcessor;
import reactor.util.concurrent.Queues;
import reactor.util.concurrent.WaitStrategy;

/**
 * Base factory for processing strategies that hand events over to a Reactor
 * {@code WorkQueueProcessor} (ring buffer) which is consumed by a configurable
 * number of subscribers.
 *
 * <p>The defaults for buffer size, subscriber count and wait strategy can be
 * overridden through system properties whose names start with
 * {@link #SYSTEM_PROPERTY_PREFIX}.
 */
abstract class AbstractStreamProcessingStrategyFactory
extends AbstractProcessingStrategyFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamProcessingStrategyFactory.class);

    /** Prefix of every system property read by this class: the fully qualified class name followed by a dot. */
    protected static final String SYSTEM_PROPERTY_PREFIX = AbstractStreamProcessingStrategyFactory.class.getName() + ".";

    /** Core count used to derive defaults; the {@code AVAILABLE_CORES} property overrides the JVM-reported value. */
    protected static final int CORES = Integer.getInteger(SYSTEM_PROPERTY_PREFIX + "AVAILABLE_CORES", Runtime.getRuntime().availableProcessors());

    /** Default ring buffer size; the {@code DEFAULT_BUFFER_SIZE} property overrides {@code Queues.SMALL_BUFFER_SIZE}. */
    protected static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(SYSTEM_PROPERTY_PREFIX + "DEFAULT_BUFFER_SIZE", Queues.SMALL_BUFFER_SIZE);

    /** Default subscriber count; half of {@link #CORES} but never less than one, unless overridden by property. */
    protected static final int DEFAULT_SUBSCRIBER_COUNT = Integer.getInteger(SYSTEM_PROPERTY_PREFIX + "DEFAULT_SUBSCRIBER_COUNT", Integer.max(1, CORES / 2));

    /** Default wait strategy name; {@code LITE_BLOCKING} unless overridden by property. */
    protected static final String DEFAULT_WAIT_STRATEGY = System.getProperty(SYSTEM_PROPERTY_PREFIX + "DEFAULT_WAIT_STRATEGY", AbstractStreamProcessingStrategy.WaitStrategy.LITE_BLOCKING.name());

    // NOTE(review): this is effectively a constant but is declared non-final; it is left
    // assignable here so that any existing code writing to it keeps compiling - confirm
    // there are no writers and make it final.
    protected static String RING_BUFFER_SCHEDULER_NAME_SUFFIX = ".ring-buffer";

    private int bufferSize = DEFAULT_BUFFER_SIZE;
    private int subscriberCount = DEFAULT_SUBSCRIBER_COUNT;
    private String waitStrategy = DEFAULT_WAIT_STRATEGY;

    AbstractStreamProcessingStrategyFactory() {
    }

    /**
     * Sets the size of the ring buffer.
     *
     * @param bufferSize the buffer size; must be a power of two
     * @throws IllegalArgumentException if {@code bufferSize} is not a power of two
     */
    public void setBufferSize(int bufferSize) {
        if (!Queues.isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2 : " + bufferSize);
        }
        this.bufferSize = bufferSize;
    }

    /**
     * Sets the number of ring buffer subscribers. The value is stored as given; no validation is performed here.
     *
     * @param subscriberCount the number of subscribers
     */
    public void setSubscriberCount(int subscriberCount) {
        this.subscriberCount = subscriberCount;
    }

    /**
     * Sets the wait strategy by name. The name is stored as given and is only resolved against
     * {@link AbstractStreamProcessingStrategy.WaitStrategy} when a strategy instance is constructed.
     *
     * @param waitStrategy the name of a {@link AbstractStreamProcessingStrategy.WaitStrategy} constant
     */
    public void setWaitStrategy(String waitStrategy) {
        this.waitStrategy = waitStrategy;
    }

    /**
     * @return the configured ring buffer size
     */
    protected int getBufferSize() {
        return this.bufferSize;
    }

    /**
     * @return the configured number of subscribers
     */
    protected int getSubscriberCount() {
        return this.subscriberCount;
    }

    /**
     * @return the configured wait strategy name
     */
    protected String getWaitStrategy() {
        return this.waitStrategy;
    }

    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return AbstractStreamProcessingStrategy.class;
    }

    /**
     * Builds a supplier of the scheduler used to run the ring buffer subscribers.
     *
     * <p>The scheduler is only requested from the scheduler service when the supplier is invoked, so the
     * subscriber count in effect at that moment is the one used as the maximum number of concurrent tasks.
     *
     * @param muleContext the context providing the scheduler service and the base scheduler configuration
     * @param schedulersNamePrefix prefix of the scheduler name; {@link #RING_BUFFER_SCHEDULER_NAME_SUFFIX} is appended
     * @return a supplier that creates a custom scheduler with waiting allowed
     */
    protected Supplier<Scheduler> getRingBufferSchedulerSupplier(MuleContext muleContext, String schedulersNamePrefix) {
        return () -> muleContext.getSchedulerService().customScheduler(muleContext.getSchedulerBaseConfig().withName(schedulersNamePrefix + RING_BUFFER_SCHEDULER_NAME_SUFFIX).withMaxConcurrentTasks(this.getSubscriberCount()).withWaitAllowed(true));
    }

    /**
     * Processing strategy that publishes events into a {@code WorkQueueProcessor} and processes them with a
     * fixed number of subscribers running on the ring buffer scheduler.
     */
    abstract static class AbstractStreamProcessingStrategy
    extends AbstractProcessingStrategy {
        protected final Supplier<Scheduler> ringBufferSchedulerSupplier;
        protected final int bufferSize;
        protected final int subscribers;
        protected final WaitStrategy waitStrategy;
        protected final int maxConcurrency;
        /** Context class loader of the thread that constructed this strategy; re-applied when subscribers subscribe. */
        private final ClassLoader executionClassloader;

        /**
         * @param ringBufferSchedulerSupplier supplier of the scheduler that executes the ring buffer subscribers
         * @param bufferSize size of the ring buffer
         * @param subscribers configured number of subscribers
         * @param waitStrategy name of a {@link WaitStrategy} constant
         * @param maxConcurrency upper bound applied to the number of subscribers actually created
         * @throws IllegalArgumentException if {@code waitStrategy} does not name a {@link WaitStrategy} constant
         * @throws NullPointerException if {@code waitStrategy} or {@code ringBufferSchedulerSupplier} is {@code null}
         */
        protected AbstractStreamProcessingStrategy(Supplier<Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscribers, String waitStrategy, int maxConcurrency) {
            // Primitive ints can never be null, so only the reference arguments are checked.
            this.subscribers = subscribers;
            this.waitStrategy = WaitStrategy.valueOf(waitStrategy);
            this.bufferSize = bufferSize;
            this.ringBufferSchedulerSupplier = Objects.requireNonNull(ringBufferSchedulerSupplier, "ringBufferSchedulerSupplier");
            this.maxConcurrency = maxConcurrency;
            this.executionClassloader = Thread.currentThread().getContextClassLoader();
        }

        /**
         * Creates a sink backed by a new {@code WorkQueueProcessor}.
         *
         * <p>The smaller of {@code maxConcurrency} and {@code subscribers} determines how many subscribers are
         * attached to the processor. Disposing the returned sink waits for the processor to shut down for up to
         * the configured shutdown timeout, forces the shutdown if that wait fails, and then waits for whatever
         * is left of the same timeout for all subscribers to terminate.
         *
         * @param flowConstruct the flow whose configuration provides the shutdown timeout and whose name is logged
         * @param function the processing pipeline applied to the events taken from the ring buffer
         * @return the sink through which events are pushed into the ring buffer
         */
        @Override
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor function) {
            final long shutdownTimeout = flowConstruct.getMuleContext().getConfiguration().getShutdownTimeout();
            WorkQueueProcessor<EventWrapper> processor = WorkQueueProcessor.<EventWrapper>builder()
                    .executor(this.ringBufferSchedulerSupplier.get())
                    .bufferSize(this.bufferSize)
                    .waitStrategy(this.waitStrategy.getReactorWaitStrategy())
                    .build();
            int subscriberCount = Math.min(this.maxConcurrency, this.subscribers);
            // Counted down once per subscriber, on either error or completion.
            CountDownLatch completionLatch = new CountDownLatch(subscriberCount);
            for (int i = 0; i < subscriberCount; ++i) {
                // NOTE(review): EventWrapper clears its event when the root context terminates; if that can
                // happen before the wrapper is dequeued, the mapper below yields null - confirm this is
                // either impossible or handled downstream.
                processor.doOnSubscribe(subscription -> Thread.currentThread().setContextClassLoader(this.executionClassloader))
                        .map(EventWrapper::getWrappedEvent)
                        .transform(function)
                        .subscribe(null, e -> completionLatch.countDown(), completionLatch::countDown);
            }
            return this.buildSink(processor.sink(), () -> {
                // Monotonic clock: the remaining budget must not be affected by wall-clock adjustments.
                long startNanos = System.nanoTime();
                if (!processor.awaitAndShutdown(Duration.ofMillis(shutdownTimeout))) {
                    LOGGER.warn("WorkQueueProcessor of ProcessingStrategy for flow '{}' not shutDown in {} ms. Forcing shutdown...", flowConstruct.getName(), shutdownTimeout);
                    processor.forceShutdown();
                }
                try {
                    long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                    long remainingMillis = Long.max(shutdownTimeout - elapsedMillis, 0L);
                    if (!completionLatch.await(remainingMillis, TimeUnit.MILLISECONDS)) {
                        LOGGER.warn("Subscribers of ProcessingStrategy for flow '{}' not completed in {} ms", flowConstruct.getName(), shutdownTimeout);
                    }
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt status for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new MuleRuntimeException(e);
                }
            }, this.createOnEventConsumer(), this.bufferSize);
        }

        /**
         * Builds the sink that wraps each incoming event in an {@link EventWrapper} before it is emitted.
         *
         * @param fluxSink the Reactor sink the wrapped events are pushed to
         * @param disposable the action run when the sink is disposed
         * @param onEventConsumer consumer handed to the sink for each event
         * @param bufferSize the buffer size handed to the sink
         * @param <E> the element type accepted by {@code fluxSink}
         * @return the sink
         */
        protected <E> AbstractProcessingStrategy.ReactorSink<E> buildSink(FluxSink<E> fluxSink, Disposable disposable, Consumer<CoreEvent> onEventConsumer, int bufferSize) {
            // NOTE(review): DefaultReactorSink is instantiated as a raw type and intoSink always produces
            // an EventWrapper regardless of E; its generic signature is not visible from this file, so the
            // construction is left untouched - confirm before parameterizing.
            return new AbstractProcessingStrategy.DefaultReactorSink(fluxSink, disposable, onEventConsumer, bufferSize){

                @Override
                public EventWrapper intoSink(CoreEvent event) {
                    return new EventWrapper(event);
                }
            };
        }

        /**
         * Holder placed in the ring buffer instead of the event itself. The reference to the event is dropped
         * once the event's root context reports termination.
         */
        private static final class EventWrapper {
            CoreEvent wrappedEvent;

            public EventWrapper(CoreEvent event) {
                this.wrappedEvent = event;
                ((BaseEventContext)this.wrappedEvent.getContext()).getRootContext().onTerminated((e, t) -> {
                    this.wrappedEvent = null;
                });
            }

            /**
             * @return the wrapped event, or {@code null} once the root context has terminated
             */
            public CoreEvent getWrappedEvent() {
                return this.wrappedEvent;
            }
        }

        /**
         * Names accepted for the wait strategy setting, each mapped to the Reactor wait strategy it stands for.
         */
        protected enum WaitStrategy {
            BLOCKING(reactor.util.concurrent.WaitStrategy.blocking()),
            LITE_BLOCKING(reactor.util.concurrent.WaitStrategy.liteBlocking()),
            SLEEPING(reactor.util.concurrent.WaitStrategy.sleeping()),
            BUSY_SPIN(reactor.util.concurrent.WaitStrategy.busySpin()),
            YIELDING(reactor.util.concurrent.WaitStrategy.yielding()),
            PARKING(reactor.util.concurrent.WaitStrategy.parking()),
            PHASED(reactor.util.concurrent.WaitStrategy.phasedOffLiteLock(200L, 100L, TimeUnit.MILLISECONDS));

            private final reactor.util.concurrent.WaitStrategy reactorWaitStrategy;

            WaitStrategy(reactor.util.concurrent.WaitStrategy reactorWaitStrategy) {
                this.reactorWaitStrategy = reactorWaitStrategy;
            }

            /**
             * @return the Reactor wait strategy associated with this constant
             */
            reactor.util.concurrent.WaitStrategy getReactorWaitStrategy() {
                return this.reactorWaitStrategy;
            }
        }
    }
}

