/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.internal;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.internal.EmptySubscriptions;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.QueueFullException;
import io.servicetalk.concurrent.internal.SignalOffloader;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.utils.internal.PlatformDependent;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class TaskBasedSignalOffloader
implements SignalOffloader {
    /** Sentinel stored in place of a {@code null} item/result so a queued or stored signal is never {@code null}. */
    private static final Object NULL_WRAPPER = new Object();
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskBasedSignalOffloader.class);
    /** Executor on which offloaded signals are executed. */
    private final Executor executor;
    /** Initial capacity passed to the signal queue of each offloaded {@link PublisherSource.Subscriber}. */
    private final int publisherSignalQueueInitialCapacity;

    /**
     * Creates an offloader that uses an initial publisher signal queue capacity of {@code 2}.
     *
     * @param executor the {@link Executor} on which signals are executed; must not be {@code null}
     */
    TaskBasedSignalOffloader(Executor executor) {
        this(executor, 2);
    }

    /**
     * Creates an offloader.
     *
     * @param executor the {@link Executor} on which signals are executed
     * @param publisherSignalQueueInitialCapacity initial capacity of the signal queue created for every offloaded
     * {@link PublisherSource.Subscriber}
     * @throws NullPointerException if {@code executor} is {@code null}
     */
    TaskBasedSignalOffloader(final Executor executor, final int publisherSignalQueueInitialCapacity) {
        // Fail fast on a missing executor before any state is assigned.
        Objects.requireNonNull(executor);
        this.executor = executor;
        this.publisherSignalQueueInitialCapacity = publisherSignalQueueInitialCapacity;
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedSubscriber}, which queues the signals it receives and
     * delivers them to {@code subscriber} from a task run on this offloader's executor.
     *
     * @param subscriber the subscriber that should receive the offloaded signals
     * @param <T> type of items received by the subscriber
     * @return the wrapping subscriber
     */
    @Override
    public <T> PublisherSource.Subscriber<? super T> offloadSubscriber(
            final PublisherSource.Subscriber<? super T> subscriber) {
        final int initialQueueCapacity = this.publisherSignalQueueInitialCapacity;
        return new OffloadedSubscriber<T>(subscriber, this.executor, initialQueueCapacity);
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedSingleSubscriber}, which delivers {@code onSubscribe} and the
     * terminal signal to {@code subscriber} from a task run on this offloader's executor.
     *
     * @param subscriber the subscriber that should receive the offloaded signals
     * @param <T> type of the result received by the subscriber
     * @return the wrapping subscriber
     */
    @Override
    public <T> SingleSource.Subscriber<? super T> offloadSubscriber(SingleSource.Subscriber<? super T> subscriber) {
        // Diamond instead of an explicit <T>: the constructor takes Subscriber<X>, and X must be inferred as the
        // wildcard capture of "? super T". Spelling out <T> would reject the Subscriber<? super T> argument.
        return new OffloadedSingleSubscriber<>(this.executor, subscriber);
    }

    /**
     * Wraps {@code subscriber} in an {@code OffloadedCompletableSubscriber}, which delivers {@code onSubscribe}
     * and the terminal signal to {@code subscriber} from a task run on this offloader's executor.
     *
     * @param subscriber the subscriber that should receive the offloaded signals
     * @return the wrapping subscriber
     */
    @Override
    public CompletableSource.Subscriber offloadSubscriber(final CompletableSource.Subscriber subscriber) {
        final CompletableSource.Subscriber offloaded = new OffloadedCompletableSubscriber(this.executor, subscriber);
        return offloaded;
    }

    /**
     * Wraps {@code subscriber} so that the {@link PublisherSource.Subscription} it receives is replaced by an
     * {@code OffloadedSubscription}, whose {@code request}/{@code cancel} calls are executed on this offloader's
     * executor. The other subscriber signals are forwarded unchanged.
     *
     * @param subscriber the subscriber whose subscription should be offloaded
     * @param <T> type of items received by the subscriber
     * @return the wrapping subscriber
     */
    @Override
    public <T> PublisherSource.Subscriber<? super T> offloadSubscription(PublisherSource.Subscriber<? super T> subscriber) {
        // Diamond instead of an explicit <T>: the constructor takes Subscriber<X>, and X must be inferred as the
        // wildcard capture of "? super T". Spelling out <T> would reject the Subscriber<? super T> argument.
        return new OffloadedSubscriptionSubscriber<>(subscriber, this.executor);
    }

    /**
     * Wraps {@code subscriber} so that the {@link Cancellable} it receives is replaced by an
     * {@code OffloadedCancellable}, whose {@code cancel} is executed on this offloader's executor. The other
     * subscriber signals are forwarded unchanged.
     *
     * @param subscriber the subscriber whose cancellable should be offloaded
     * @param <T> type of the result received by the subscriber
     * @return the wrapping subscriber
     */
    @Override
    public <T> SingleSource.Subscriber<? super T> offloadCancellable(
            final SingleSource.Subscriber<? super T> subscriber) {
        final SingleSource.Subscriber<T> offloaded =
                new OffloadedCancellableSingleSubscriber<T>(subscriber, this.executor);
        return offloaded;
    }

    /**
     * Wraps {@code subscriber} so that the {@link Cancellable} it receives is replaced by an
     * {@code OffloadedCancellable}, whose {@code cancel} is executed on this offloader's executor. The other
     * subscriber signals are forwarded unchanged.
     *
     * @param subscriber the subscriber whose cancellable should be offloaded
     * @return the wrapping subscriber
     */
    @Override
    public CompletableSource.Subscriber offloadCancellable(final CompletableSource.Subscriber subscriber) {
        final CompletableSource.Subscriber offloaded =
                new OffloadedCancellableCompletableSubscriber(subscriber, this.executor);
        return offloaded;
    }

    /**
     * Submits a task to the executor that invokes {@code handleSubscribe} with {@code subscriber}.
     * If submitting the task throws, the failure is handed to
     * {@code SubscriberUtils.deliverErrorFromSource(subscriber, cause)} on the calling thread.
     *
     * @param subscriber the subscriber passed to {@code handleSubscribe}
     * @param handleSubscribe the subscribe logic to run on the executor
     * @param <T> type of items received by the subscriber
     */
    @Override
    public <T> void offloadSubscribe(final PublisherSource.Subscriber<? super T> subscriber,
                                     final Consumer<PublisherSource.Subscriber<? super T>> handleSubscribe) {
        try {
            final Runnable subscribeTask = () -> handleSubscribe.accept(subscriber);
            this.executor.execute(subscribeTask);
        } catch (Throwable cause) {
            SubscriberUtils.deliverErrorFromSource(subscriber, cause);
        }
    }

    /**
     * Submits a task to the executor that invokes {@code handleSubscribe} with {@code subscriber}.
     * If submitting the task throws, the failure is handed to
     * {@code SubscriberUtils.deliverErrorFromSource(subscriber, cause)} on the calling thread.
     *
     * @param subscriber the subscriber passed to {@code handleSubscribe}
     * @param handleSubscribe the subscribe logic to run on the executor
     * @param <T> type of the result received by the subscriber
     */
    @Override
    public <T> void offloadSubscribe(final SingleSource.Subscriber<? super T> subscriber,
                                     final Consumer<SingleSource.Subscriber<? super T>> handleSubscribe) {
        try {
            final Runnable subscribeTask = () -> handleSubscribe.accept(subscriber);
            this.executor.execute(subscribeTask);
        } catch (Throwable cause) {
            SubscriberUtils.deliverErrorFromSource(subscriber, cause);
        }
    }

    /**
     * Submits a task to the executor that invokes {@code handleSubscribe} with {@code subscriber}.
     * If submitting the task throws, the failure is handed to
     * {@code SubscriberUtils.deliverErrorFromSource(subscriber, cause)} on the calling thread.
     *
     * @param subscriber the subscriber passed to {@code handleSubscribe}
     * @param handleSubscribe the subscribe logic to run on the executor
     */
    @Override
    public void offloadSubscribe(final CompletableSource.Subscriber subscriber,
                                 final Consumer<CompletableSource.Subscriber> handleSubscribe) {
        try {
            final Runnable subscribeTask = () -> handleSubscribe.accept(subscriber);
            this.executor.execute(subscribeTask);
        } catch (Throwable cause) {
            SubscriberUtils.deliverErrorFromSource(subscriber, cause);
        }
    }

    /**
     * Submits a task to the executor that passes {@code signal} to {@code signalConsumer}.
     * Nothing is caught here: anything thrown while submitting the task propagates to the caller.
     *
     * @param signal the value to hand to {@code signalConsumer}
     * @param signalConsumer the consumer to invoke on the executor
     * @param <T> type of the signal
     */
    @Override
    public <T> void offloadSignal(final T signal, final Consumer<T> signalConsumer) {
        final Runnable deliverSignal = () -> signalConsumer.accept(signal);
        this.executor.execute(deliverSignal);
    }

    /**
     * {@link PublisherSource.Subscriber} decorator that hands the wrapped subscriber an
     * {@link OffloadedSubscription} instead of the original {@link PublisherSource.Subscription}.
     * {@code onNext}, {@code onError} and {@code onComplete} are forwarded as-is on the calling thread.
     *
     * @param <T> type of items received
     */
    private static final class OffloadedSubscriptionSubscriber<T> implements PublisherSource.Subscriber<T> {
        private final PublisherSource.Subscriber<T> delegate;
        private final Executor offloadExecutor;

        /**
         * @param subscriber the subscriber to forward signals to; must not be {@code null}
         * @param executor the executor handed to each {@link OffloadedSubscription}
         */
        OffloadedSubscriptionSubscriber(final PublisherSource.Subscriber<T> subscriber, final Executor executor) {
            delegate = Objects.requireNonNull(subscriber);
            offloadExecutor = executor;
        }

        @Override
        public void onSubscribe(final PublisherSource.Subscription s) {
            // Only the Subscription is decorated; the onSubscribe call itself is not offloaded.
            final PublisherSource.Subscription offloaded = new OffloadedSubscription(offloadExecutor, s);
            delegate.onSubscribe(offloaded);
        }

        @Override
        public void onNext(final T t) {
            delegate.onNext(t);
        }

        @Override
        public void onError(final Throwable t) {
            delegate.onError(t);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
        }
    }

    /**
     * {@link CompletableSource.Subscriber} decorator that hands the wrapped subscriber an
     * {@link OffloadedCancellable} instead of the original {@link Cancellable}.
     * Terminal signals are forwarded as-is on the calling thread.
     */
    private static final class OffloadedCancellableCompletableSubscriber implements CompletableSource.Subscriber {
        private final CompletableSource.Subscriber delegate;
        private final Executor offloadExecutor;

        /**
         * @param subscriber the subscriber to forward signals to; must not be {@code null}
         * @param executor the executor handed to each {@link OffloadedCancellable}
         */
        OffloadedCancellableCompletableSubscriber(final CompletableSource.Subscriber subscriber,
                                                  final Executor executor) {
            delegate = Objects.requireNonNull(subscriber);
            offloadExecutor = executor;
        }

        @Override
        public void onSubscribe(final Cancellable cancellable) {
            // Only the Cancellable is decorated; the onSubscribe call itself is not offloaded.
            final Cancellable offloaded = new OffloadedCancellable(cancellable, offloadExecutor);
            delegate.onSubscribe(offloaded);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
        }

        @Override
        public void onError(final Throwable t) {
            delegate.onError(t);
        }
    }

    /**
     * {@link SingleSource.Subscriber} decorator that hands the wrapped subscriber an
     * {@link OffloadedCancellable} instead of the original {@link Cancellable}.
     * Terminal signals are forwarded as-is on the calling thread.
     *
     * @param <T> type of the result received
     */
    private static final class OffloadedCancellableSingleSubscriber<T> implements SingleSource.Subscriber<T> {
        private final SingleSource.Subscriber<? super T> delegate;
        private final Executor offloadExecutor;

        /**
         * @param subscriber the subscriber to forward signals to; must not be {@code null}
         * @param executor the executor handed to each {@link OffloadedCancellable}
         */
        OffloadedCancellableSingleSubscriber(final SingleSource.Subscriber<? super T> subscriber,
                                             final Executor executor) {
            delegate = Objects.requireNonNull(subscriber);
            offloadExecutor = executor;
        }

        @Override
        public void onSubscribe(final Cancellable cancellable) {
            // Only the Cancellable is decorated; the onSubscribe call itself is not offloaded.
            final Cancellable offloaded = new OffloadedCancellable(cancellable, offloadExecutor);
            delegate.onSubscribe(offloaded);
        }

        @Override
        public void onSuccess(@Nullable final T result) {
            delegate.onSuccess(result);
        }

        @Override
        public void onError(final Throwable t) {
            delegate.onError(t);
        }
    }

    private static final class OffloadedCancellable
    implements Cancellable {
        private final Cancellable cancellable;
        private final Executor executor;

        OffloadedCancellable(Cancellable cancellable, Executor executor) {
            this.cancellable = Objects.requireNonNull(cancellable);
            this.executor = executor;
        }

        @Override
        public void cancel() {
            try {
                this.executor.execute(() -> SubscriberUtils.safeCancel(this.cancellable));
            }
            catch (Throwable t) {
                LOGGER.error("Failed to execute task on the executor {}. Invoking Cancellable (cancel()) in the caller thread. Cancellable {}. ", this.executor, this.cancellable, t);
                this.cancellable.cancel();
            }
        }
    }

    private static final class OffloadedCompletableSubscriber
    extends AbstractOffloadedSingleValueSubscriber
    implements CompletableSource.Subscriber {
        private static final Object COMPLETED = new Object();
        private final CompletableSource.Subscriber target;

        OffloadedCompletableSubscriber(Executor executor, CompletableSource.Subscriber target) {
            super(executor);
            this.target = Objects.requireNonNull(target);
        }

        @Override
        public void onComplete() {
            this.terminal(COMPLETED);
        }

        @Override
        public void onError(Throwable t) {
            this.terminal(t);
        }

        @Override
        void terminateOnEnqueueFailure(Throwable cause) {
            LOGGER.error("Failed to execute task on the executor {}. Invoking Subscriber (onError()) in the caller thread. Subscriber {}.", this.executor, this.target, cause);
            this.target.onError(cause);
        }

        @Override
        void deliverTerminalToSubscriber(Object terminal) {
            if (terminal instanceof Throwable) {
                SubscriberUtils.safeOnError(this.target, (Throwable)terminal);
            } else {
                SubscriberUtils.safeOnComplete(this.target);
            }
        }

        @Override
        void sendOnSubscribe(Cancellable cancellable) {
            try {
                this.target.onSubscribe(cancellable);
            }
            catch (Throwable t) {
                this.onSubscribeFailed();
                SubscriberUtils.safeOnError(this.target, t);
                SubscriberUtils.safeCancel(cancellable);
            }
        }
    }

    private static final class OffloadedSingleSubscriber<T>
    extends AbstractOffloadedSingleValueSubscriber
    implements SingleSource.Subscriber<T> {
        private final SingleSource.Subscriber<T> target;

        OffloadedSingleSubscriber(Executor executor, SingleSource.Subscriber<T> target) {
            super(executor);
            this.target = Objects.requireNonNull(target);
        }

        @Override
        public void onSuccess(@Nullable T result) {
            this.terminal(result == null ? NULL_WRAPPER : result);
        }

        @Override
        public void onError(Throwable t) {
            this.terminal(TerminalNotification.error(t));
        }

        @Override
        void terminateOnEnqueueFailure(Throwable cause) {
            LOGGER.error("Failed to execute task on the executor {}. Invoking Subscriber (onError()) in the caller thread. Subscriber {}.", this.executor, this.target, cause);
            this.target.onError(cause);
        }

        @Override
        void deliverTerminalToSubscriber(Object terminal) {
            if (terminal instanceof TerminalNotification) {
                Throwable error = ((TerminalNotification)terminal).cause();
                assert (error != null);
                SubscriberUtils.safeOnError(this.target, error);
            } else {
                SubscriberUtils.safeOnSuccess(this.target, this.uncheckCast(terminal));
            }
        }

        @Override
        void sendOnSubscribe(Cancellable cancellable) {
            try {
                this.target.onSubscribe(cancellable);
            }
            catch (Throwable t) {
                this.onSubscribeFailed();
                SubscriberUtils.safeOnError(this.target, t);
                SubscriberUtils.safeCancel(cancellable);
            }
        }

        @Nullable
        private T uncheckCast(Object signal) {
            return (T)(signal == NULL_WRAPPER ? null : signal);
        }
    }

    /**
     * Base class for subscribers that receive one {@code onSubscribe} followed by at most one terminal signal
     * and deliver both to a target from a task ({@link #run()}) submitted to {@code executor}.
     * <p>
     * {@code state} is either one of the plain values {@code STATE_INIT}, {@code STATE_AWAITING_TERMINAL},
     * {@code STATE_TERMINATED}, or a combination of the flag bits:
     * <ul>
     *     <li>{@code ON_SUBSCRIBE_RECEIVED_MASK}: set by {@link #onSubscribe(Cancellable)}, cleared by
     *     {@link #run()} right before the {@code Cancellable} is handed to the target.</li>
     *     <li>{@code EXECUTING_MASK}: appended by {@link #run()} while it is executing.</li>
     *     <li>{@code RECEIVED_TERMINAL_MASK}: set by {@link #terminal(Object)} once a terminal is recorded.</li>
     * </ul>
     */
    private static abstract class AbstractOffloadedSingleValueSubscriber
    implements Runnable {
        private static final int ON_SUBSCRIBE_RECEIVED_MASK = 8;
        private static final int EXECUTING_MASK = 16;
        private static final int RECEIVED_TERMINAL_MASK = 32;
        private static final int EXECUTING_SUBSCRIBED_RECEIVED_MASK = EXECUTING_MASK | ON_SUBSCRIBE_RECEIVED_MASK;
        private static final int STATE_INIT = 0;
        private static final int STATE_AWAITING_TERMINAL = 1;
        private static final int STATE_TERMINATED = 2;
        private static final AtomicIntegerFieldUpdater<AbstractOffloadedSingleValueSubscriber> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractOffloadedSingleValueSubscriber.class, "state");
        final Executor executor;
        // Written before the volatile write to state in onSubscribe(), read in run() after a volatile read.
        @Nullable
        private Cancellable cancellable;
        // Written before the state transition in terminal(), read in run() after RECEIVED_TERMINAL_MASK is seen.
        @Nullable
        private Object terminal;
        private volatile int state = STATE_INIT;

        /**
         * @param executor the executor on which {@link #run()} is scheduled
         */
        AbstractOffloadedSingleValueSubscriber(Executor executor) {
            this.executor = executor;
        }

        /**
         * Records {@code cancellable} and schedules {@link #run()} to hand it to the target. If scheduling throws,
         * the target receives {@code Cancellable.IGNORE_CANCEL} and then the failure, both on the calling thread.
         *
         * @param cancellable the {@link Cancellable} to deliver to the target
         */
        public final void onSubscribe(Cancellable cancellable) {
            this.cancellable = cancellable;
            this.state = ON_SUBSCRIBE_RECEIVED_MASK;
            try {
                this.executor.execute(this);
            }
            catch (Throwable t) {
                this.state = STATE_TERMINATED;
                this.sendOnSubscribe(Cancellable.IGNORE_CANCEL);
                this.terminateOnEnqueueFailure(t);
            }
        }

        /**
         * Delivers the pending {@code onSubscribe} and/or terminal signal to the target, then either terminates
         * or parks in {@code STATE_AWAITING_TERMINAL} until {@link #terminal(Object)} schedules another run.
         */
        @Override
        public final void run() {
            for (;;) {
                int cState = this.state;
                if (cState == STATE_TERMINATED) {
                    return;
                }
                if (!this.casAppend(cState, EXECUTING_MASK)) {
                    continue;
                }
                cState |= EXECUTING_MASK;
                if (has(cState, ON_SUBSCRIBE_RECEIVED_MASK)) {
                    // Clear the flag first so the Cancellable is handed over exactly once.
                    while (!stateUpdater.compareAndSet(this, cState, cState & ~ON_SUBSCRIBE_RECEIVED_MASK)) {
                        cState = this.state;
                    }
                    assert this.cancellable != null;
                    this.sendOnSubscribe(this.cancellable);
                    // Re-read: a terminal may have arrived meanwhile, or onSubscribe may have failed.
                    cState = this.state;
                    if (cState == STATE_TERMINATED) {
                        // sendOnSubscribe() failed and called onSubscribeFailed(); the target has already been
                        // sent an error. Without this check the CAS below would move TERMINATED back to
                        // AWAITING_TERMINAL and a later terminal would be delivered a second time.
                        return;
                    }
                }
                if (has(cState, RECEIVED_TERMINAL_MASK)) {
                    if (stateUpdater.compareAndSet(this, cState, STATE_TERMINATED)) {
                        assert this.terminal != null;
                        this.deliverTerminalToSubscriber(this.terminal);
                        return;
                    }
                    // CAS lost against a concurrent state change: retry from the top.
                } else if (stateUpdater.compareAndSet(this, cState, STATE_AWAITING_TERMINAL)) {
                    return;
                }
            }
        }

        /**
         * Records {@code terminal} and makes sure it is delivered by {@link #run()}: if a run is pending or
         * executing the terminal flag is appended to the state, otherwise a new run is scheduled. Calls made
         * after a terminal was already recorded, or after termination, return without scheduling.
         * If scheduling throws, {@link #terminateOnEnqueueFailure(Throwable)} is invoked on the calling thread.
         *
         * @param terminal the non-{@code null} object representing the terminal signal
         */
        final void terminal(Object terminal) {
            this.terminal = terminal;
            for (;;) {
                final int cState = this.state;
                if (has(cState, RECEIVED_TERMINAL_MASK) || cState == STATE_TERMINATED
                        || (hasAny(cState, EXECUTING_SUBSCRIBED_RECEIVED_MASK)
                        && this.casAppend(cState, RECEIVED_TERMINAL_MASK))) {
                    // Duplicate terminal, already terminated, or a pending/executing run() will pick it up.
                    return;
                }
                if ((cState == STATE_AWAITING_TERMINAL || cState == STATE_INIT)
                        && stateUpdater.compareAndSet(this, cState, RECEIVED_TERMINAL_MASK)) {
                    // No run() is pending: one has to be scheduled below.
                    break;
                }
            }
            try {
                this.executor.execute(this);
            }
            catch (Throwable t) {
                this.state = STATE_TERMINATED;
                this.terminateOnEnqueueFailure(t);
            }
        }

        /** Marks this subscriber as terminated; called by subclasses when the target's onSubscribe throws. */
        final void onSubscribeFailed() {
            this.state = STATE_TERMINATED;
        }

        /** Invoked on the calling thread when a task could not be submitted to the executor. */
        abstract void terminateOnEnqueueFailure(Throwable var1);

        /** Delivers the recorded terminal object to the target. */
        abstract void deliverTerminalToSubscriber(Object var1);

        /** Hands the given {@link Cancellable} to the target's {@code onSubscribe}. */
        abstract void sendOnSubscribe(Cancellable var1);

        /** @return {@code true} if every bit of {@code flag} is set in {@code state} */
        private static boolean has(int state, int flag) {
            return (state & flag) == flag;
        }

        /** @return {@code true} if at least one bit of {@code flag} is set in {@code state} */
        private static boolean hasAny(int state, int flag) {
            return (state & flag) != 0;
        }

        /** Atomically ORs {@code toAppend} into the state if it still equals {@code cState}. */
        private boolean casAppend(int cState, int toAppend) {
            return stateUpdater.compareAndSet(this, cState, cState | toAppend);
        }
    }

    /**
     * {@link PublisherSource.Subscriber} that enqueues every signal it receives and replays the queue to
     * {@code target} from a task ({@link #run()}) submitted to {@code executor}.
     * <p>
     * {@code state} coordinates the thread offering signals with the drain task:
     * <ul>
     *     <li>{@code STATE_IDLE}: no drain task is scheduled or running.</li>
     *     <li>{@code STATE_ENQUEUED}: a signal was offered; a drain task is scheduled or must look at the queue
     *     again.</li>
     *     <li>{@code STATE_EXECUTING}: the drain task is running.</li>
     *     <li>{@code STATE_TERMINATING}: the drain task is clearing the queue after {@code target} threw.</li>
     *     <li>{@code STATE_TERMINATED}: nothing more is delivered to {@code target}.</li>
     * </ul>
     *
     * @param <T> type of items received
     */
    private static final class OffloadedSubscriber<T>
    implements PublisherSource.Subscriber<T>,
    Runnable {
        private static final int STATE_IDLE = 0;
        private static final int STATE_ENQUEUED = 1;
        private static final int STATE_EXECUTING = 2;
        private static final int STATE_TERMINATING = 3;
        private static final int STATE_TERMINATED = 4;
        private static final AtomicIntegerFieldUpdater<OffloadedSubscriber> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OffloadedSubscriber.class, "state");
        private volatile int state = STATE_IDLE;
        private final PublisherSource.Subscriber<? super T> target;
        private final Executor executor;
        private final Queue<Object> signals;
        // Kept so the upstream can be cancelled when onNext throws or the executor rejects the drain task.
        @Nullable
        private PublisherSource.Subscription subscription;

        /**
         * @param target the subscriber that receives the replayed signals
         * @param executor the executor on which the drain task runs
         * @param publisherSignalQueueInitialCapacity initial capacity of the signal queue
         */
        OffloadedSubscriber(PublisherSource.Subscriber<? super T> target, Executor executor, int publisherSignalQueueInitialCapacity) {
            this.target = target;
            this.executor = executor;
            this.signals = PlatformDependent.newUnboundedSpscQueue(publisherSignalQueueInitialCapacity);
        }

        @Override
        public void onSubscribe(PublisherSource.Subscription s) {
            this.subscription = s;
            this.offerSignal(s);
        }

        @Override
        public void onNext(T t) {
            // The queue uses null to signal "empty", so null items travel as NULL_WRAPPER.
            this.offerSignal(t == null ? NULL_WRAPPER : t);
        }

        @Override
        public void onError(Throwable t) {
            this.offerSignal(TerminalNotification.error(t));
        }

        @Override
        public void onComplete() {
            this.offerSignal(TerminalNotification.complete());
        }

        /**
         * Drains the signal queue into {@code target}. Each queued object is delivered through exactly one
         * callback, chosen by its type. The task returns once a terminal signal was delivered, {@code target}
         * threw, or the queue is empty and the state could be moved back to {@code STATE_IDLE}.
         */
        @Override
        public void run() {
            this.state = STATE_EXECUTING;
            for (;;) {
                Object signal;
                // Drain everything that is queued before trying to go idle. Signals offered while this task was
                // only enqueued do not schedule another task, so leaving them behind would strand them.
                while ((signal = this.signals.poll()) != null) {
                    if (signal instanceof PublisherSource.Subscription) {
                        PublisherSource.Subscription subscription = (PublisherSource.Subscription)signal;
                        try {
                            this.target.onSubscribe(subscription);
                        }
                        catch (Throwable t) {
                            this.clearSignalsFromExecutorThread();
                            SubscriberUtils.safeOnError(this.target, t);
                            SubscriberUtils.safeCancel(subscription);
                            return;
                        }
                        // else-if chain: a Subscription must never also be passed to onNext.
                    } else if (signal instanceof TerminalNotification) {
                        this.state = STATE_TERMINATED;
                        Throwable cause = ((TerminalNotification)signal).cause();
                        if (cause != null) {
                            SubscriberUtils.safeOnError(this.target, cause);
                        } else {
                            SubscriberUtils.safeOnComplete(this.target);
                        }
                        return;
                    } else {
                        @SuppressWarnings("unchecked")
                        final T t = signal == NULL_WRAPPER ? null : (T)signal;
                        try {
                            this.target.onNext(t);
                        }
                        catch (Throwable th) {
                            this.clearSignalsFromExecutorThread();
                            SubscriberUtils.safeOnError(this.target, th);
                            assert (this.subscription != null);
                            SubscriberUtils.safeCancel(this.subscription);
                            return;
                        }
                    }
                }
                // Queue looked empty: go idle, unless a new signal flipped the state back to ENQUEUED.
                for (;;) {
                    final int cState = this.state;
                    if (cState == STATE_EXECUTING) {
                        if (stateUpdater.compareAndSet(this, STATE_EXECUTING, STATE_IDLE)) {
                            return;
                        }
                    } else if (cState == STATE_ENQUEUED) {
                        if (stateUpdater.compareAndSet(this, STATE_ENQUEUED, STATE_EXECUTING)) {
                            // Back to the drain loop for the newly offered signal(s).
                            break;
                        }
                    } else {
                        return;
                    }
                }
            }
        }

        /**
         * Clears the queue from the drain task and moves the state to {@code STATE_TERMINATED}. The clear is
         * repeated for as long as the move from {@code STATE_TERMINATING} to {@code STATE_TERMINATED} fails,
         * i.e. while the state is changed concurrently.
         */
        private void clearSignalsFromExecutorThread() {
            do {
                this.state = STATE_TERMINATING;
                this.signals.clear();
            } while (!stateUpdater.compareAndSet(this, STATE_TERMINATING, STATE_TERMINATED));
        }

        /**
         * Enqueues {@code signal} and schedules the drain task if none is scheduled or running. If the executor
         * rejects the task, {@code target} is terminated with the rejection on the calling thread (preceded by
         * an {@code onSubscribe} with an empty subscription when the rejected signal was the subscription), the
         * queue is cleared and the upstream subscription is cancelled.
         *
         * @param signal the non-{@code null} object to enqueue
         * @throws QueueFullException if the queue does not accept the signal
         */
        private void offerSignal(Object signal) {
            if (!this.signals.offer(signal)) {
                throw new QueueFullException("signals");
            }
            for (;;) {
                final int cState = this.state;
                if (cState == STATE_TERMINATED) {
                    // The drain task no longer touches the queue, so it is cleared here.
                    this.signals.clear();
                    return;
                }
                if (cState == STATE_TERMINATING) {
                    // Forces the drain task's TERMINATING -> TERMINATED CAS to fail if it has not happened yet,
                    // in which case the drain task clears again; otherwise the queue is cleared here.
                    if (stateUpdater.getAndSet(this, STATE_TERMINATED) == STATE_TERMINATED) {
                        this.signals.clear();
                    }
                    return;
                }
                if (stateUpdater.compareAndSet(this, cState, STATE_ENQUEUED)) {
                    if (cState != STATE_IDLE) {
                        // A drain task is already scheduled or running and will observe ENQUEUED.
                        return;
                    }
                    break;
                }
            }
            try {
                this.executor.execute(this);
            }
            catch (Throwable t) {
                this.state = STATE_TERMINATED;
                try {
                    if (signal instanceof PublisherSource.Subscription) {
                        this.target.onSubscribe(EmptySubscriptions.EMPTY_SUBSCRIPTION);
                    }
                }
                finally {
                    SubscriberUtils.safeOnError(this.target, t);
                }
                this.signals.clear();
                assert (this.subscription != null);
                SubscriberUtils.safeCancel(this.subscription);
            }
        }
    }

    private static final class OffloadedSubscription
    implements PublisherSource.Subscription,
    Runnable {
        private static final int STATE_IDLE = 0;
        private static final int STATE_ENQUEUED = 1;
        private static final int STATE_EXECUTING = 2;
        public static final int CANCELLED = -1;
        public static final int TERMINATED = -2;
        private static final AtomicIntegerFieldUpdater<OffloadedSubscription> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OffloadedSubscription.class, "state");
        private static final AtomicLongFieldUpdater<OffloadedSubscription> requestedUpdater = AtomicLongFieldUpdater.newUpdater(OffloadedSubscription.class, "requested");
        private final Executor executor;
        private final PublisherSource.Subscription target;
        private volatile int state = 0;
        private volatile long requested;

        OffloadedSubscription(Executor executor, PublisherSource.Subscription target) {
            this.executor = executor;
            this.target = Objects.requireNonNull(target);
        }

        @Override
        public void request(long n) {
            if (!SubscriberUtils.isRequestNValid(n) && requestedUpdater.getAndSet(this, n < -2L ? n : Long.MIN_VALUE) >= 0L || requestedUpdater.accumulateAndGet(this, n, FlowControlUtils::addWithOverflowProtectionIfNotNegative) > 0L) {
                this.enqueueTaskIfRequired(true);
            }
        }

        @Override
        public void cancel() {
            long oldVal = requestedUpdater.getAndSet(this, -1L);
            if (oldVal != -1L) {
                this.enqueueTaskIfRequired(false);
            }
        }

        private void enqueueTaskIfRequired(boolean forRequestN) {
            int oldState = stateUpdater.getAndSet(this, 1);
            if (oldState == 0) {
                try {
                    this.executor.execute(this);
                }
                catch (Throwable t) {
                    if (forRequestN) {
                        LOGGER.error("Failed to execute task on the executor {}. Invoking Subscription (request()) in the caller thread. Subscription {}.", this.executor, this.target, t);
                        this.target.request(requestedUpdater.getAndSet(this, 0L));
                    }
                    this.requested = -2L;
                    LOGGER.error("Failed to execute task on the executor {}. Invoking Subscription (cancel()) in the caller thread. Subscription {}.", this.executor, this.target, t);
                    this.target.cancel();
                }
            }
        }

        @Override
        public void run() {
            this.state = 2;
            block5: while (true) {
                long r;
                if ((r = requestedUpdater.getAndSet(this, 0L)) > 0L) {
                    try {
                        this.target.request(r);
                        continue;
                    }
                    catch (Throwable t) {
                        r = -1L;
                        this.requested = -1L;
                        LOGGER.error("Unexpected exception from request(). Subscription {}.", (Object)this.target, (Object)t);
                    }
                }
                if (r == -1L) {
                    this.requested = -2L;
                    SubscriberUtils.safeCancel(this.target);
                    return;
                }
                if (r == -2L) {
                    return;
                }
                if (r != 0L) {
                    this.requested = -2L;
                    try {
                        this.target.request(r);
                    }
                    catch (IllegalArgumentException t) {
                    }
                    catch (Throwable t) {
                        LOGGER.error("Ignoring unexpected exception from request(). Subscription {}.", (Object)this.target, (Object)t);
                    }
                    return;
                }
                while (true) {
                    int cState;
                    if ((cState = this.state) == 2) {
                        if (!stateUpdater.compareAndSet(this, 2, 0)) continue;
                        return;
                    }
                    if (cState != 1) break block5;
                    if (stateUpdater.compareAndSet(this, 1, 2)) break;
                }
            }
        }
    }
}

