/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Receiver;
import reactor.core.Trackable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.ipc.netty.common.ChannelBridge;
import reactor.ipc.netty.common.NettyChannel;
import reactor.ipc.netty.common.NettyChannelHandler;
import reactor.ipc.netty.http.NettyHttpChannel;
import reactor.ipc.netty.http.NettyWebSocketServerHandler;

class NettyHttpServerHandler
extends NettyChannelHandler<NettyHttpChannel> {
    NettyHttpChannel request;

    NettyHttpServerHandler(Function<? super NettyChannel, ? extends Publisher<Void>> handler, ChannelBridge<NettyHttpChannel> channelBridge, Channel ch) {
        super(handler, channelBridge, ch);
    }

    NettyHttpServerHandler(Function<? super NettyChannel, ? extends Publisher<Void>> handler, ChannelBridge<NettyHttpChannel> channelBridge, Channel ch, NettyHttpServerHandler parent) {
        super(handler, channelBridge, ch, parent);
        this.request = parent.request;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        ctx.read();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Class<?> messageClass = msg.getClass();
        if (this.request == null && HttpRequest.class.isAssignableFrom(messageClass)) {
            this.request = (NettyHttpChannel)this.bridgeFactory.createChannelBridge(ctx.channel(), (Flux<Object>)this.input, msg);
            if (this.request.isWebsocket()) {
                HttpObjectAggregator agg = new HttpObjectAggregator(65536);
                ctx.pipeline().addBefore("reactiveBridge", "reactorHttpAggregator", (ChannelHandler)agg);
            }
            Publisher closePublisher = (Publisher)this.handler.apply(this.request);
            HttpServerCloseSubscriber closeSub = new HttpServerCloseSubscriber(this.request, ctx);
            closePublisher.subscribe((Subscriber)closeSub);
        }
        if (HttpContent.class.isAssignableFrom(messageClass)) {
            this.doRead(msg);
            if (LastHttpContent.class.isAssignableFrom(msg.getClass())) {
                this.downstream().complete();
            }
        }
    }

    @Override
    protected ChannelFuture doOnWrite(Object data, ChannelHandlerContext ctx) {
        return ctx.write(data);
    }

    @Override
    protected void doOnTerminate(ChannelHandlerContext ctx, ChannelFuture last, ChannelPromise promise, Throwable exception) {
        super.doOnTerminate(ctx, ctx.channel().write((Object)Unpooled.EMPTY_BUFFER), promise, exception);
    }

    final NettyWebSocketServerHandler withWebsocketSupport(String url, String protocols, boolean textPlain) {
        if (!this.request.markHeadersAsFlushed()) {
            log.error("Cannot enable websocket if headers have already been sent");
            return null;
        }
        return new NettyWebSocketServerHandler(url, protocols, this, textPlain);
    }

    static final class HttpServerCloseSubscriber
    implements Subscriber<Void>,
    Receiver,
    Trackable {
        final NettyHttpChannel request;
        final ChannelHandlerContext ctx;
        Subscription subscription;

        public HttpServerCloseSubscriber(NettyHttpChannel request, ChannelHandlerContext ctx) {
            this.ctx = ctx;
            this.request = request;
        }

        public void onSubscribe(Subscription s) {
            if (Operators.validate((Subscription)this.subscription, (Subscription)s)) {
                this.subscription = s;
                s.request(Long.MAX_VALUE);
            }
        }

        public void onError(Throwable t) {
            if (t != null && t instanceof IOException && t.getMessage() != null && t.getMessage().contains("Broken pipe")) {
                if (log.isDebugEnabled()) {
                    log.debug("Connection closed remotely", t);
                }
                return;
            }
            log.error("Error processing connection. Closing the channel.", t);
            if (this.request.markHeadersAsFlushed()) {
                this.request.delegate().writeAndFlush((Object)new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
            }
        }

        public void onNext(Void aVoid) {
        }

        public boolean isStarted() {
            return this.ctx.channel().isActive();
        }

        public boolean isTerminated() {
            return !this.ctx.channel().isOpen();
        }

        public Object upstream() {
            return this.subscription;
        }

        public void onComplete() {
            if (this.ctx.channel().isOpen()) {
                ChannelFuture f;
                if (log.isDebugEnabled()) {
                    log.debug("Last Http Response packet");
                }
                if (!this.request.isWebsocket()) {
                    if (this.request.markHeadersAsFlushed()) {
                        this.ctx.write((Object)this.request.getNettyResponse());
                    }
                    f = this.ctx.writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT);
                } else {
                    f = this.ctx.channel().writeAndFlush((Object)new CloseWebSocketFrame());
                }
                if (!this.request.isKeepAlive()) {
                    f.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                }
            }
        }
    }
}

