/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.DefaultFileRegion;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.Future;
import java.io.File;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Loopback;
import reactor.core.Producer;
import reactor.core.Receiver;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.ipc.netty.common.MonoChannelFuture;
import reactor.ipc.netty.http.HttpChannel;
import reactor.ipc.netty.http.HttpInbound;
import reactor.ipc.netty.http.HttpOutbound;
import reactor.ipc.netty.tcp.TcpChannel;

abstract class NettyHttpChannel
extends TcpChannel
implements HttpChannel,
HttpInbound,
HttpOutbound {
    static final AsciiString EVENT_STREAM = new AsciiString((CharSequence)"text/event-stream");
    final HttpRequest nettyRequest;
    final HttpHeaders headers;
    HttpResponse nettyResponse;
    HttpHeaders responseHeaders;
    volatile int statusAndHeadersSent = 0;
    Function<? super String, Map<String, Object>> paramsResolver;
    protected static final AtomicIntegerFieldUpdater<NettyHttpChannel> HEADERS_SENT = AtomicIntegerFieldUpdater.newUpdater(NettyHttpChannel.class, "statusAndHeadersSent");
    static final FullHttpResponse CONTINUE = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE, Unpooled.EMPTY_BUFFER);

    public NettyHttpChannel(Channel ioChannel, Flux<Object> input, HttpRequest request) {
        super(ioChannel, input);
        this.nettyRequest = request == null ? new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/") : request;
        this.nettyResponse = new DefaultHttpResponse(this.nettyRequest.protocolVersion(), HttpResponseStatus.OK);
        this.headers = this.nettyRequest.headers();
        this.responseHeaders = this.nettyResponse.headers();
        this.responseHeader((CharSequence)HttpHeaderNames.TRANSFER_ENCODING, (CharSequence)HttpHeaderValues.CHUNKED);
        this.responseHeader((CharSequence)HttpHeaderNames.CONNECTION, (CharSequence)HttpHeaderValues.CLOSE);
    }

    @Override
    public HttpOutbound addCookie(Cookie cookie) {
        if (this.statusAndHeadersSent != 0) {
            throw new IllegalStateException("Status and headers already sent");
        }
        this.headers.add((CharSequence)HttpHeaderNames.COOKIE, (Object)ServerCookieEncoder.STRICT.encode(cookie));
        return this;
    }

    @Override
    public HttpOutbound addHeader(CharSequence name, CharSequence value) {
        if (this.statusAndHeadersSent != 0) {
            throw new IllegalStateException("Status and headers already sent");
        }
        this.headers.add(name, (Object)value);
        return this;
    }

    @Override
    public HttpChannel addResponseCookie(Cookie cookie) {
        if (this.statusAndHeadersSent != 0) {
            throw new IllegalStateException("Status and headers already sent");
        }
        this.responseHeaders.add((CharSequence)HttpHeaderNames.SET_COOKIE, (Object)ServerCookieEncoder.STRICT.encode(cookie));
        return this;
    }

    @Override
    public HttpChannel addResponseHeader(CharSequence name, CharSequence value) {
        if (this.statusAndHeadersSent != 0) {
            throw new IllegalStateException("Status and headers already sent");
        }
        this.responseHeaders.add(name, (Object)value);
        return this;
    }

    @Override
    public String toString() {
        if (this.isWebsocket()) {
            return "ws:" + this.uri();
        }
        return this.method().name() + ":" + this.uri();
    }

    @Override
    public HttpOutbound header(CharSequence name, CharSequence value) {
        if (this.statusAndHeadersSent != 0) {
            throw new IllegalStateException("Status and headers already sent");
        }
        this.headers.set(name, (Object)value);
        return this;
    }

    @Override
    public HttpHeaders headers() {
        return this.headers;
    }

    @Override
    public Flux<Object> receiveObject() {
        if (HttpUtil.is100ContinueExpected((HttpMessage)this.nettyRequest)) {
            return MonoChannelFuture.from(() -> this.delegate().writeAndFlush((Object)CONTINUE)).thenMany(super.receiveObject());
        }
        return super.receiveObject();
    }

    @Override
    public boolean isKeepAlive() {
        return HttpUtil.isKeepAlive((HttpMessage)this.nettyRequest);
    }

    @Override
    public boolean isWebsocket() {
        String isWebsocket = this.headers.get((CharSequence)HttpHeaderNames.UPGRADE);
        return isWebsocket != null && isWebsocket.toLowerCase().equals("websocket");
    }

    @Override
    public HttpOutbound keepAlive(boolean keepAlive) {
        HttpUtil.setKeepAlive((HttpMessage)this.nettyRequest, (boolean)keepAlive);
        return this;
    }

    @Override
    public HttpMethod method() {
        return this.nettyRequest.method();
    }

    @Override
    public Object param(CharSequence key) {
        Map<String, Object> params = null;
        if (this.paramsResolver != null) {
            params = this.paramsResolver.apply(this.uri());
        }
        return null != params ? params.get(key) : null;
    }

    @Override
    public Map<String, Object> params() {
        return null != this.paramsResolver ? this.paramsResolver.apply(this.uri()) : null;
    }

    @Override
    public HttpChannel paramsResolver(Function<? super String, Map<String, Object>> headerResolver) {
        this.paramsResolver = headerResolver;
        return this;
    }

    @Override
    public HttpVersion version() {
        HttpVersion version = this.nettyRequest.protocolVersion();
        if (version.equals((Object)HttpVersion.HTTP_1_0)) {
            return HttpVersion.HTTP_1_0;
        }
        if (version.equals((Object)HttpVersion.HTTP_1_1)) {
            return HttpVersion.HTTP_1_1;
        }
        throw new IllegalStateException(version.protocolName() + " not supported");
    }

    @Override
    public HttpChannel responseTransfer(boolean chunked) {
        HttpUtil.setTransferEncodingChunked((HttpMessage)this.nettyResponse, (boolean)chunked);
        return this;
    }

    @Override
    public HttpOutbound removeTransferEncodingChunked() {
        HttpUtil.setTransferEncodingChunked((HttpMessage)this.nettyRequest, (boolean)false);
        return this;
    }

    @Override
    public Map<CharSequence, Set<Cookie>> cookies() {
        return null;
    }

    @Override
    public HttpChannel responseHeader(CharSequence name, CharSequence value) {
        if (this.statusAndHeadersSent != 0) {
            throw new IllegalStateException("Status and headers already sent");
        }
        this.responseHeaders.set(name, (Object)value);
        return this;
    }

    @Override
    public HttpHeaders responseHeaders() {
        return this.responseHeaders;
    }

    @Override
    public HttpResponseStatus status() {
        return HttpResponseStatus.valueOf((int)this.nettyResponse.status().code());
    }

    @Override
    public HttpChannel status(HttpResponseStatus status) {
        if (this.statusAndHeadersSent != 0) {
            throw new IllegalStateException("Status and headers already sent");
        }
        this.nettyResponse.setStatus(status);
        return this;
    }

    @Override
    public HttpChannel sse() {
        this.header((CharSequence)HttpHeaderNames.CONTENT_TYPE, (CharSequence)EVENT_STREAM);
        return this;
    }

    @Override
    public void subscribe(Subscriber<? super Void> subscriber) {
    }

    @Override
    public Mono<Void> sendFile(File file, long position, long count) {
        Supplier<Mono> writeFile = () -> MonoChannelFuture.from((Future)this.delegate().writeAndFlush((Object)new DefaultFileRegion(file, position, count)));
        return this.sendHeaders().then(writeFile);
    }

    @Override
    public String uri() {
        return this.nettyRequest.uri();
    }

    @Override
    public Mono<Void> sendObject(Publisher<?> source) {
        return new MonoOutboundWrite(source);
    }

    @Override
    public Mono<Void> sendString(Publisher<? extends String> dataStream, Charset charset) {
        if (this.isWebsocket()) {
            return new MonoOutboundWrite((Publisher<?>)Flux.from(dataStream).map(TextWebSocketFrame::new));
        }
        return this.send((Publisher<? extends ByteBuf>)Flux.from(dataStream).map(s -> this.delegate().alloc().buffer().writeBytes(s.getBytes(charset))));
    }

    @Override
    public Mono<Void> send(Publisher<? extends ByteBuf> dataStream) {
        return new MonoOutboundWrite(dataStream);
    }

    @Override
    public Mono<Void> sendHeaders() {
        if (this.statusAndHeadersSent == 0) {
            return new MonoOnlyHeaderWrite();
        }
        return Mono.empty();
    }

    @Override
    public HttpOutbound flushEach() {
        super.flushEach();
        return this;
    }

    protected abstract void doSubscribeHeaders(Subscriber<? super Void> var1);

    final boolean markHeadersAsFlushed() {
        return HEADERS_SENT.compareAndSet(this, 0, 1);
    }

    HttpRequest getNettyRequest() {
        return this.nettyRequest;
    }

    HttpResponse getNettyResponse() {
        return this.nettyResponse;
    }

    void setNettyResponse(HttpResponse nettyResponse) {
        this.nettyResponse = nettyResponse;
        this.responseHeaders = nettyResponse.headers();
    }

    final class MonoOnlyHeaderWrite
    extends Mono<Void>
    implements Loopback {
        MonoOnlyHeaderWrite() {
        }

        public Object connectedInput() {
            return NettyHttpChannel.this;
        }

        public Object connectedOutput() {
            return NettyHttpChannel.this;
        }

        public void subscribe(Subscriber<? super Void> s) {
            if (NettyHttpChannel.this.markHeadersAsFlushed()) {
                NettyHttpChannel.this.doSubscribeHeaders(s);
            } else {
                Operators.error(s, (Throwable)new IllegalStateException("Status and headers already sent"));
            }
        }
    }

    final class MonoOutboundWrite
    extends Mono<Void>
    implements Receiver,
    Loopback {
        final Publisher<?> source;

        public MonoOutboundWrite(Publisher<?> source) {
            this.source = source;
        }

        public Object connectedInput() {
            return NettyHttpChannel.this;
        }

        public Object connectedOutput() {
            return NettyHttpChannel.this;
        }

        public void subscribe(Subscriber<? super Void> s) {
            if (NettyHttpChannel.this.markHeadersAsFlushed()) {
                NettyHttpChannel.this.doSubscribeHeaders(new HttpOutboundSubscriber(s));
            } else {
                NettyHttpChannel.this.emitWriter(this.source, (Subscriber<? super Void>)s);
            }
        }

        public Object upstream() {
            return this.source;
        }

        final class HttpOutboundSubscriber
        implements Subscriber<Void>,
        Receiver,
        Producer {
            final Subscriber<? super Void> s;
            Subscription subscription;

            public HttpOutboundSubscriber(Subscriber<? super Void> s) {
                this.s = s;
            }

            public Subscriber downstream() {
                return this.s;
            }

            public void onComplete() {
                this.subscription = null;
                NettyHttpChannel.this.emitWriter(MonoOutboundWrite.this.source, (Subscriber<? super Void>)this.s);
            }

            public void onError(Throwable t) {
                this.subscription = null;
                this.s.onError(t);
            }

            public void onNext(Void aVoid) {
            }

            public void onSubscribe(Subscription sub) {
                this.subscription = sub;
                sub.request(Long.MAX_VALUE);
            }

            public Object upstream() {
                return this.subscription;
            }
        }
    }
}

