/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractAsynchronousPublisherOperator;
import io.servicetalk.concurrent.api.CancellableSet;
import io.servicetalk.concurrent.api.CompositeExceptionUtils;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SubscriberApiUtils;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class PublisherFlatMapMerge<T, R>
extends AbstractAsynchronousPublisherOperator<T, R> {
    /** Concurrency limit applied when the caller does not supply one. */
    static final int FLAT_MAP_DEFAULT_CONCURRENCY = 16;
    private static final Logger LOGGER = LoggerFactory.getLogger(PublisherFlatMapMerge.class);
    /** NOTE(review): presumably the smallest quota handed to one mapped subscriber per request - confirm. */
    private static final int MIN_MAPPED_DEMAND = 1;
    /** Converts each upstream item into the {@link Publisher} that is subscribed to and merged downstream. */
    private final Function<? super T, ? extends Publisher<? extends R>> mapper;
    /** Number of items requested from upstream on subscribe; also the divisor used when splitting demand. */
    private final int maxConcurrency;
    /** {@code 0} makes the first mapped-source error terminate immediately; otherwise errors are accumulated. */
    private final int maxDelayedErrors;

    /**
     * Creates the operator with the default concurrency limit, {@link #FLAT_MAP_DEFAULT_CONCURRENCY}.
     *
     * @param original the upstream publisher whose items are mapped.
     * @param mapper converts each upstream item into a publisher to merge.
     * @param delayError forwarded to the four-argument constructor, which derives the delayed-error limit from it.
     */
    PublisherFlatMapMerge(Publisher<T> original, Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayError) {
        // Use the named constant rather than repeating its value, so the default has a single definition.
        this(original, mapper, delayError, FLAT_MAP_DEFAULT_CONCURRENCY);
    }

    /**
     * Creates the operator with an explicit concurrency limit and a boolean error-delay switch.
     *
     * @param original the upstream publisher whose items are mapped.
     * @param mapper converts each upstream item into a publisher to merge.
     * @param delayError translated into a delayed-error limit by
     * {@code CompositeExceptionUtils.maxDelayedErrors(boolean)}.
     * @param maxConcurrency upper bound on concurrently active mapped publishers; must be positive.
     */
    PublisherFlatMapMerge(final Publisher<T> original,
                          final Function<? super T, ? extends Publisher<? extends R>> mapper,
                          final boolean delayError,
                          final int maxConcurrency) {
        this(original, mapper, CompositeExceptionUtils.maxDelayedErrors(delayError), maxConcurrency);
    }

    /**
     * Primary constructor; validates the numeric limits and the mapper before storing them.
     *
     * @param original the upstream publisher whose items are mapped.
     * @param mapper converts each upstream item into a publisher to merge; must not be {@code null}.
     * @param maxDelayedErrors {@code 0} to fail on the first mapped-source error, otherwise the limit handed to
     * the delayed-error accumulation; must not be negative.
     * @param maxConcurrency upper bound on concurrently active mapped publishers; must be positive.
     * @throws IllegalArgumentException if {@code maxConcurrency <= 0} or {@code maxDelayedErrors < 0}.
     * @throws NullPointerException if {@code mapper} is {@code null}.
     */
    PublisherFlatMapMerge(final Publisher<T> original,
                          final Function<? super T, ? extends Publisher<? extends R>> mapper,
                          final int maxDelayedErrors,
                          final int maxConcurrency) {
        super(original);
        // Validation order is part of the observable behaviour: concurrency, then delayed errors, then mapper.
        if (maxConcurrency <= 0) {
            throw new IllegalArgumentException("maxConcurrency: " + maxConcurrency + " (expected >0)");
        }
        if (maxDelayedErrors < 0) {
            throw new IllegalArgumentException("maxDelayedErrors: " + maxDelayedErrors + " (expected >=0)");
        }
        this.mapper = Objects.requireNonNull(mapper);
        this.maxConcurrency = maxConcurrency;
        this.maxDelayedErrors = maxDelayedErrors;
    }

    /**
     * Wraps the downstream subscriber in a new {@code FlatMapSubscriber} that is subscribed to the upstream source.
     *
     * @param subscriber the downstream subscriber that receives the merged items.
     * @return the subscriber to attach to the upstream publisher.
     */
    @Override
    public PublisherSource.Subscriber<? super T> apply(PublisherSource.Subscriber<? super R> subscriber) {
        // Diamond instead of a raw type so T and R are inferred and checked by the compiler.
        return new FlatMapSubscriber<>(this, subscriber);
    }

    private static final class FlatMapSubscriber<T, R>
    implements PublisherSource.Subscriber<T>,
    PublisherSource.Subscription {
        /** Sentinel signal emitted when a mapped source terminates; each one leads to one more item requested upstream. */
        private static final Object MAPPED_SOURCE_COMPLETE = new Object();
        private static final AtomicReferenceFieldUpdater<FlatMapSubscriber, Throwable> pendingErrorUpdater = AtomicReferenceFieldUpdater.newUpdater(FlatMapSubscriber.class, Throwable.class, "pendingError");
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> pendingErrorCountUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "pendingErrorCount");
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> emittingLockUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "emittingLock");
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> mappedDemandUpdater = AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "mappedDemand");
        private static final AtomicLongFieldUpdater<FlatMapSubscriber> pendingDemandUpdater = AtomicLongFieldUpdater.newUpdater(FlatMapSubscriber.class, "pendingDemand");
        private static final AtomicIntegerFieldUpdater<FlatMapSubscriber> activeMappedSourcesUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapSubscriber.class, "activeMappedSources");
        /** First error recorded from a mapped source; when set it is what sendToTarget delivers as the terminal error. */
        @Nullable
        private volatile Throwable pendingError;
        /** Passed to CompositeExceptionUtils.addPendingError; presumably counts accumulated errors - confirm there. */
        private volatile int pendingErrorCount;
        /** Lock word used with ConcurrentUtils.tryAcquireLock/releaseLock around delivery to {@code target}. */
        private volatile int emittingLock;
        /**
         * Incremented once per upstream item in onNext. Upstream completion negates the value, after which each
         * finishing mapped source moves it one step back towards zero.
         */
        private volatile int activeMappedSources;
        /** Downstream demand not yet consumed by emitted items; {@code cancel()} overwrites it with -1. */
        private volatile long pendingDemand;
        /** Demand budget split among mapped subscribers; reserveMappedDemandQuota may drive it below zero. */
        private volatile long mappedDemand;
        /** Set once a terminal signal has been delivered to {@code target}; later signals are dropped. */
        private boolean targetTerminated;
        /** Error received from the upstream source through onError, if any. */
        @Nullable
        private Throwable upstreamError;
        /** Upstream subscription, wrapped by ConcurrentSubscription in onSubscribe. */
        @Nullable
        private PublisherSource.Subscription subscription;
        /** Downstream subscriber receiving the merged stream. */
        private final PublisherSource.Subscriber<? super R> target;
        /** Queue of signals that could not be delivered directly (items, terminals, sentinels, hungry subscribers). */
        private final Queue<Object> signals;
        /** Owning operator; supplies mapper, maxConcurrency and maxDelayedErrors. */
        private final PublisherFlatMapMerge<T, R> source;
        /** Holds the subscriptions of the currently active mapped sources so they can be cancelled together. */
        private final CancellableSet cancellableSet;

        FlatMapSubscriber(PublisherFlatMapMerge<T, R> source, PublisherSource.Subscriber<? super R> target) {
            this.source = source;
            this.target = target;
            this.signals = PlatformDependent.newUnboundedMpscQueue((int)4);
            this.cancellableSet = new CancellableSet(Math.min(16, ((PublisherFlatMapMerge)source).maxConcurrency));
        }

        /**
         * Cancels the merged stream: marks downstream demand as negative and cancels upstream together with all
         * tracked mapped sources.
         */
        @Override
        public void cancel() {
            // A negative value is left in place by drainPending and makes tryDecrementPendingDemand fail, so
            // nothing that needs demand is delivered afterwards.
            pendingDemand = -1L;
            doCancel(true);
        }

        /**
         * Records downstream demand. Valid demand is added to both the pending (emission) demand and the budget
         * distributed to mapped sources; invalid demand is forwarded upstream and also turned into an error signal
         * for the downstream subscriber.
         *
         * @param n the number of items requested by the downstream subscriber.
         */
        @Override
        public void request(final long n) {
            assert subscription != null;
            if (SubscriberUtils.isRequestNValid(n)) {
                // Previous value 0 means there was no demand; queued signals may now be deliverable.
                if (pendingDemandUpdater.getAndAccumulate(this, n,
                        FlowControlUtils::addWithOverflowProtectionIfNotNegative) == 0L) {
                    drainPending();
                }
                incMappedDemand(n);
            } else {
                // Let upstream see the invalid value, and queue an error ourselves so downstream is terminated
                // through the regular drain path.
                subscription.request(n);
                enqueueAndDrain(TerminalNotification.error(SubscriberUtils.newExceptionForInvalidRequestN(n)));
            }
        }

        /**
         * Stores the upstream subscription, hands this object to the downstream subscriber as its subscription,
         * and requests {@code maxConcurrency} items from upstream.
         *
         * @param s the upstream subscription.
         */
        @Override
        public void onSubscribe(final PublisherSource.Subscription s) {
            if (!SubscriberUtils.checkDuplicateSubscription(subscription, s)) {
                return;
            }
            subscription = ConcurrentSubscription.wrap(s);
            target.onSubscribe(this);
            // Each upstream item becomes one mapped source, so this bounds the number of concurrent sources.
            subscription.request(source.maxConcurrency);
        }

        /**
         * Maps an upstream item to a publisher, counts it as an active mapped source and subscribes to it.
         *
         * @param t the upstream item.
         * @throws NullPointerException if the mapper returns {@code null}.
         * @throws IllegalStateException if the active-source counter is negative (set by upstream completion) or
         * would overflow.
         */
        @Override
        public void onNext(@Nullable final T t) {
            final Publisher<? extends R> publisher = Objects.requireNonNull(source.mapper.apply(t),
                    () -> "Mapper " + source.mapper + " returned null");
            // CAS loop: increment the active-source counter unless it is terminal or saturated.
            for (;;) {
                final int currValue = activeMappedSources;
                if (currValue < 0) {
                    throw new IllegalStateException("onNext(" + t + ") after terminal signal delivered to " + this);
                }
                if (currValue == Integer.MAX_VALUE) {
                    throw new IllegalStateException("Overflow of mapped Publishers for " + this);
                }
                if (activeMappedSourcesUpdater.compareAndSet(this, currValue, currValue + 1)) {
                    break;
                }
            }
            publisher.subscribeInternal(new FlatMapPublisherSubscriber<>(this));
        }

        /**
         * Handles an upstream failure: remembers the error, cancels the tracked mapped sources (upstream itself is
         * not cancelled), and queues an error terminal for the downstream subscriber.
         *
         * @param t the upstream error.
         */
        @Override
        public void onError(final Throwable t) {
            upstreamError = t;
            try {
                doCancel(false);
            } finally {
                // Always queue the terminal, even if cancelling mapped sources throws.
                enqueueAndDrain(TerminalNotification.error(t));
            }
        }

        /**
         * Handles upstream completion. The completion is propagated downstream right away only when no mapped
         * source was active; otherwise the mapped source that finishes last emits the terminal signal.
         */
        @Override
        public void onComplete() {
            if (terminateActiveMappedSources()) {
                enqueueAndDrain(TerminalNotification.complete());
            }
        }

        /**
         * Adds {@code n} to the demand budget that is shared out to mapped subscribers.
         *
         * @param n amount to add; asserted to be positive.
         */
        private void incMappedDemand(final long n) {
            assert n > 0L;
            // The previous value returned by the accumulate call is intentionally unused.
            mappedDemandUpdater.getAndAccumulate(this, n, FlowControlUtils::addWithUnderOverflowProtection);
        }

        /**
         * Atomically takes a quota out of {@code mappedDemand} for one mapped subscriber.
         * <p>
         * When no budget is available the minimum quota is still granted and the budget goes (further) negative;
         * otherwise the quota comes from {@link #calculateRequestNQuota(long)}.
         *
         * @return the reserved quota, always at least {@code MIN_MAPPED_DEMAND}.
         */
        private int reserveMappedDemandQuota() {
            for (;;) {
                final long prevDemand = mappedDemand;
                if (prevDemand <= 0L) {
                    // Named constant instead of a bare literal so the minimum quota has one definition.
                    if (mappedDemandUpdater.compareAndSet(this, prevDemand, prevDemand - MIN_MAPPED_DEMAND)) {
                        return MIN_MAPPED_DEMAND;
                    }
                } else {
                    final int quota = calculateRequestNQuota(prevDemand);
                    if (mappedDemandUpdater.compareAndSet(this, prevDemand, prevDemand - quota)) {
                        return quota;
                    }
                }
                // CAS lost against a concurrent update: re-read and retry.
            }
        }

        /**
         * Reserves a quota and offers it to a mapped subscriber; if the subscriber declines, the quota is returned
         * to the shared budget.
         *
         * @param hungrySubscriber the mapped subscriber that has run out of demand.
         */
        private void distributeMappedDemand(final FlatMapPublisherSubscriber<T, R> hungrySubscriber) {
            final int reserved = reserveMappedDemandQuota();
            final boolean accepted = hungrySubscriber.request(reserved);
            if (accepted) {
                return;
            }
            incMappedDemand(reserved);
        }

        /**
         * Computes the share of the available demand to give to a single mapped subscriber: the available amount
         * divided by {@code maxConcurrency}, clamped to the range {@code [1, Integer.MAX_VALUE]}.
         *
         * @param availableRequestN the demand currently available for distribution.
         * @return the per-subscriber quota.
         */
        private int calculateRequestNQuota(final long availableRequestN) {
            final long evenShare = availableRequestN / (long) source.maxConcurrency;
            final long atLeastOne = Math.max(evenShare, 1L);
            return (int) Math.min(Integer.MAX_VALUE, atLeastOne);
        }

        /**
         * Cancels every tracked mapped source and, optionally, the upstream subscription first.
         *
         * @param cancelUpstream {@code true} to also cancel the upstream subscription.
         */
        private void doCancel(final boolean cancelUpstream) {
            try {
                if (!cancelUpstream) {
                    return;
                }
                assert subscription != null;
                subscription.cancel();
            } finally {
                // Runs on every path, including when the upstream cancel throws.
                cancellableSet.cancel();
            }
        }

        /**
         * Attempts to consume one unit of downstream demand.
         *
         * @return {@code true} if one unit was taken, {@code false} if the demand was zero or negative.
         */
        private boolean tryDecrementPendingDemand() {
            for (;;) {
                final long available = pendingDemand;
                if (available <= 0L) {
                    return false;
                }
                if (pendingDemandUpdater.compareAndSet(this, available, available - 1L)) {
                    return true;
                }
                // Lost a race with another update; retry with the fresh value.
            }
        }

        /**
         * Delivers a signal from a mapped subscriber, bypassing the queue when possible.
         * <p>
         * The signal is queued when the subscriber already has queued signals, when demand is needed but none is
         * available, or when the emitting lock is held elsewhere. A mapped-source completion that needs no
         * queueing only requests one more item from upstream.
         *
         * @param item the signal to deliver.
         * @param needsDemand {@code true} if delivering {@code item} consumes one unit of downstream demand.
         * @param subscriber the mapped subscriber that produced {@code item}.
         */
        private void tryEmitItem(final Object item, final boolean needsDemand,
                                 final FlatMapPublisherSubscriber<T, R> subscriber) {
            final boolean mustQueue = subscriber.hasSignalsQueued() || (needsDemand && !tryDecrementPendingDemand());
            if (mustQueue) {
                subscriber.markSignalsQueued();
                enqueueAndDrain(item);
                return;
            }

            if (item == MAPPED_SOURCE_COMPLETE) {
                try {
                    requestMoreFromUpstream(1);
                } catch (Throwable cause) {
                    onErrorNotHoldingLock(cause);
                }
                return;
            }

            if (!ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this)) {
                // Another thread is emitting: return the unit of demand reserved above and go through the queue.
                if (needsDemand) {
                    pendingDemandUpdater.getAndAccumulate(this, 1L,
                            FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                }
                subscriber.markSignalsQueued();
                enqueueAndDrain(item);
                return;
            }

            // Lock acquired: deliver directly.
            try {
                final boolean demandConsumed = sendToTarget(item);
                assert demandConsumed == needsDemand || targetTerminated;
            } finally {
                if (!ConcurrentUtils.releaseLock(emittingLockUpdater, this)) {
                    drainPending();
                }
            }
        }

        /**
         * Terminates with {@code cause} when the caller already owns the emitting lock: cancels upstream and the
         * mapped sources, then delivers the error straight to the target. The lock is not released here.
         *
         * @param cause the error to deliver.
         */
        private void onErrorHoldingLock(final Throwable cause) {
            try {
                doCancel(true);
            } finally {
                sendToTarget(TerminalNotification.error(cause));
            }
        }

        /**
         * Terminates with {@code cause} when the caller does not own the emitting lock. If the lock can be taken
         * the error is delivered directly; otherwise everything is cancelled and the error is queued.
         *
         * @param cause the error to deliver.
         */
        private void onErrorNotHoldingLock(final Throwable cause) {
            if (ConcurrentUtils.tryAcquireLock(emittingLockUpdater, this)) {
                onErrorHoldingLock(cause);
                return;
            }
            try {
                doCancel(true);
            } finally {
                enqueueAndDrain(TerminalNotification.error(cause));
            }
        }

        /**
         * Appends a signal to the queue, treating a rejected offer as a failure.
         *
         * @param item the signal to queue.
         */
        private void enqueueItem(final Object item) {
            final boolean accepted = signals.offer(item);
            if (!accepted) {
                enqueueFailed(item);
            }
        }

        /**
         * Queues a signal and then attempts to drain the queue to the target.
         *
         * @param item the signal to queue.
         */
        private void enqueueAndDrain(final Object item) {
            enqueueItem(item);
            drainPending();
        }

        /**
         * Reports a rejected queue offer: logs the item at error level and always throws.
         *
         * @param item the signal that could not be queued.
         * @throws QueueFullException always.
         */
        private static void enqueueFailed(final Object item) {
            LOGGER.error("Queue should be unbounded, but an offer failed for item {}!", item);
            throw new QueueFullException("pending");
        }

        /**
         * Drains queued signals to the target while holding the emitting lock.
         * <p>
         * Each pass takes the whole pending demand, delivers queued signals until that demand is used up or the
         * queue is empty, then either processes trailing non-demand signals or returns the unused demand. Mapped
         * source completions seen along the way are converted into one upstream request. Any throwable ends the
         * stream through onErrorHoldingLock, and in that case the lock is not released.
         */
        private void drainPending() {
            boolean tryAcquire = true;
            while (tryAcquire && ConcurrentUtils.tryAcquireLock(emittingLockUpdater, (Object)this)) {
                try {
                    int mappedSourcesCompleted = 0;
                    // Claim all currently available downstream demand for this pass.
                    long prevDemand = pendingDemandUpdater.getAndSet(this, 0L);
                    if (prevDemand < 0L) {
                        // Negative demand (written by cancel()) is put back untouched and nothing is emitted.
                        this.pendingDemand = prevDemand;
                    } else {
                        Object t;
                        long emittedCount = 0L;
                        while (emittedCount < prevDemand && (t = this.signals.poll()) != null) {
                            if (t == MAPPED_SOURCE_COMPLETE) {
                                ++mappedSourcesCompleted;
                                continue;
                            }
                            // sendToTarget returns true only when an item consumed one unit of demand.
                            if (!this.sendToTarget(t)) continue;
                            ++emittedCount;
                        }
                        if (emittedCount == prevDemand) {
                            // Demand exhausted: still handle leading signals that need no demand.
                            while (true) {
                                if ((t = this.signals.peek()) == MAPPED_SOURCE_COMPLETE) {
                                    this.signals.poll();
                                    ++mappedSourcesCompleted;
                                    continue;
                                }
                                if (!(t instanceof FlatMapPublisherSubscriber)) break;
                                this.signals.poll();
                                FlatMapPublisherSubscriber hungrySubscriber = (FlatMapPublisherSubscriber)t;
                                this.distributeMappedDemand(hungrySubscriber);
                            }
                            // t is the head left in the queue (or null); a terminal can be delivered without demand.
                            if (t instanceof TerminalNotification) {
                                this.sendToTarget(t);
                            } else {
                                this.sendToTargetIfPrematureError();
                            }
                        } else {
                            // Queue ran dry first: give the unused demand back.
                            assert (emittedCount < prevDemand);
                            pendingDemandUpdater.accumulateAndGet(this, prevDemand - emittedCount, FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                        }
                    }
                    if (mappedSourcesCompleted != 0) {
                        // One replacement upstream item per completed mapped source.
                        this.requestMoreFromUpstream(mappedSourcesCompleted);
                    }
                }
                catch (Throwable cause) {
                    // Terminal path: returns without calling releaseLock.
                    this.onErrorHoldingLock(cause);
                    return;
                }
                // NOTE(review): a false result from releaseLock presumably means another thread tried to acquire
                // the lock meanwhile, so another pass is made on its behalf - confirm in ConcurrentUtils.
                tryAcquire = !ConcurrentUtils.releaseLock(emittingLockUpdater, (Object)this);
            }
        }

        /**
         * Requests replacement items from upstream, one per mapped source that has completed.
         *
         * @param mappedSourcesCompleted number of completed mapped sources; asserted to be positive.
         */
        private void requestMoreFromUpstream(final int mappedSourcesCompleted) {
            assert mappedSourcesCompleted > 0;
            assert subscription != null;
            subscription.request(mappedSourcesCompleted);
        }

        /**
         * Delivers one signal to the downstream target.
         * <ul>
         * <li>After termination every signal is dropped.</li>
         * <li>A terminal notification clears the queue and terminates the target; a recorded
         * {@code pendingError} takes precedence over the notification itself.</li>
         * <li>A mapped subscriber instance is a request for more demand and is given a quota.</li>
         * <li>Anything else is a (possibly null-wrapped) item passed to {@code onNext}.</li>
         * </ul>
         *
         * @param item the signal; must not be {@code MAPPED_SOURCE_COMPLETE}.
         * @return {@code true} only if an item was delivered through {@code onNext}.
         */
        private boolean sendToTarget(final Object item) {
            assert item != MAPPED_SOURCE_COMPLETE;
            boolean demandConsumed = false;
            if (!targetTerminated) {
                if (item instanceof TerminalNotification) {
                    signals.clear();
                    targetTerminated = true;
                    final Throwable delayedError = pendingError;
                    if (delayedError == null) {
                        ((TerminalNotification) item).terminate(target);
                    } else {
                        target.onError(delayedError);
                    }
                } else if (item instanceof FlatMapPublisherSubscriber) {
                    distributeMappedDemand((FlatMapPublisherSubscriber) item);
                } else {
                    target.onNext(SubscriberApiUtils.unwrapNullUnchecked(item));
                    demandConsumed = true;
                }
            }
            return demandConsumed;
        }

        /**
         * Delivers a recorded upstream error to the target if the target has not been terminated yet, discarding
         * whatever is still queued.
         */
        private void sendToTargetIfPrematureError() {
            if (upstreamError == null || targetTerminated) {
                return;
            }
            signals.clear();
            targetTerminated = true;
            target.onError(upstreamError);
        }

        /**
         * Marks upstream as completed by negating the active-source counter.
         *
         * @return {@code true} if no mapped source was active at that moment.
         */
        private boolean terminateActiveMappedSources() {
            for (;;) {
                final int observed = activeMappedSources;
                assert observed >= 0;
                if (activeMappedSourcesUpdater.compareAndSet(this, observed, -observed)) {
                    return observed == 0;
                }
            }
        }

        /**
         * Records that one mapped source has finished. A positive counter is decremented; a negative counter
         * (upstream already completed) is moved one step towards zero.
         *
         * @return {@code true} only if the counter was {@code -1}, i.e. this was the last active mapped source
         * after upstream completion.
         */
        private boolean decrementActiveMappedSources() {
            for (;;) {
                final int observed = activeMappedSources;
                assert observed != 0;
                if (observed > 0) {
                    if (activeMappedSourcesUpdater.compareAndSet(this, observed, observed - 1)) {
                        return false;
                    }
                } else if (activeMappedSourcesUpdater.compareAndSet(this, observed, observed + 1)) {
                    return observed == -1;
                }
                // CAS failed: reload the counter and try again.
            }
        }

        /**
         * Stops tracking a finished mapped subscriber.
         *
         * @param subscriber the mapped subscriber that terminated.
         * @param unusedDemand demand granted to the subscriber that it did not use; returned to the shared budget
         * when the stream continues.
         * @return {@code true} if the subscription was still tracked and this was the last active mapped source
         * after upstream completion, meaning the caller should emit the terminal signal.
         */
        private boolean removeSubscriber(final FlatMapPublisherSubscriber<T, R> subscriber, final int unusedDemand) {
            assert subscriber.subscription != null;
            // Short-circuit matters: the counter is only touched when the subscription was actually removed.
            final boolean wasLastSource = cancellableSet.remove(subscriber.subscription)
                    && decrementActiveMappedSources();
            if (wasLastSource) {
                return true;
            }
            if (unusedDemand > 0) {
                incMappedDemand(unusedDemand);
            }
            return false;
        }

        private static final class FlatMapPublisherSubscriber<T, R>
        implements PublisherSource.Subscriber<R> {
            private static final AtomicIntegerFieldUpdater<FlatMapPublisherSubscriber> pendingDemandUpdater = AtomicIntegerFieldUpdater.newUpdater(FlatMapPublisherSubscriber.class, "innerPendingDemand");
            private static final int TERMINATED = -2;
            private final FlatMapSubscriber<T, R> parent;
            private volatile int innerPendingDemand;
            @Nullable
            private PublisherSource.Subscription subscription;
            private boolean signalsQueued;

            FlatMapPublisherSubscriber(FlatMapSubscriber<T, R> parent) {
                this.parent = parent;
            }

            boolean request(int n) {
                assert (n > 0);
                assert (this.subscription != null);
                this.signalsQueued = false;
                if (!pendingDemandUpdater.compareAndSet(this, 0, n)) {
                    return false;
                }
                this.subscription.request((long)n);
                return true;
            }

            void markSignalsQueued() {
                this.signalsQueued = true;
            }

            boolean hasSignalsQueued() {
                return this.signalsQueued;
            }

            public void onSubscribe(PublisherSource.Subscription s) {
                this.subscription = ConcurrentSubscription.wrap((PublisherSource.Subscription)s);
                if (((FlatMapSubscriber)this.parent).cancellableSet.add((Cancellable)this.subscription)) {
                    ((FlatMapSubscriber)this.parent).distributeMappedDemand(this);
                }
            }

            public void onNext(@Nullable R r) {
                int pendingDemand = pendingDemandUpdater.decrementAndGet(this);
                if (pendingDemand < 0) {
                    this.handleInvalidDemand(pendingDemand, r);
                }
                try {
                    ((FlatMapSubscriber)this.parent).tryEmitItem(SubscriberApiUtils.wrapNull(r), true, this);
                }
                finally {
                    if (pendingDemand == 0) {
                        ((FlatMapSubscriber)this.parent).tryEmitItem(this, false, this);
                    }
                }
            }

            private void handleInvalidDemand(int pendingDemand, @Nullable R r) {
                pendingDemandUpdater.compareAndSet(this, pendingDemand, pendingDemand > -2 ? 0 : -2);
                throw new IllegalStateException("Too many onNext signals for Subscriber: " + this + " pendingDemand: " + pendingDemand + " discarding: " + r);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void onError(Throwable t) {
                int unusedDemand = pendingDemandUpdater.getAndSet(this, -2);
                if (unusedDemand < 0) {
                    SubscriberUtils.logDuplicateTerminal((PublisherSource.Subscriber)this, (Throwable)t);
                    return;
                }
                Throwable currPendingError = ((FlatMapSubscriber)this.parent).pendingError;
                if (((FlatMapSubscriber)this.parent).source.maxDelayedErrors == 0) {
                    if (currPendingError == null && pendingErrorUpdater.compareAndSet(this.parent, null, t)) {
                        try {
                            ((FlatMapSubscriber)this.parent).doCancel(true);
                        }
                        finally {
                            ((FlatMapSubscriber)this.parent).tryEmitItem(TerminalNotification.error((Throwable)t), false, this);
                        }
                    }
                } else {
                    if (currPendingError == null) {
                        if (pendingErrorUpdater.compareAndSet(this.parent, null, t)) {
                            currPendingError = t;
                        } else {
                            currPendingError = ((FlatMapSubscriber)this.parent).pendingError;
                            assert (currPendingError != null);
                            CompositeExceptionUtils.addPendingError(pendingErrorCountUpdater, this.parent, ((FlatMapSubscriber)this.parent).source.maxDelayedErrors, currPendingError, t);
                        }
                    } else {
                        CompositeExceptionUtils.addPendingError(pendingErrorCountUpdater, this.parent, ((FlatMapSubscriber)this.parent).source.maxDelayedErrors, currPendingError, t);
                    }
                    if (((FlatMapSubscriber)this.parent).removeSubscriber(this, unusedDemand)) {
                        ((FlatMapSubscriber)this.parent).enqueueAndDrain(TerminalNotification.error((Throwable)currPendingError));
                    } else {
                        ((FlatMapSubscriber)this.parent).tryEmitItem(MAPPED_SOURCE_COMPLETE, false, this);
                    }
                }
            }

            public void onComplete() {
                int unusedDemand = pendingDemandUpdater.getAndSet(this, -2);
                if (unusedDemand < 0) {
                    SubscriberUtils.logDuplicateTerminal((PublisherSource.Subscriber)this);
                } else if (((FlatMapSubscriber)this.parent).removeSubscriber(this, unusedDemand)) {
                    ((FlatMapSubscriber)this.parent).enqueueAndDrain(TerminalNotification.complete());
                } else {
                    ((FlatMapSubscriber)this.parent).tryEmitItem(MAPPED_SOURCE_COMPLETE, false, this);
                }
            }
        }
    }
}

