/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.netty.stream;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.netty.AbstractNettyHttpRequest;
import io.micronaut.http.netty.reactive.HandlerPublisher;
import io.micronaut.http.netty.reactive.HandlerSubscriber;
import io.micronaut.http.netty.stream.Http2Content;
import io.micronaut.http.netty.stream.StreamedHttpMessage;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Duplex handler that bridges Netty's chunked HTTP objects and reactive-streams
 * based HTTP messages.
 *
 * <p>Inbound: a message of type {@code In} is forwarded as-is when it is a
 * {@link FullHttpMessage}, replaced by an "empty" message when {@link #hasBody}
 * reports no body, or otherwise replaced by a streamed message whose body is
 * served by a {@link HandlerPublisher} installed in the pipeline right after
 * this handler under the name {@link #HANDLER_BODY_PUBLISHER}.
 *
 * <p>Outbound: a message of type {@code Out} is written immediately when the
 * channel is writable, or queued until {@link #channelWritabilityChanged} sees
 * the channel writable again. For a {@link StreamedHttpMessage} the headers
 * are flushed and a {@link HandlerSubscriber} is installed to consume the body.
 *
 * <p>The mutable state in this class is not synchronized; callbacks that may
 * arrive from another thread are re-dispatched onto {@code ctx.executor()}.
 *
 * @param <In>  the inbound message type
 * @param <Out> the outbound message type
 */
@Internal
abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessage>
        extends ChannelDuplexHandler {

    /** Pipeline name under which the inbound body publisher is registered. */
    public static final String HANDLER_BODY_PUBLISHER = "http-streams-codec-body-publisher";

    private static final Logger LOG = LoggerFactory.getLogger(HttpStreamsHandler.class);

    /** Appended to this handler's pipeline name to name the outbound body subscriber. */
    private static final String BODY_SUBSCRIBER_SUFFIX = "-body-subscriber";

    /** Outbound messages received while the channel was not writable, in arrival order. */
    private final Queue<Outgoing<Out>> outgoing = new ArrayDeque<>();
    private final Class<In> inClass;
    private final Class<Out> outClass;

    /** The inbound message whose body is currently being streamed, or {@code null}. */
    private In currentlyStreamedMessage;

    /** When set, inbound {@link HttpContent} is released instead of being forwarded. */
    private boolean ignoreBodyRead;

    /**
     * When set, {@link #completeBody} appends {@link LastHttpContent#EMPTY_LAST_CONTENT};
     * cleared if a {@link LastHttpContent} is written through this handler first.
     */
    private boolean sendLastHttpContent;

    /**
     * @param inClass  runtime class used to recognise and cast inbound messages
     * @param outClass runtime class used to recognise outbound messages
     */
    HttpStreamsHandler(Class<In> inClass, Class<Out> outClass) {
        this.inClass = inClass;
        this.outClass = outClass;
    }

    /**
     * @param var1 the inbound message
     * @return whether the message is expected to be followed by body content
     */
    protected abstract boolean hasBody(In var1);

    /**
     * @param var1 the inbound message that has no body
     * @return the message to forward in its place
     */
    protected abstract In createEmptyMessage(In var1);

    /**
     * @param var1 the inbound message
     * @param var2 publisher that will emit the body content
     * @return the streamed message to forward in place of {@code var1}
     */
    protected abstract In createStreamedMessage(In var1, Publisher<? extends HttpContent> var2);

    /**
     * Hook invoked when an inbound {@code In} message arrives, before it is forwarded.
     *
     * @param ctx the channel handler context
     */
    protected void receivedInMessage(ChannelHandlerContext ctx) {
    }

    /**
     * Hook invoked once an inbound message, including any streamed body, has been handed on.
     *
     * @param ctx the channel handler context
     */
    protected void consumedInMessage(ChannelHandlerContext ctx) {
    }

    /**
     * Hook invoked when an outbound {@code Out} message is received by {@link #write}.
     *
     * @param ctx the channel handler context
     */
    protected void receivedOutMessage(ChannelHandlerContext ctx) {
    }

    /**
     * Hook invoked after an outbound full message is written or a streamed body completes.
     *
     * @param ctx the channel handler context
     */
    protected void sentOutMessage(ChannelHandlerContext ctx) {
    }

    /**
     * Subscribes the body subscriber to the outbound streamed message.
     *
     * @param msg        the streamed message being written
     * @param subscriber the subscriber installed in the pipeline for the body
     */
    protected void subscribeSubscriberToStream(StreamedHttpMessage msg, Subscriber<HttpContent> subscriber) {
        msg.subscribe(subscriber);
    }

    /**
     * Hook invoked each time the inbound body publisher requests demand.
     *
     * @param ctx the channel handler context
     */
    protected void bodyRequested(ChannelHandlerContext ctx) {
    }

    /**
     * @return {@code true} when this handler runs on the client side; selects how a
     *         cancelled body subscription is handled
     */
    protected abstract boolean isClient();

    /**
     * Dispatches inbound messages: {@code In} messages are forwarded (full, empty or
     * streamed), {@link HttpContent} goes to the body handling logic, and anything
     * else is passed to the next handler untouched.
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (isValidInMessage(msg)) {
            receivedInMessage(ctx);
            final In inMsg = inClass.cast(msg);
            if (inMsg instanceof FullHttpMessage) {
                // Already aggregated: nothing to stream.
                ctx.fireChannelRead(inMsg);
                consumedInMessage(ctx);
            } else if (!hasBody(inMsg)) {
                ctx.fireChannelRead(createEmptyMessage(inMsg));
                consumedInMessage(ctx);
                // Content that still arrives for this message is released, not forwarded.
                ignoreBodyRead = true;
            } else {
                currentlyStreamedMessage = inMsg;
                final HandlerPublisher<? extends HttpContent> publisher =
                        createBodyPublisher(ctx, inMsg, getStreamId(msg));
                ctx.channel().pipeline().addAfter(ctx.name(), HANDLER_BODY_PUBLISHER, publisher);
                ctx.fireChannelRead(createStreamedMessage(inMsg, publisher));
            }
        } else if (msg instanceof HttpContent) {
            handleReadHttpContent(ctx, (HttpContent) msg);
        } else {
            // Not ours to interpret: hand it on rather than dropping it (and any
            // reference count it holds) on the floor.
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Builds the publisher that serves the body of {@code inMsg}.
     *
     * @param ctx      the channel handler context
     * @param inMsg    the message whose body is being streamed
     * @param streamId the stream id read from the message, or a negative value when absent;
     *                 when non-negative the publisher only accepts {@link Http2Content}
     *                 belonging to that stream
     * @return the publisher to install in the pipeline
     */
    private HandlerPublisher<? extends HttpContent> createBodyPublisher(
            final ChannelHandlerContext ctx, final In inMsg, final int streamId) {
        if (streamId > -1) {
            return new HandlerPublisher<Http2Content>(ctx.executor(), Http2Content.class) {

                @Override
                protected boolean acceptInboundMessage(Object msg) {
                    return super.acceptInboundMessage(msg) && ((Http2Content) msg).stream().id() == streamId;
                }

                @Override
                protected void cancelled() {
                    HttpStreamsHandler.runInEventLoop(ctx, () -> HttpStreamsHandler.this.handleCancelled(ctx, inMsg));
                }

                @Override
                protected void requestDemand() {
                    HttpStreamsHandler.this.bodyRequested(ctx);
                    super.requestDemand();
                }
            };
        }
        return new HandlerPublisher<HttpContent>(ctx.executor(), HttpContent.class) {

            @Override
            protected void cancelled() {
                HttpStreamsHandler.runInEventLoop(ctx, () -> HttpStreamsHandler.this.handleCancelled(ctx, inMsg));
            }

            @Override
            protected void requestDemand() {
                HttpStreamsHandler.this.bodyRequested(ctx);
                super.requestDemand();
            }
        };
    }

    /**
     * Runs {@code task} immediately when called on the context's executor thread,
     * otherwise submits it to that executor.
     */
    private static void runInEventLoop(ChannelHandlerContext ctx, Runnable task) {
        if (ctx.executor().inEventLoop()) {
            task.run();
        } else {
            ctx.executor().execute(task);
        }
    }

    /**
     * Reads the stream id header from an HTTP message.
     *
     * @param msg the inbound object
     * @return the value of the {@code AbstractNettyHttpRequest.STREAM_ID} header, or
     *         {@code -1} when {@code msg} is not an {@link HttpMessage} or has no such header
     */
    protected int getStreamId(Object msg) {
        if (msg instanceof HttpMessage) {
            return ((HttpMessage) msg).headers().getInt(AbstractNettyHttpRequest.STREAM_ID, -1);
        }
        return -1;
    }

    /**
     * Reacts to the body subscription of {@code msg} being cancelled. Only acts when
     * {@code msg} is still the message being streamed: remaining body content is then
     * ignored, and the client side triggers a read while the server side fires a
     * writability-changed event downstream.
     */
    private void handleCancelled(ChannelHandlerContext ctx, In msg) {
        if (currentlyStreamedMessage != msg) {
            return;
        }
        ignoreBodyRead = true;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Calling ctx.read() for cancelled subscription");
        }
        if (isClient()) {
            ctx.read();
        } else {
            ctx.fireChannelWritabilityChanged();
        }
    }

    /**
     * Handles one inbound body chunk: forwards it to the installed body publisher, or
     * releases it when the body is being ignored or no publisher is installed. A
     * {@link LastHttpContent} ends the current streamed message.
     */
    private void handleReadHttpContent(ChannelHandlerContext ctx, HttpContent content) {
        if (ignoreBodyRead) {
            releaseFully(content);
            if (content instanceof LastHttpContent) {
                ignoreBodyRead = false;
                if (currentlyStreamedMessage != null) {
                    removeHandlerIfActive(ctx, HANDLER_BODY_PUBLISHER);
                }
                currentlyStreamedMessage = null;
            }
            return;
        }

        if (ctx.pipeline().get(HANDLER_BODY_PUBLISHER) == null) {
            // Nobody is listening for this content.
            releaseFully(content);
            return;
        }

        ctx.fireChannelRead(content);
        if (content instanceof LastHttpContent) {
            removeHandlerIfActive(ctx, HANDLER_BODY_PUBLISHER);
            currentlyStreamedMessage = null;
            consumedInMessage(ctx);
        }
    }

    /**
     * Drops every reference currently held on {@code content}. Nothing is done when the
     * count is already zero, because a zero decrement is rejected by the release call.
     */
    private static void releaseFully(HttpContent content) {
        final int refCnt = content.refCnt();
        if (refCnt > 0) {
            ReferenceCountUtil.release(content, refCnt);
        }
    }

    /**
     * While body content is being ignored, requests another read and clears the ignore
     * flag instead of propagating the read-complete event; otherwise propagates it.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (ignoreBodyRead) {
            ctx.read();
            ignoreBodyRead = false;
        } else {
            ctx.fireChannelReadComplete();
        }
    }

    /**
     * Intercepts outbound {@code Out} messages, writing them now or queueing them until
     * the channel is writable. Everything else is passed through; a passing
     * {@link LastHttpContent} additionally clears {@code sendLastHttpContent} so that
     * {@link #completeBody} does not append a second terminator.
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (isValidOutMessage(msg)) {
            receivedOutMessage(ctx);
            // Guarded by isValidOutMessage; at runtime this is only an HttpMessage cast.
            @SuppressWarnings("unchecked")
            final Out outMsg = (Out) msg;
            if (ctx.channel().isWritable()) {
                unbufferedWrite(ctx, outMsg, promise);
            } else {
                outgoing.add(new Outgoing<>(outMsg, promise));
            }
            return;
        }
        if (msg instanceof LastHttpContent) {
            sendLastHttpContent = false;
        }
        ctx.write(msg, promise);
    }

    /**
     * Drains queued outbound messages for as long as the channel stays writable.
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        while (ctx.channel().isWritable() && !outgoing.isEmpty()) {
            final Outgoing<Out> next = outgoing.remove();
            unbufferedWrite(ctx, next.message, next.promise);
        }
    }

    /**
     * Writes an outbound message without consulting the pending queue.
     *
     * <ul>
     *   <li>{@link FullHttpMessage}: written and flushed with {@code promise}.</li>
     *   <li>{@link StreamedHttpMessage}: the message is written and flushed, a body
     *       subscriber is installed after this handler and subscribed to the stream;
     *       {@code promise} is completed when the body completes and failed when the
     *       body publisher signals an error.</li>
     *   <li>Any other message: written onward with {@code promise} unchanged, so it is
     *       neither lost nor left with a promise that never completes.</li>
     * </ul>
     *
     * @param ctx     the channel handler context
     * @param message the outbound message
     * @param promise the promise supplied by the caller of {@code write}
     */
    protected void unbufferedWrite(final ChannelHandlerContext ctx, Out message, final ChannelPromise promise) {
        if (message instanceof FullHttpMessage) {
            ctx.writeAndFlush(message, promise);
            sentOutMessage(ctx);
        } else if (message instanceof StreamedHttpMessage) {
            final StreamedHttpMessage streamed = (StreamedHttpMessage) message;
            final HandlerSubscriber<HttpContent> subscriber = new HandlerSubscriber<HttpContent>(ctx.executor()) {

                @Override
                protected void error(Throwable error) {
                    try {
                        if (LOG.isErrorEnabled()) {
                            LOG.error("Error occurred writing stream response: " + error.getMessage(), error);
                        }
                        ctx.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR))
                                .addListener(ChannelFutureListener.CLOSE);
                    } finally {
                        try {
                            // The caller's promise is tied to the body, which will never
                            // complete now; fail it so listeners are not left waiting.
                            promise.tryFailure(error);
                        } finally {
                            ctx.read();
                        }
                    }
                }

                @Override
                protected void complete() {
                    HttpStreamsHandler.runInEventLoop(ctx, () -> HttpStreamsHandler.this.completeBody(ctx, promise));
                }
            };
            sendLastHttpContent = true;
            // The header write deliberately uses its own promise; the caller's promise
            // tracks the whole streamed message and is resolved by the subscriber.
            ctx.writeAndFlush(message);
            ctx.pipeline().addAfter(ctx.name(), bodySubscriberName(ctx), subscriber);
            subscribeSubscriberToStream(streamed, subscriber);
        } else {
            ctx.write(message, promise);
        }
    }

    /**
     * Finishes a streamed outbound body: removes the body subscriber, writes the empty
     * last-content marker (or simply succeeds the promise when a last-content was
     * already written), notifies {@link #sentOutMessage} and triggers a read.
     */
    private void completeBody(ChannelHandlerContext ctx, ChannelPromise promise) {
        removeHandlerIfActive(ctx, bodySubscriberName(ctx));
        if (sendLastHttpContent) {
            ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, promise);
        } else {
            promise.setSuccess();
        }
        sentOutMessage(ctx);
        ctx.read();
    }

    /**
     * @return the pipeline name used for the outbound body subscriber of this handler
     */
    private static String bodySubscriberName(ChannelHandlerContext ctx) {
        return ctx.name() + BODY_SUBSCRIBER_SUFFIX;
    }

    /**
     * Removes the named handler from the pipeline when the channel is active and the
     * handler is present; otherwise does nothing.
     */
    private void removeHandlerIfActive(ChannelHandlerContext ctx, String name) {
        if (!ctx.channel().isActive()) {
            return;
        }
        final ChannelPipeline pipeline = ctx.pipeline();
        final ChannelHandler handler = pipeline.get(name);
        if (handler != null) {
            pipeline.remove(name);
        }
    }

    /**
     * @param msg the outbound object
     * @return whether {@code msg} is an instance of the configured outbound class
     */
    protected boolean isValidOutMessage(Object msg) {
        return outClass.isInstance(msg);
    }

    /**
     * @param msg the inbound object
     * @return whether {@code msg} is an instance of the configured inbound class
     */
    protected boolean isValidInMessage(Object msg) {
        return inClass.isInstance(msg);
    }

    /**
     * An outbound message paired with the promise to resolve once it has been written.
     *
     * @param <O> the outbound message type
     */
    static class Outgoing<O extends HttpMessage> {
        final O message;
        final ChannelPromise promise;

        Outgoing(O message, ChannelPromise promise) {
            this.message = message;
            this.promise = promise;
        }
    }
}

