/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.core.api.construct.BackPressureReason;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.DirectSink;
import org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyAdapter;
import org.mule.runtime.core.privileged.exception.MessagingException;
import reactor.core.publisher.FluxSink;
import reactor.util.concurrent.Queues;

/**
 * Base class for processing strategies. It holds the state shared by concrete strategies (a scheduler
 * decorator and an "on event" hook) and provides a default {@link Sink} built on {@link DirectSink}.
 */
public abstract class AbstractProcessingStrategy implements ProcessingStrategyAdapter {

    /** Message of the exception raised by the default on-event consumer when a transaction is active. */
    public static final String TRANSACTIONAL_ERROR_MESSAGE = "Unable to process a transactional flow asynchronously";

    /** Key constant; not referenced in this class (presumably a reactor context key used by subclasses - verify). */
    public static final String PROCESSOR_SCHEDULER_CONTEXT_KEY = "mule.nb.processorScheduler";

    /** Not referenced in this class; by its name, a retry interval in milliseconds used by subclasses - verify. */
    protected static final long SCHEDULER_BUSY_RETRY_INTERVAL_MS = 2L;

    /** Decorator applied by {@link #decorateScheduler(ScheduledExecutorService)}; defaults to the identity function. */
    private UnaryOperator<ScheduledExecutorService> schedulerDecorator = UnaryOperator.identity();

    /** Hook available to subclasses; initialised with {@link #createDefaultOnEventConsumer()}. */
    protected Consumer<CoreEvent> onEventConsumer = this.createDefaultOnEventConsumer();

