/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api.internal;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.oio.api.PayloadWriter;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConnectablePayloadWriter<T>
implements PayloadWriter<T> {
    private static final long REQUESTN_ABOUT_TO_PARK = Long.MIN_VALUE;
    private static final long REQUESTN_TERMINATED = -9223372036854775807L;
    private static final AtomicLongFieldUpdater<ConnectablePayloadWriter> requestedUpdater = AtomicLongFieldUpdater.newUpdater(ConnectablePayloadWriter.class, "requested");
    private static final AtomicReferenceFieldUpdater<ConnectablePayloadWriter, TerminalNotification> closedUpdater = AtomicReferenceFieldUpdater.newUpdater(ConnectablePayloadWriter.class, TerminalNotification.class, "closed");
    private static final AtomicReferenceFieldUpdater<ConnectablePayloadWriter, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(ConnectablePayloadWriter.class, Object.class, "state");
    private volatile Object state = State.DISCONNECTED;
    private volatile long requested;
    @Nullable
    private volatile TerminalNotification closed;
    @Nullable
    private Thread writerThread;

    @Override
    public void write(T t) throws IOException {
        while (true) {
            long requested;
            if ((requested = this.requested) > 0L) {
                if (!requestedUpdater.compareAndSet(this, requested, requested - 1L)) continue;
                break;
            }
            if (requested >= 0L) {
                this.waitForRequestNDemand();
                break;
            }
            this.processClosed();
        }
        PublisherSource.Subscriber<T> s = this.waitForSubscriber();
        try {
            s.onNext(t);
        }
        catch (Throwable cause) {
            this.closed = TerminalNotification.error(cause);
            this.requested = -9223372036854775807L;
            this.state = State.TERMINATED;
            s.onError(cause);
            throw cause;
        }
    }

    @Override
    public void flush() throws IOException {
        this.verifyOpen();
    }

    @Override
    public void close() throws IOException {
        this.close0(null);
    }

    @Override
    public void close(Throwable cause) throws IOException {
        this.close0(Objects.requireNonNull(cause));
    }

    private void close0(@Nullable Throwable cause) {
        block7: {
            TerminalNotification terminal;
            TerminalNotification terminalNotification = terminal = cause == null ? TerminalNotification.complete() : TerminalNotification.error(cause);
            if (closedUpdater.compareAndSet(this, null, terminal)) {
                Object currState;
                this.requested = -9223372036854775807L;
                while (true) {
                    if ((currState = this.state) instanceof PublisherSource.Subscriber) {
                        if (!stateUpdater.compareAndSet(this, currState, (Object)State.TERMINATED)) continue;
                        terminal.terminate((PublisherSource.Subscriber)currState);
                        break block7;
                    }
                    if (currState == State.TERMINATED || stateUpdater.compareAndSet(this, currState, (Object)State.TERMINATING)) break;
                }
                assert (currState != State.WAITING_FOR_CONNECTED);
            } else {
                Object currState = stateUpdater.getAndSet(this, (Object)State.TERMINATED);
                if (currState instanceof PublisherSource.Subscriber) {
                    TerminalNotification currClosed = this.closed;
                    assert (currClosed != null);
                    currClosed.terminate((PublisherSource.Subscriber)currState);
                }
            }
        }
    }

    public Publisher<T> connect() {
        return stateUpdater.compareAndSet(this, (Object)State.DISCONNECTED, (Object)State.CONNECTING) ? new ConnectedPublisher(this) : Publisher.failed(new IllegalStateException("Stream state " + this.state + " is not valid for connect."));
    }

    private void verifyOpen() throws IOException {
        TerminalNotification currClosed = this.closed;
        if (currClosed != null) {
            this.processClosed(currClosed);
        }
    }

    private void processClosed() throws IOException {
        TerminalNotification currClosed = this.closed;
        assert (currClosed != null);
        this.processClosed(currClosed);
    }

    private void processClosed(TerminalNotification currClosed) throws IOException {
        Object currState = stateUpdater.getAndSet(this, (Object)State.TERMINATED);
        if (currState instanceof PublisherSource.Subscriber && currClosed.cause() != ConnectedPublisher.CONNECTED_PUBLISHER_CANCELLED) {
            currClosed.terminate((PublisherSource.Subscriber)currState);
        }
        throw ConnectablePayloadWriter.newAlreadyClosed(currClosed.cause());
    }

    private static IOException newAlreadyClosed(@Nullable Throwable cause) {
        return new IOException("Already closed", cause);
    }

    private void waitForRequestNDemand() throws IOException {
        block5: {
            this.writerThread = Thread.currentThread();
            long oldRequested = requestedUpdater.getAndSet(this, Long.MIN_VALUE);
            if (oldRequested == 0L) {
                while (true) {
                    LockSupport.park();
                    long requested = this.requested;
                    if (requested > 0L) {
                        if (!requestedUpdater.compareAndSet(this, requested, requested - 1L)) continue;
                        this.writerThread = null;
                        break block5;
                    }
                    if (requested == Long.MIN_VALUE) continue;
                    this.writerThread = null;
                    this.processClosed();
                }
            }
            if (oldRequested > 0L) {
                this.writerThread = null;
                this.waitForRequestNDemandAvoidPark(oldRequested);
            } else {
                this.writerThread = null;
                this.processClosed();
            }
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void waitForRequestNDemandAvoidPark(long oldRequested) throws IOException {
        while (true) {
            long requested;
            if ((requested = this.requested) == Long.MIN_VALUE) {
                if (!requestedUpdater.compareAndSet(this, Long.MIN_VALUE, oldRequested - 1L)) continue;
                return;
            }
            if (requested < 0L) {
                this.processClosed();
                continue;
            }
            if (requestedUpdater.compareAndSet(this, requested, FlowControlUtils.addWithOverflowProtection(oldRequested - 1L, requested))) return;
        }
    }

    private PublisherSource.Subscriber<? super T> waitForSubscriber() throws IOException {
        Object currState = this.state;
        return currState instanceof PublisherSource.Subscriber ? (PublisherSource.Subscriber<T>)currState : this.waitForSubscriberSlowPath();
    }

    private PublisherSource.Subscriber<? super T> waitForSubscriberSlowPath() throws IOException {
        this.writerThread = Thread.currentThread();
        while (true) {
            Object currState;
            if ((currState = this.state) instanceof PublisherSource.Subscriber) {
                this.writerThread = null;
                return (PublisherSource.Subscriber)currState;
            }
            if (currState == State.TERMINATED || currState == State.TERMINATING) {
                this.writerThread = null;
                TerminalNotification localClosed = this.closed;
                assert (localClosed != null);
                throw ConnectablePayloadWriter.newAlreadyClosed(localClosed.cause());
            }
            if (!stateUpdater.compareAndSet(this, currState, (Object)State.WAITING_FOR_CONNECTED)) continue;
            LockSupport.park();
        }
    }

    private static enum State {
        DISCONNECTED,
        CONNECTING,
        WAITING_FOR_CONNECTED,
        CONNECTED,
        TERMINATING,
        TERMINATED;

    }

    private static final class ConnectedPublisher<T>
    extends Publisher<T> {
        private static final Logger LOGGER = LoggerFactory.getLogger(ConnectedPublisher.class);
        private static final IOException CONNECTED_PUBLISHER_CANCELLED = ThrowableUtils.unknownStackTrace(new IOException("Connected Publisher cancel()"), ConnectablePayloadWriter.class, "cancel()");
        private final ConnectablePayloadWriter<T> outer;

        ConnectedPublisher(ConnectablePayloadWriter<T> outer) {
            this.outer = outer;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber) {
            if (!stateUpdater.compareAndSet(this.outer, State.CONNECTING, State.CONNECTED)) {
                if (stateUpdater.compareAndSet(this.outer, State.TERMINATING, State.TERMINATED)) {
                    SubscriberUtils.deliverCompleteFromSource(subscriber);
                } else {
                    SubscriberUtils.deliverErrorFromSource(subscriber, (Throwable)new DuplicateSubscribeException(((ConnectablePayloadWriter)this.outer).state, subscriber));
                }
                return;
            }
            try {
                subscriber.onSubscribe(new PublisherSource.Subscription(){

                    /*
                     * Enabled force condition propagation
                     * Lifted jumps to return sites
                     */
                    @Override
                    public void request(long n) {
                        if (SubscriberUtils.isRequestNValid(n)) {
                            while (true) {
                                long requested;
                                if ((requested = outer.requested) >= 0L) {
                                    if (!requestedUpdater.compareAndSet(outer, requested, FlowControlUtils.addWithOverflowProtection(requested, n))) continue;
                                    return;
                                }
                                if (requested != Long.MIN_VALUE) return;
                                if (requestedUpdater.compareAndSet(outer, Long.MIN_VALUE, n)) break;
                            }
                            this.tryWakeupWriterThread();
                            return;
                        }
                        if (closedUpdater.compareAndSet(outer, null, TerminalNotification.error(SubscriberUtils.newExceptionForInvalidRequestN(n)))) {
                            this.terminateRequestN();
                            return;
                        } else {
                            LOGGER.warn("invalid request({}), but already closed.", (Object)n);
                        }
                    }

                    @Override
                    public void cancel() {
                        if (closedUpdater.compareAndSet(outer, null, TerminalNotification.error(CONNECTED_PUBLISHER_CANCELLED))) {
                            this.terminateRequestN();
                        }
                    }
                });
            }
            catch (Throwable cause) {
                SubscriberUtils.handleExceptionFromOnSubscribe(subscriber, cause);
            }
            finally {
                block13: {
                    block14: {
                        while (true) {
                            Object currState;
                            if ((currState = ((ConnectablePayloadWriter)this.outer).state) == State.CONNECTED) {
                                if (!stateUpdater.compareAndSet(this.outer, State.CONNECTED, subscriber)) continue;
                                break block13;
                            }
                            if (currState != State.WAITING_FOR_CONNECTED) break block14;
                            if (stateUpdater.compareAndSet(this.outer, State.WAITING_FOR_CONNECTED, subscriber)) break;
                        }
                        Thread writerThread = ((ConnectablePayloadWriter)this.outer).writerThread;
                        assert (writerThread != null);
                        LockSupport.unpark(writerThread);
                        break block13;
                    }
                    TerminalNotification currClosed = ((ConnectablePayloadWriter)this.outer).closed;
                    assert (currClosed != null);
                    currClosed.terminate(subscriber);
                }
            }
        }

        private void terminateRequestN() {
            ((ConnectablePayloadWriter)this.outer).requested = -9223372036854775807L;
            this.tryWakeupWriterThread();
        }

        private void tryWakeupWriterThread() {
            Thread writerThread = ((ConnectablePayloadWriter)this.outer).writerThread;
            if (writerThread != null) {
                LockSupport.unpark(writerThread);
            }
        }
    }
}

