/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.netty.reactive;

import io.micronaut.core.annotation.Internal;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Internal
public class HandlerSubscriber<T>
extends ChannelDuplexHandler
implements Subscriber<T> {
    protected ChannelFuture lastWriteFuture;
    private final EventExecutor executor;
    private final AtomicBoolean hasSubscription = new AtomicBoolean();
    private volatile Subscription subscription;
    private volatile ChannelHandlerContext ctx;
    private State state = State.NO_SUBSCRIPTION_OR_CONTEXT;

    public HandlerSubscriber(EventExecutor executor) {
        this.executor = executor;
    }

    protected void error(Throwable error) {
        this.doClose();
    }

    protected void complete() {
        this.doClose();
    }

    public void handlerAdded(ChannelHandlerContext ctx) {
        this.verifyRegisteredWithRightExecutor(ctx);
        switch (this.state.ordinal()) {
            case 0: {
                this.ctx = ctx;
                this.state = State.NO_SUBSCRIPTION;
                break;
            }
            case 2: {
                this.ctx = ctx;
                this.maybeStart();
                break;
            }
            case 6: {
                this.state = State.COMPLETE;
                ctx.close();
                break;
            }
            default: {
                throw new IllegalStateException("This handler must only be added to a pipeline once " + String.valueOf((Object)this.state));
            }
        }
    }

    public void channelRegistered(ChannelHandlerContext ctx) {
        this.verifyRegisteredWithRightExecutor(ctx);
        ctx.fireChannelRegistered();
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        this.maybeRequestMore();
        ctx.fireChannelWritabilityChanged();
    }

    public void channelActive(ChannelHandlerContext ctx) {
        if (this.state == State.INACTIVE) {
            this.state = State.RUNNING;
            this.maybeRequestMore();
        }
        ctx.fireChannelActive();
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        this.cancel();
        ctx.fireChannelInactive();
    }

    public void handlerRemoved(ChannelHandlerContext ctx) {
        this.cancel();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        this.cancel();
        ctx.fireExceptionCaught(cause);
    }

    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("Null subscription");
        }
        if (!this.hasSubscription.compareAndSet(false, true)) {
            subscription.cancel();
        } else {
            this.subscription = subscription;
            this.executor.execute(this::provideSubscription);
        }
    }

    public void onNext(T t) {
        this.onNext(t, this.ctx.newPromise());
    }

    protected void onNext(T t, ChannelPromise promise) {
        this.lastWriteFuture = this.ctx.writeAndFlush(t, promise);
        this.lastWriteFuture.addListener(future -> this.maybeRequestMore());
    }

    public void onError(Throwable error) {
        if (error == null) {
            throw new NullPointerException("Null error published");
        }
        this.error(error);
    }

    public void onComplete() {
        if (this.lastWriteFuture == null) {
            this.complete();
        } else {
            this.lastWriteFuture.addListener(channelFuture -> this.complete());
        }
    }

    private void doClose() {
        this.executor.execute(() -> {
            switch (this.state.ordinal()) {
                case 1: 
                case 3: 
                case 4: {
                    this.ctx.close();
                    this.state = State.COMPLETE;
                    break;
                }
            }
        });
    }

    private void maybeRequestMore() {
        if (this.ctx.channel().isWritable() && this.state != State.COMPLETE && this.state != State.CANCELLED) {
            this.subscription.request(1L);
        }
    }

    private void verifyRegisteredWithRightExecutor(ChannelHandlerContext ctx) {
        if (ctx.channel().isRegistered() && !this.executor.inEventLoop()) {
            throw new IllegalArgumentException("Channel handler MUST be registered with the same EventExecutor that it is created with.");
        }
    }

    private void cancel() {
        switch (this.state.ordinal()) {
            case 1: {
                this.state = State.CANCELLED;
                break;
            }
            case 3: 
            case 4: {
                this.subscription.cancel();
                this.state = State.CANCELLED;
                break;
            }
        }
    }

    private void provideSubscription() {
        switch (this.state.ordinal()) {
            case 0: {
                this.state = State.NO_CONTEXT;
                break;
            }
            case 1: {
                this.maybeStart();
                break;
            }
            case 5: {
                this.subscription.cancel();
                break;
            }
        }
    }

    private void maybeStart() {
        if (this.ctx.channel().isActive()) {
            this.state = State.RUNNING;
            this.maybeRequestMore();
        } else {
            this.state = State.INACTIVE;
        }
    }

    static enum State {
        NO_SUBSCRIPTION_OR_CONTEXT,
        NO_SUBSCRIPTION,
        NO_CONTEXT,
        INACTIVE,
        RUNNING,
        CANCELLED,
        COMPLETE;

    }
}