    /**
     * Creates a {@link DirectSink} over the given pipeline.
     * <p>
     * Note that the sink always receives a freshly created default on-event consumer; a consumer installed
     * through {@link #setOnEventConsumer(Consumer)} is not used by this method.
     *
     * @param flowConstruct the owning flow construct (unused by this implementation)
     * @param pipeline the processor the sink feeds
     * @return a new sink with a buffer of {@code Queues.SMALL_BUFFER_SIZE}
     */
    @Override
    public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor pipeline) {
        return new DirectSink(pipeline, this.createDefaultOnEventConsumer(), Queues.SMALL_BUFFER_SIZE);
    }

    /**
     * Replaces the on-event hook stored in {@link #onEventConsumer}.
     *
     * @param onEventConsumer the new consumer
     */
    @Override
    public void setOnEventConsumer(Consumer<CoreEvent> onEventConsumer) {
        this.onEventConsumer = onEventConsumer;
    }

    /**
     * Builds the default on-event consumer, which rejects events while a transaction is active.
     *
     * @return a consumer that throws (via {@code Exceptions.propagateWrappingFatal}) a
     *         {@link MessagingException} wrapping a {@link DefaultMuleException} carrying
     *         {@link #TRANSACTIONAL_ERROR_MESSAGE} when {@code TransactionCoordination.isTransactionActive()}
     *         is true, and does nothing otherwise
     */
    protected Consumer<CoreEvent> createDefaultOnEventConsumer() {
        return event -> {
            if (TransactionCoordination.isTransactionActive()) {
                DefaultMuleException cause =
                        new DefaultMuleException(I18nMessageFactory.createStaticMessage(TRANSACTIONAL_ERROR_MESSAGE));
                throw Exceptions.propagateWrappingFatal(new MessagingException(event, cause));
            }
        };
    }

    /**
     * Applies the configured scheduler decorator to the given scheduler.
     *
     * @param scheduler the scheduler to decorate
     * @return whatever the decorator returns (the same instance with the default identity decorator)
     */
    protected ScheduledExecutorService decorateScheduler(ScheduledExecutorService scheduler) {
        return this.schedulerDecorator.apply(scheduler);
    }

    /**
     * @return the decorator currently applied by {@link #decorateScheduler(ScheduledExecutorService)}
     */
    @Override
    public UnaryOperator<ScheduledExecutorService> getSchedulerDecorator() {
        return this.schedulerDecorator;
    }

    /**
     * @param schedulerDecorator the decorator to apply from now on in
     *        {@link #decorateScheduler(ScheduledExecutorService)}
     */
    @Override
    public void setSchedulerDecorator(UnaryOperator<ScheduledExecutorService> schedulerDecorator) {
        this.schedulerDecorator = schedulerDecorator;
    }

    /**
     * {@link ReactorSink} that pushes events into a reactor {@link FluxSink}.
     *
     * @param <E> the element type accepted by the underlying {@link FluxSink}
     */
    static class DefaultReactorSink<E> implements ReactorSink<E> {

        private final FluxSink<E> fluxSink;
        /** Invoked from {@link #dispose()} with a timestamp in epoch milliseconds. */
        private final Consumer<Long> disposer;
        /** Invoked with every event before it is offered to the flux sink. */
        private final Consumer<CoreEvent> onEventConsumer;
        private final int bufferSize;
        /**
         * Time at which {@link #prepareDispose()} ran, or {@code -1} if it has not been called.
         * NOTE(review): plain (non-volatile) field; assumes prepareDispose/dispose are not raced - confirm.
         */
        private long prepareDisposeTimestamp = -1L;

        /**
         * @param fluxSink the reactor sink events are pushed into
         * @param disposer callback invoked on {@link #dispose()} with a timestamp
         * @param onEventConsumer hook invoked with each event before emission
         * @param bufferSize buffer size used to decide when emission may skip the lock in
         *        {@link #emit(CoreEvent)}
         */
        DefaultReactorSink(FluxSink<E> fluxSink, Consumer<Long> disposer, Consumer<CoreEvent> onEventConsumer,
                           int bufferSize) {
            this.fluxSink = fluxSink;
            this.disposer = disposer;
            this.onEventConsumer = onEventConsumer;
            this.bufferSize = bufferSize;
        }

        /**
         * Runs the on-event hook and then unconditionally pushes the event into the flux sink.
         *
         * @param event the event to push
         */
        @Override
        public final void accept(CoreEvent event) {
            this.onEventConsumer.accept(event);
            this.fluxSink.next(this.intoSink(event));
        }

        /**
         * Runs the on-event hook and pushes the event only if downstream has outstanding demand.
         *
         * @param event the event to push
         * @return {@code null} when the event was pushed, or {@link BackPressureReason#EVENTS_ACCUMULATED}
         *         when downstream reports no remaining demand
         */
        @Override
        public final BackPressureReason emit(CoreEvent event) {
            this.onEventConsumer.accept(event);

            long requested = this.fluxSink.requestedFromDownstream();
            if (requested == 0L) {
                return BackPressureReason.EVENTS_ACCUMULATED;
            }

            // With a large enough buffer, only skip the lock while more than CORES slots remain;
            // with a small buffer the threshold is 0, so any positive demand takes the lock-free path.
            long lockFreeThreshold = this.bufferSize > AbstractStreamProcessingStrategyFactory.CORES * 4
                    ? AbstractStreamProcessingStrategyFactory.CORES
                    : 0;
            if (requested > lockFreeThreshold) {
                this.fluxSink.next(this.intoSink(event));
                return null;
            }

            // Little room left: serialise emitters and re-read the demand under the lock. Testing the value
            // captured before acquiring the lock would always succeed (it is known to be non-zero here),
            // which would make both the lock and the rejection branch meaningless.
            synchronized (this.fluxSink) {
                if (this.fluxSink.requestedFromDownstream() > 0L) {
                    this.fluxSink.next(this.intoSink(event));
                    return null;
                }
                return BackPressureReason.EVENTS_ACCUMULATED;
            }
        }

        /**
         * Converts an event into the element type of the flux sink. This implementation simply casts.
         *
         * @param event the event to convert
         * @return the same object, viewed as {@code E}
         */
        @Override
        @SuppressWarnings("unchecked") // This default only casts, so E must be assignable from CoreEvent.
        public E intoSink(CoreEvent event) {
            return (E) event;
        }

        /**
         * Records the current time and completes the flux sink, so that a later {@link #dispose()} reports
         * this moment to the disposer.
         */
        @Override
        public void prepareDispose() {
            this.prepareDisposeTimestamp = System.currentTimeMillis();
            this.fluxSink.complete();
        }

        /**
         * Notifies the disposer. If {@link #prepareDispose()} was never called, the flux sink is completed
         * here and the disposer receives the current time; otherwise it receives the time recorded by
         * {@link #prepareDispose()}.
         */
        @Override
        public final void dispose() {
            if (this.prepareDisposeTimestamp != -1L) {
                this.disposer.accept(this.prepareDisposeTimestamp);
                return;
            }
            this.fluxSink.complete();
            this.disposer.accept(System.currentTimeMillis());
        }
    }

    /**
     * A disposable {@link Sink} that can map events to the element type of its underlying reactor sink.
     *
     * @param <E> the element type of the underlying reactor sink
     */
    interface ReactorSink<E> extends Sink, Disposable {

        /**
         * @param event the event to convert
         * @return the element to push into the underlying sink for the given event
         */
        E intoSink(CoreEvent event);

        /** Hook invoked ahead of {@link #dispose()}. */
        void prepareDispose();
    }
}

