/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.core.api.construct.BackPressureReason;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;

/**
 * {@link Sink} that keeps one {@link FluxSink} per calling context and pushes every accepted
 * event into it.
 *
 * <p>The context is the current transaction when {@link TransactionCoordination} reports a
 * running nested transaction, and the calling {@link Thread} otherwise. Sinks are created lazily
 * and held in two caches with weak keys that expire entries after a period without access; the
 * removal listener of each cache completes the sink that is being dropped.
 *
 * <p>{@link #dispose()} completes every cached sink and then waits, up to the shutdown timeout
 * configured on the flow's Mule context, for the subscriptions created by this class to terminate.
 */
public class StreamPerThreadSink implements Sink, Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamPerThreadSink.class);
    private static final int THREAD_CACHE_TIME_LIMIT_IN_MINUTES = 60;
    private static final int TRANSACTION_CACHE_TIME_LIMIT_IN_MINUTES = 10;
    /** System property whose mere presence turns an incomplete dispose into an exception. */
    private static final String FAIL_ON_FIRST_DISPOSE_ERROR_PROPERTY = "mule.lifecycle.failOnFirstDisposeError";

    private final ReactiveProcessor processor;
    private final Consumer<CoreEvent> eventConsumer;
    private final FlowConstruct flowConstruct;
    /** Set once by {@link #dispose()}; checked by {@link #accept(CoreEvent)} to reject late events. */
    private volatile boolean disposing = false;

    // The explicit type witness on removalListener(...) is required: the builder starts out as
    // Caffeine<Object, Object> and the lambda sits in the middle of the call chain, so without
    // the witness the listener's value parameter would be typed as Object.
    private final Cache<Thread, FluxSink<CoreEvent>> sinks = Caffeine.newBuilder()
            .weakKeys()
            .<Thread, FluxSink<CoreEvent>>removalListener((thread, sink, cause) -> sink.complete())
            .expireAfterAccess(THREAD_CACHE_TIME_LIMIT_IN_MINUTES, TimeUnit.MINUTES)
            .build();
    private final Cache<Transaction, FluxSink<CoreEvent>> sinksNestedTx = Caffeine.newBuilder()
            .weakKeys()
            .<Transaction, FluxSink<CoreEvent>>removalListener((transaction, sink, cause) -> sink.complete())
            .expireAfterAccess(TRANSACTION_CACHE_TIME_LIMIT_IN_MINUTES, TimeUnit.MINUTES)
            .build();

    /** Number of subscriptions created by {@link #createSink()} that have not terminated yet. */
    private final AtomicLong disposableSinks = new AtomicLong();

    /**
     * @param processor     transformation applied to the event stream of every sink created here
     * @param eventConsumer callback invoked with each event before it reaches {@code processor}
     * @param flowConstruct flow this sink belongs to; used for log messages and to read the
     *                      shutdown timeout during {@link #dispose()}
     */
    public StreamPerThreadSink(ReactiveProcessor processor, Consumer<CoreEvent> eventConsumer, FlowConstruct flowConstruct) {
        this.processor = processor;
        this.eventConsumer = eventConsumer;
        this.flowConstruct = flowConstruct;
    }

    /**
     * Builds a new event stream, subscribes to it and returns the sink that feeds it.
     *
     * <p>The pending-subscription counter is incremented up front and decremented exactly once
     * when the subscription terminates, whether with an error or by completing.
     *
     * @return the sink through which events are pushed into the new stream
     */
    private FluxSink<CoreEvent> createSink() {
        disposableSinks.incrementAndGet();
        // NOTE(review): assumes FluxSinkRecorder is declared generic (FluxSinkRecorder<T>) -
        // confirm against the class in org.mule.runtime.core.internal.rx.
        FluxSinkRecorder<CoreEvent> recorder = new FluxSinkRecorder<>();
        recorder.flux()
                .doOnNext(request -> eventConsumer.accept(request))
                .transform(processor)
                .subscribe(null, e -> {
                    // Throwable passed as the trailing argument so SLF4J logs it with its stack trace.
                    LOGGER.error("Exception reached PS subscriber for flow '{}'", flowConstruct.getName(), e);
                    disposableSinks.decrementAndGet();
                }, () -> disposableSinks.decrementAndGet());
        return recorder.getFluxSink();
    }

    /**
     * Pushes {@code event} into the sink associated with the current transaction (when a nested
     * transaction is running) or with the calling thread, creating that sink on first use.
     *
     * @param event the event to process
     * @throws IllegalStateException if {@link #dispose()} has already been called
     */
    @Override
    public void accept(CoreEvent event) {
        if (disposing) {
            throw new IllegalStateException("Already disposed");
        }
        TransactionCoordination txCoord = TransactionCoordination.getInstance();
        if (txCoord.runningNestedTransaction()) {
            sinksNestedTx.get(txCoord.getTransaction(), tx -> createSink()).next(event);
        } else {
            sinks.get(Thread.currentThread(), t -> createSink()).next(event);
        }
    }

    /**
     * Delegates to {@link #accept(CoreEvent)}.
     *
     * @param event the event to process
     * @return always {@code null}; presumably this tells the caller that no back-pressure was
     *         applied - confirm against the {@link Sink#emit} contract
     * @throws IllegalStateException if {@link #dispose()} has already been called
     */
    @Override
    public BackPressureReason emit(CoreEvent event) {
        accept(event);
        return null;
    }

    /**
     * Stops accepting events, completes every cached sink and waits for the subscriptions created
     * by this class to terminate.
     *
     * <p>The wait ends when all subscriptions have terminated, when the shutdown timeout (in
     * milliseconds) read from the Mule configuration elapses, or when the calling thread is
     * interrupted. The interrupt flag is only inspected, never cleared. If subscriptions are still
     * pending at that point, see {@link #failOrInvalidate(String)}.
     *
     * @throws IllegalStateException if subscriptions are still pending and the
     *                               {@code mule.lifecycle.failOnFirstDisposeError} system property is set
     */
    @Override
    public void dispose() {
        disposing = true;
        sinks.asMap().values().forEach(FluxSink::complete);
        sinksNestedTx.asMap().values().forEach(FluxSink::complete);

        long shutdownTimeout = flowConstruct.getMuleContext().getConfiguration().getShutdownTimeout();
        long startMillis = System.currentTimeMillis();
        // NOTE(review): this is a yield-based spin; it keeps the disposing thread runnable for up
        // to shutdownTimeout ms. Left as is to preserve the existing timing behaviour.
        while (disposableSinks.get() != 0L
                && System.currentTimeMillis() <= shutdownTimeout + startMillis
                && !Thread.currentThread().isInterrupted()) {
            Thread.yield();
        }

        if (Thread.currentThread().isInterrupted()) {
            failOrInvalidate(String.format(
                    "TX Subscribers of ProcessingStrategy for flow '%s' not completed before thread interruption",
                    flowConstruct.getName()));
        } else if (disposableSinks.get() != 0L) {
            failOrInvalidate(String.format(
                    "TX Subscribers of ProcessingStrategy for flow '%s' not completed in %d ms",
                    flowConstruct.getName(), shutdownTimeout));
        }
    }

    /**
     * Reacts to a dispose that could not be finished cleanly: throws when the
     * {@code mule.lifecycle.failOnFirstDisposeError} system property is set (to any value),
     * otherwise logs {@code message} as a warning and invalidates both caches.
     *
     * @param message fully formatted description of why the dispose is incomplete
     * @throws IllegalStateException carrying {@code message} when the system property is set
     */
    private void failOrInvalidate(String message) {
        if (System.getProperty(FAIL_ON_FIRST_DISPOSE_ERROR_PROPERTY) != null) {
            throw new IllegalStateException(message);
        }
        LOGGER.warn(message);
        sinks.invalidateAll();
        sinksNestedTx.invalidateAll();
    }
}

