/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import java.io.IOException;
import java.net.URI;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.ipc.netty.common.ChannelBridge;
import reactor.ipc.netty.common.NettyChannel;
import reactor.ipc.netty.common.NettyChannelHandler;
import reactor.ipc.netty.http.HttpClientChannel;
import reactor.ipc.netty.http.HttpClientResponse;
import reactor.ipc.netty.http.HttpException;
import reactor.ipc.netty.http.NettyWebSocketClientHandler;
import reactor.ipc.netty.http.RedirectException;

class NettyHttpClientHandler
extends NettyChannelHandler<HttpClientChannel> {
    HttpClientChannel httpChannel;
    DirectProcessor<Void> connectSignal;
    Subscriber<? super HttpClientResponse> replySubscriber;

    NettyHttpClientHandler(Function<? super NettyChannel, ? extends Publisher<Void>> handler, ChannelBridge<HttpClientChannel> channelBridge, Channel ch) {
        super(handler, channelBridge, ch);
    }

    NettyHttpClientHandler(Function<? super NettyChannel, ? extends Publisher<Void>> handler, ChannelBridge<HttpClientChannel> channelBridge, Channel ch, NettyHttpClientHandler parent) {
        super(handler, channelBridge, ch, parent);
        this.httpChannel = parent.httpChannel;
        this.replySubscriber = parent.replySubscriber;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        if (this.httpChannel != null) {
            return;
        }
        this.httpChannel = (HttpClientChannel)this.bridgeFactory.createChannelBridge(ctx.channel(), (Flux<Object>)this.input, new Object[0]);
        this.httpChannel.keepAlive(true);
        HttpUtil.setTransferEncodingChunked((HttpMessage)this.httpChannel.nettyRequest, (boolean)true);
        ((Publisher)this.handler.apply(this.httpChannel)).subscribe((Subscriber)new HttpClientCloseSubscriber(ctx));
    }

    void bridgeReply(Subscriber<? super HttpClientResponse> replySubscriber, DirectProcessor<Void> connectSignal) {
        this.replySubscriber = replySubscriber;
        this.connectSignal = connectSignal;
    }

    @Override
    protected void doOnTerminate(ChannelHandlerContext ctx, ChannelFuture last, ChannelPromise promise, Throwable exception) {
        super.doOnTerminate(ctx, ctx.channel().write(this.httpChannel != null && this.httpChannel.isWebsocket() ? Unpooled.EMPTY_BUFFER : LastHttpContent.EMPTY_LAST_CONTENT), promise, exception);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Class<?> messageClass = msg.getClass();
        if (HttpResponse.class.isAssignableFrom(messageClass)) {
            HttpResponse response = (HttpResponse)msg;
            if (this.httpChannel != null) {
                this.httpChannel.setNettyResponse(response);
            }
            if (log.isDebugEnabled()) {
                log.debug("Received response (auto-read:{}) : {}", new Object[]{ctx.channel().config().isAutoRead(), this.httpChannel.headers().toString()});
            }
            if (this.checkResponseCode(ctx, response)) {
                ctx.fireChannelRead(msg);
                if (this.replySubscriber != null) {
                    Flux.just((Object)this.httpChannel).subscribe(this.replySubscriber);
                } else {
                    log.debug("No Response/ HttpInbound subscriber on {}, msg is dropped {}", new Object[]{ctx.channel(), msg});
                }
            }
            this.postRead(ctx, msg);
            return;
        }
        if (LastHttpContent.EMPTY_LAST_CONTENT != msg) {
            this.doRead(msg);
        }
        this.postRead(ctx, msg);
    }

    final NettyWebSocketClientHandler withWebsocketSupport(URI url, String protocols, boolean textPlain) {
        if (!this.httpChannel.markHeadersAsFlushed()) {
            log.error("Cannot enable websocket if headers have already been sent");
            return null;
        }
        return new NettyWebSocketClientHandler(url, protocols, this, textPlain);
    }

    final boolean checkResponseCode(ChannelHandlerContext ctx, HttpResponse response) throws Exception {
        int code = response.status().code();
        if (code >= 400) {
            HttpException ex = new HttpException(this.httpChannel);
            if (this.connectSignal != null) {
                this.connectSignal.onError((Throwable)ex);
            } else if (this.replySubscriber != null) {
                Operators.error(this.replySubscriber, (Throwable)ex);
            }
            return false;
        }
        if (code >= 300 && this.httpChannel.isFollowRedirect()) {
            RedirectException ex = new RedirectException(this.httpChannel);
            if (this.connectSignal != null) {
                this.connectSignal.onError((Throwable)ex);
            } else if (this.replySubscriber != null) {
                Operators.error(this.replySubscriber, (Throwable)ex);
            }
            return false;
        }
        if (this.connectSignal != null) {
            this.connectSignal.onComplete();
        }
        return true;
    }

    protected void postRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof LastHttpContent) {
            if (log.isDebugEnabled()) {
                log.debug("Read last http packet");
            }
            ctx.channel().close();
            this.downstream().complete();
        }
    }

    static class HttpClientCloseSubscriber
    implements Subscriber<Void> {
        private final ChannelHandlerContext ctx;

        public HttpClientCloseSubscriber(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        public void onSubscribe(Subscription s) {
            this.ctx.read();
            Operators.validate(null, (Subscription)s);
            s.request(Long.MAX_VALUE);
        }

        public void onError(Throwable t) {
            if (t == null) {
                throw Exceptions.argumentIsNullException();
            }
            if (t instanceof IOException && t.getMessage() != null && t.getMessage().contains("Broken pipe")) {
                if (log.isDebugEnabled()) {
                    log.debug("Connection closed remotely", t);
                }
                return;
            }
            if (this.ctx.channel().isOpen()) {
                if (log.isDebugEnabled()) {
                    log.error("Closing HTTP channel due to error", t);
                }
                this.ctx.channel().close();
            }
        }

        public void onNext(Void aVoid) {
        }

        public void onComplete() {
        }
    }
}

