/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.time.Duration;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.AbstractStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.ReactorStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.util.rx.RetrySchedulerWrapper;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.retry.BackoffDelay;
import reactor.retry.Retry;

/**
 * Stream processing strategy that dispatches processors to a scheduler chosen by their
 * {@link ReactiveProcessor.ProcessingType}: {@code BLOCKING} and {@code IO_RW} processors go to the
 * blocking scheduler, {@code CPU_INTENSIVE} processors go to the CPU-intensive scheduler, and every
 * other type is delegated to the parent strategy.
 * <p>
 * When scheduling fails because the target scheduler is busy, the scheduling is retried after
 * a fixed delay and the time of the retry is recorded. While a retry
 * has been recorded recently, {@link ProactorSinkWrapper} refuses new events.
 */
public abstract class ProactorStreamProcessingStrategy
extends ReactorStreamProcessingStrategyFactory.ReactorStreamProcessingStrategy {
    /**
     * Payload size, in bytes, above which a stream payload of an {@code IO_RW} processor is dispatched
     * to the blocking scheduler. Read from a system property, defaulting to 16 KB.
     */
    protected static final int STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD = Integer.getInteger(AbstractStreamProcessingStrategyFactory.SYSTEM_PROPERTY_PREFIX + "STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD", DataUnit.KB.toBytes(16));
    private static final Logger LOGGER = LoggerFactory.getLogger(ProactorStreamProcessingStrategy.class);
    /** Delay, in milliseconds, between attempts to schedule on a busy scheduler. */
    private static final long SCHEDULER_BUSY_RETRY_INTERVAL_MS = 2L;
    /** {@link #SCHEDULER_BUSY_RETRY_INTERVAL_MS} expressed in nanoseconds. */
    private static final long SCHEDULER_BUSY_RETRY_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(SCHEDULER_BUSY_RETRY_INTERVAL_MS);
    private final Supplier<Scheduler> blockingSchedulerSupplier;
    private final Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
    // Both schedulers are obtained in start() and are null until then.
    private Scheduler blockingScheduler;
    private Scheduler cpuIntensiveScheduler;
    /**
     * {@link System#nanoTime()} reading taken at the most recent scheduler-busy retry, or
     * {@link Long#MIN_VALUE} when no recent retry is recorded.
     */
    private final AtomicLong lastRetryTimestamp = new AtomicLong(Long.MIN_VALUE);
    /** Number of events admitted by the eager max-concurrency check that have not yet responded. */
    private final AtomicInteger inFlightEvents = new AtomicInteger();
    /** Response callback that releases the slot taken in {@link #inFlightEvents}. */
    private final BiConsumer<CoreEvent, Throwable> IN_FLIGHT_DECREMENT_CALLBACK = (e, t) -> this.inFlightEvents.decrementAndGet();
    /**
     * Keeps the recorded timestamp while less than twice the retry interval has elapsed since it was
     * taken; otherwise resets it to {@link Long#MIN_VALUE}.
     */
    private final LongUnaryOperator LAST_RETRY_TIMESTAMP_CHECK_OPERATOR = v -> System.nanoTime() - v < SCHEDULER_BUSY_RETRY_INTERVAL_NS * 2L ? v : Long.MIN_VALUE;

    /**
     * Creates the strategy. All arguments other than the blocking and CPU-intensive scheduler suppliers
     * are forwarded unchanged to the parent constructor.
     *
     * @param blockingSchedulerSupplier supplies the scheduler used for {@code BLOCKING} and {@code IO_RW} processors
     * @param cpuIntensiveSchedulerSupplier supplies the scheduler used for {@code CPU_INTENSIVE} processors
     */
    public ProactorStreamProcessingStrategy(Supplier<Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscriberCount, String waitStrategy, Supplier<Scheduler> cpuLightSchedulerSupplier, Supplier<Scheduler> blockingSchedulerSupplier, Supplier<Scheduler> cpuIntensiveSchedulerSupplier, int parallelism, int maxConcurrency, boolean maxConcurrencyEagerCheck) {
        super(ringBufferSchedulerSupplier, bufferSize, subscriberCount, waitStrategy, cpuLightSchedulerSupplier, parallelism, maxConcurrency, maxConcurrencyEagerCheck);
        this.blockingSchedulerSupplier = blockingSchedulerSupplier;
        this.cpuIntensiveSchedulerSupplier = cpuIntensiveSchedulerSupplier;
    }

    /**
     * Starts the parent strategy, wraps the CPU-light scheduler in a {@link RetrySchedulerWrapper} and
     * obtains the blocking and CPU-intensive schedulers from their suppliers.
     *
     * @throws MuleException if the parent strategy fails to start
     */
    @Override
    public void start() throws MuleException {
        super.start();
        // NOTE(review): the callback presumably runs whenever the wrapper has to retry a rejected
        // submission; confirm against RetrySchedulerWrapper.
        this.cpuLightScheduler = new RetrySchedulerWrapper(this.cpuLightScheduler, SCHEDULER_BUSY_RETRY_INTERVAL_MS, () -> this.lastRetryTimestamp.set(System.nanoTime()));
        this.blockingScheduler = this.blockingSchedulerSupplier.get();
        this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
    }

    /**
     * Stops the blocking scheduler, the CPU-intensive scheduler and then the parent strategy.
     * <p>
     * Every step is attempted even when an earlier one throws, so a failure while stopping one
     * scheduler does not leave the other scheduler or the parent's resources running. If more than one
     * step throws, the exception of the last failing step is the one propagated.
     *
     * @throws MuleException if the parent strategy fails to stop
     */
    @Override
    public void stop() throws MuleException {
        try {
            if (this.blockingScheduler != null) {
                this.blockingScheduler.stop();
            }
        } finally {
            try {
                if (this.cpuIntensiveScheduler != null) {
                    this.cpuIntensiveScheduler.stop();
                }
            } finally {
                super.stop();
            }
        }
    }

    /**
     * Decorates {@code processor} so that it runs on the scheduler matching its processing type.
     *
     * @param processor the processor to decorate
     * @return a processor dispatching to the blocking scheduler ({@code BLOCKING}, {@code IO_RW}), to the
     *         CPU-intensive scheduler ({@code CPU_INTENSIVE}), or whatever the parent strategy returns
     *         for any other processing type
     */
    @Override
    public ReactiveProcessor onProcessor(ReactiveProcessor processor) {
        ReactiveProcessor.ProcessingType processingType = processor.getProcessingType();
        if (processingType == ReactiveProcessor.ProcessingType.BLOCKING || processingType == ReactiveProcessor.ProcessingType.IO_RW) {
            return this.proactor(processor, this.blockingScheduler);
        }
        if (processingType == ReactiveProcessor.ProcessingType.CPU_INTENSIVE) {
            return this.proactor(processor, this.cpuIntensiveScheduler);
        }
        return super.onProcessor(processor);
    }

    /**
     * Builds a processor that, for each event, schedules {@code processor} on {@code scheduler} with
     * scheduler-busy retry, except for {@code IO_RW} events that {@link #scheduleIoRwEvent(CoreEvent)}
     * declines, which are processed inline.
     */
    private ReactiveProcessor proactor(ReactiveProcessor processor, Scheduler scheduler) {
        return publisher -> Flux.from(publisher).flatMap(event -> {
            if (processor.getProcessingType() == ReactiveProcessor.ProcessingType.IO_RW && !this.scheduleIoRwEvent((CoreEvent)event)) {
                // Not worth a thread switch: run the processor in place, advertising the CPU-light scheduler.
                return Flux.just(event).transform(processor).subscriberContext(ctx -> ctx.put("mule.nb.processorScheduler", this.getCpuLightScheduler()));
            }
            return this.withRetry(this.scheduleProcessor(processor, scheduler, (CoreEvent)event).subscriberContext(ctx -> ctx.put("mule.nb.processorScheduler", scheduler)), scheduler);
            // flatMap concurrency: maxConcurrency divided across parallelism * subscribers, never below 1.
        }, Math.max(this.maxConcurrency / (this.getParallelism() * this.subscribers), 1));
    }

    /**
     * Decides whether an {@code IO_RW} event should be dispatched to the blocking scheduler.
     *
     * @param event the event about to be processed
     * @return {@code true} when the payload is a stream type and its byte length is unknown or greater
     *         than {@link #STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD}
     */
    protected boolean scheduleIoRwEvent(CoreEvent event) {
        return event.getMessage().getPayload().getDataType().isStreamType() && event.getMessage().getPayload().getByteLength().orElse(Long.MAX_VALUE) > (long)STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD;
    }

    /**
     * Produces the flux that runs the given processor for the given event using the given scheduler.
     * How the scheduling is performed is left to subclasses.
     */
    protected abstract Flux<CoreEvent> scheduleProcessor(ReactiveProcessor var1, Scheduler var2, CoreEvent var3);

    /**
     * Re-subscribes to {@code scheduledFlux} after a fixed {@link #SCHEDULER_BUSY_RETRY_INTERVAL_MS}
     * delay whenever it fails with an error that {@code isSchedulerBusy} recognises, recording the time
     * of each such retry in {@link #lastRetryTimestamp}. Other errors are not retried.
     */
    private Flux<CoreEvent> withRetry(Flux<CoreEvent> scheduledFlux, Scheduler processorScheduler) {
        return scheduledFlux.retryWhen((Function<Flux<Throwable>, Publisher<?>>)Retry.onlyIf(ctx -> {
            boolean schedulerBusy = this.isSchedulerBusy(ctx.exception());
            if (schedulerBusy) {
                LOGGER.trace("Shared scheduler {} is busy. Scheduling of the current event will be retried after {}ms.", (Object)processorScheduler.getName(), (Object)SCHEDULER_BUSY_RETRY_INTERVAL_MS);
                this.lastRetryTimestamp.set(System.nanoTime());
            }
            return schedulerBusy;
        }).backoff(ctx -> new BackoffDelay(Duration.ofMillis(SCHEDULER_BUSY_RETRY_INTERVAL_MS))).withBackoffScheduler(Schedulers.fromExecutorService(this.decorateScheduler(this.getCpuLightScheduler()))));
    }

    /**
     * @return the scheduler used for {@code BLOCKING} and {@code IO_RW} processors; {@code null} before
     *         {@link #start()} has been called
     */
    protected Scheduler getBlockingScheduler() {
        return this.blockingScheduler;
    }

    /**
     * @return the scheduler used for {@code CPU_INTENSIVE} processors; {@code null} before
     *         {@link #start()} has been called
     */
    protected Scheduler getCpuIntensiveScheduler() {
        return this.cpuIntensiveScheduler;
    }

    /**
     * Sink decorator that checks capacity before handing an event to the wrapped sink.
     * <p>
     * An event is refused while a scheduler-busy retry was recorded recently or, when the eager
     * max-concurrency check is enabled, when admitting it would exceed {@code maxConcurrency}.
     *
     * @param <E> the element type accepted by the wrapped sink
     */
    protected final class ProactorSinkWrapper<E>
    implements AbstractProcessingStrategy.ReactorSink<E> {
        private final AbstractProcessingStrategy.ReactorSink<E> innerSink;

        /**
         * @param innerSink the sink that receives the events that pass the capacity check
         */
        protected ProactorSinkWrapper(AbstractProcessingStrategy.ReactorSink<E> innerSink) {
            this.innerSink = innerSink;
        }

        /**
         * Passes {@code event} to the wrapped sink.
         *
         * @throws RejectedExecutionException if the capacity check fails
         */
        @Override
        public final void accept(CoreEvent event) {
            if (!this.checkCapacity(event)) {
                throw new RejectedExecutionException();
            }
            this.innerSink.accept(event);
        }

        /**
         * Offers {@code event} to the wrapped sink.
         *
         * @return {@code false} if the capacity check fails; otherwise the result of the wrapped sink's
         *         {@code emit}
         */
        @Override
        public final boolean emit(CoreEvent event) {
            return this.checkCapacity(event) && this.innerSink.emit(event);
        }

        /**
         * @return {@code true} if {@code event} may be admitted. When the eager max-concurrency check is
         *         enabled, a {@code true} result also reserves an in-flight slot that is released when
         *         the event's context responds.
         */
        private boolean checkCapacity(CoreEvent event) {
            // Plain read first so the common no-retry case avoids the atomic update. The update
            // expires the timestamp once it is older than twice the retry interval.
            if (ProactorStreamProcessingStrategy.this.lastRetryTimestamp.get() != Long.MIN_VALUE && ProactorStreamProcessingStrategy.this.lastRetryTimestamp.updateAndGet(ProactorStreamProcessingStrategy.this.LAST_RETRY_TIMESTAMP_CHECK_OPERATOR) != Long.MIN_VALUE) {
                return false;
            }
            if (ProactorStreamProcessingStrategy.this.maxConcurrencyEagerCheck) {
                // Reserve optimistically and roll back when the limit is exceeded.
                if (ProactorStreamProcessingStrategy.this.inFlightEvents.incrementAndGet() > ProactorStreamProcessingStrategy.this.maxConcurrency) {
                    ProactorStreamProcessingStrategy.this.inFlightEvents.decrementAndGet();
                    return false;
                }
                ((BaseEventContext)event.getContext()).onResponse(ProactorStreamProcessingStrategy.this.IN_FLIGHT_DECREMENT_CALLBACK);
            }
            return true;
        }

        /** Delegates to the wrapped sink without any capacity check. */
        @Override
        public E intoSink(CoreEvent event) {
            return this.innerSink.intoSink(event);
        }

        /** Disposes the wrapped sink. */
        @Override
        public final void dispose() {
            this.innerSink.dispose();
        }
    }
}

