/*
 * Decompiled with CFR 0.152.
 */
package com.hotels.styx.server.netty.connectors;

import com.google.common.annotations.VisibleForTesting;
import com.hotels.styx.api.Buffer;
import com.hotels.styx.api.ByteStream;
import com.hotels.styx.api.ContentOverflowException;
import com.hotels.styx.api.Eventual;
import com.hotels.styx.api.HttpHandler;
import com.hotels.styx.api.HttpHeaderNames;
import com.hotels.styx.api.HttpInterceptor;
import com.hotels.styx.api.HttpResponseStatus;
import com.hotels.styx.api.HttpVersion;
import com.hotels.styx.api.Id;
import com.hotels.styx.api.LiveHttpRequest;
import com.hotels.styx.api.LiveHttpResponse;
import com.hotels.styx.api.MetricRegistry;
import com.hotels.styx.api.exceptions.NoAvailableHostsException;
import com.hotels.styx.api.exceptions.OriginUnreachableException;
import com.hotels.styx.api.exceptions.ResponseTimeoutException;
import com.hotels.styx.api.exceptions.StyxException;
import com.hotels.styx.api.exceptions.TransportLostException;
import com.hotels.styx.api.metrics.codahale.CodaHaleMetricRegistry;
import com.hotels.styx.api.plugins.spi.PluginException;
import com.hotels.styx.client.BadHttpResponseException;
import com.hotels.styx.client.StyxClientException;
import com.hotels.styx.client.connectionpool.ResourceExhaustedException;
import com.hotels.styx.common.EventProcessor;
import com.hotels.styx.common.FsmEventProcessor;
import com.hotels.styx.common.QueueDrainingEventProcessor;
import com.hotels.styx.common.StateMachine;
import com.hotels.styx.common.content.ConsumerDisconnectedException;
import com.hotels.styx.server.BadRequestException;
import com.hotels.styx.server.HttpErrorStatusListener;
import com.hotels.styx.server.HttpInterceptorContext;
import com.hotels.styx.server.NoServiceConfiguredException;
import com.hotels.styx.server.RequestProgressListener;
import com.hotels.styx.server.RequestTimeoutException;
import com.hotels.styx.server.netty.connectors.ExceptionStatusMapper;
import com.hotels.styx.server.netty.connectors.HttpResponseWriter;
import com.hotels.styx.server.netty.connectors.ResponseEnhancer;
import com.hotels.styx.server.track.RequestTracker;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import javax.net.ssl.SSLHandshakeException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class HttpPipelineHandler
extends SimpleChannelInboundHandler<LiveHttpRequest> {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpPipelineHandler.class);
    // Maps known exception classes to the HTTP status sent back to the client; exceptions that are
    // not listed here fall through to the extra checks in status(Throwable).
    private static final ExceptionStatusMapper EXCEPTION_STATUSES = new ExceptionStatusMapper.Builder().add(HttpResponseStatus.REQUEST_TIMEOUT, RequestTimeoutException.class).add(HttpResponseStatus.BAD_GATEWAY, OriginUnreachableException.class, NoAvailableHostsException.class, NoServiceConfiguredException.class, BadHttpResponseException.class, ContentOverflowException.class).add(HttpResponseStatus.SERVICE_UNAVAILABLE, ResourceExhaustedException.class).add(HttpResponseStatus.GATEWAY_TIMEOUT, ResponseTimeoutException.class).add(HttpResponseStatus.INTERNAL_SERVER_ERROR, StyxClientException.class).build();
    // Pipeline that turns each incoming request into an eventual response.
    private final HttpHandler httpPipeline;
    // Notified about proxying errors and write failures.
    private final HttpErrorStatusListener httpErrorStatusListener;
    // Creates one response writer per write attempt.
    private final HttpResponseWriterFactory responseWriterFactory;
    // Receives onRequest / onComplete / onTerminate notifications for each request.
    private final RequestProgressListener statsSink;
    // Source of the "requests.cancelled.*" counters.
    private final MetricRegistry metrics;
    // Finite state machine built by createStateMachine(); all events are routed through it.
    private final StateMachine<State> stateMachine;
    // Applied to every response (including generated error responses) before it is written.
    private final ResponseEnhancer responseEnhancer;
    // Passed to the HttpInterceptorContext created for each request.
    private final boolean secure;
    // Name of the header that carries the origin/application id on error responses; may be null.
    private final CharSequence originsHeaderName;
    // Subscription to the current response Eventual; set in hookOnSubscribe, cancelled by cancelSubscription().
    private volatile Subscription subscription;
    // Request currently being handled; cleared once its response has been sent.
    private volatile LiveHttpRequest ongoingRequest;
    // Response currently being written for ongoingRequest.
    private volatile LiveHttpResponse ongoingResponse;
    // A single request that arrived while a response was still being sent; replayed by onResponseSent.
    private volatile LiveHttpRequest prematureRequest;
    // Completion of the current response write; assigned in onResponseReceived.
    private volatile CompletableFuture<Void> future;
    // Created in channelActive; serialises events into the state machine.
    private volatile QueueDrainingEventProcessor eventProcessor;
    // Tracks in-flight requests (trackRequest / endTrack).
    private final RequestTracker tracker;

    /**
     * Creates a handler from the collaborators configured on the builder.
     *
     * @param builder source of the pipeline, listeners, writer factory, metrics and flags
     * @param tracker request tracker used to record in-flight requests
     * @throws NullPointerException if a mandatory builder collaborator is {@code null}
     */
    private HttpPipelineHandler(Builder builder, RequestTracker tracker) {
        // Mandatory collaborators are validated in the same order as they are declared on the builder.
        responseEnhancer = Objects.requireNonNull(builder.responseEnhancer);
        httpPipeline = Objects.requireNonNull(builder.httpPipeline);
        httpErrorStatusListener = Objects.requireNonNull(builder.httpErrorStatusListener);
        responseWriterFactory = Objects.requireNonNull(builder.responseWriterFactory);
        statsSink = Objects.requireNonNull(builder.progressListener);

        stateMachine = createStateMachine();
        metrics = builder.metricRegistry.get();

        // Plain settings that need no validation.
        secure = builder.secure;
        originsHeaderName = builder.originsHeaderName;
        this.tracker = tracker;
    }

    /**
     * Builds the state machine that drives this handler.
     *
     * <p>The machine starts in {@code ACCEPTING_REQUESTS}. Each {@code transition(state, eventClass, handler)}
     * entry maps an event received in a given state to the handler method that returns the next state:
     * <ul>
     *   <li>{@code ACCEPTING_REQUESTS}: a request starts processing; channel inactivity terminates;
     *       a channel exception is handled; a late observable completion is ignored.</li>
     *   <li>{@code WAITING_FOR_RESPONSE}: a response starts being written; another request is treated as
     *       spurious; channel inactivity, channel exceptions, observable errors and an observable that
     *       completes without a response each have a dedicated handler.</li>
     *   <li>{@code SENDING_RESPONSE}: write completion or failure is handled; channel inactivity moves to
     *       {@code SENDING_RESPONSE_CLIENT_CLOSED}; observable errors are only reported; a new request is
     *       treated as premature.</li>
     *   <li>{@code SENDING_RESPONSE_CLIENT_CLOSED}: write completion or failure is handled; channel
     *       exceptions and observable errors are only reported.</li>
     *   <li>{@code TERMINATED}: channel inactivity is accepted and the state is kept.</li>
     * </ul>
     * Any other state/event combination is logged as a warning and leaves the state unchanged.
     *
     * @return the configured state machine
     */
    private StateMachine<State> createStateMachine() {
        return new StateMachine.Builder().initialState((Object)State.ACCEPTING_REQUESTS).transition((Object)State.ACCEPTING_REQUESTS, RequestReceivedEvent.class, event -> this.onLegitimateRequest(event.request, event.ctx)).transition((Object)State.ACCEPTING_REQUESTS, ChannelInactiveEvent.class, event -> State.TERMINATED).transition((Object)State.ACCEPTING_REQUESTS, ChannelExceptionEvent.class, event -> this.onChannelExceptionWhenAcceptingRequests(event.ctx, event.cause)).transition((Object)State.ACCEPTING_REQUESTS, ResponseObservableCompletedEvent.class, event -> State.ACCEPTING_REQUESTS).transition((Object)State.WAITING_FOR_RESPONSE, ResponseReceivedEvent.class, event -> this.onResponseReceived(((ResponseReceivedEvent)event).response, ((ResponseReceivedEvent)event).ctx)).transition((Object)State.WAITING_FOR_RESPONSE, RequestReceivedEvent.class, event -> this.onSpuriousRequest(event.request, State.WAITING_FOR_RESPONSE)).transition((Object)State.WAITING_FOR_RESPONSE, ChannelInactiveEvent.class, event -> this.onChannelInactive()).transition((Object)State.WAITING_FOR_RESPONSE, ChannelExceptionEvent.class, event -> this.onChannelExceptionWhenWaitingForResponse(event.ctx, event.cause)).transition((Object)State.WAITING_FOR_RESPONSE, ResponseObservableErrorEvent.class, event -> this.onResponseObservableError(((ResponseObservableErrorEvent)event).ctx, ((ResponseObservableErrorEvent)event).cause, ((ResponseObservableErrorEvent)event).requestId)).transition((Object)State.WAITING_FOR_RESPONSE, ResponseObservableCompletedEvent.class, event -> this.onResponseObservableCompletedTooSoon(((ResponseObservableCompletedEvent)event).ctx, ((ResponseObservableCompletedEvent)event).requestId)).transition((Object)State.SENDING_RESPONSE, ResponseSentEvent.class, event -> this.onResponseSent(((ResponseSentEvent)event).ctx)).transition((Object)State.SENDING_RESPONSE, ResponseWriteErrorEvent.class, event -> this.onResponseWriteError(((ResponseWriteErrorEvent)event).ctx, 
((ResponseWriteErrorEvent)event).cause)).transition((Object)State.SENDING_RESPONSE, ChannelInactiveEvent.class, event -> State.SENDING_RESPONSE_CLIENT_CLOSED).transition((Object)State.SENDING_RESPONSE, ChannelExceptionEvent.class, event -> this.onChannelExceptionWhenSendingResponse(event.ctx, event.cause)).transition((Object)State.SENDING_RESPONSE, ResponseObservableErrorEvent.class, event -> this.logError(State.SENDING_RESPONSE, ((ResponseObservableErrorEvent)event).cause)).transition((Object)State.SENDING_RESPONSE, ResponseObservableCompletedEvent.class, event -> State.SENDING_RESPONSE).transition((Object)State.SENDING_RESPONSE, RequestReceivedEvent.class, event -> this.onPrematureRequest(event.request, event.ctx)).transition((Object)State.SENDING_RESPONSE_CLIENT_CLOSED, ResponseSentEvent.class, event -> this.onResponseSentAfterClientClosed(((ResponseSentEvent)event).ctx)).transition((Object)State.SENDING_RESPONSE_CLIENT_CLOSED, ResponseWriteErrorEvent.class, event -> this.onResponseWriteError(((ResponseWriteErrorEvent)event).ctx, ((ResponseWriteErrorEvent)event).cause)).transition((Object)State.SENDING_RESPONSE_CLIENT_CLOSED, ChannelExceptionEvent.class, event -> this.logError(State.SENDING_RESPONSE_CLIENT_CLOSED, event.cause)).transition((Object)State.SENDING_RESPONSE_CLIENT_CLOSED, ResponseObservableErrorEvent.class, event -> this.logError(State.SENDING_RESPONSE_CLIENT_CLOSED, ((ResponseObservableErrorEvent)event).cause)).transition((Object)State.SENDING_RESPONSE_CLIENT_CLOSED, ResponseObservableCompletedEvent.class, event -> State.SENDING_RESPONSE_CLIENT_CLOSED).transition((Object)State.TERMINATED, ChannelInactiveEvent.class, event -> State.TERMINATED).onInappropriateEvent((state, event) -> {
            LOGGER.warn(this.warningMessage(event.getClass().getSimpleName()));
            return state;
        }).build();
    }

    /**
     * Reports a proxying failure for the in-flight exchange without changing state.
     *
     * @param state the state to remain in
     * @param cause the failure to report
     * @return {@code state}, unchanged
     */
    private State logError(State state, Throwable cause) {
        LiveHttpRequest request = ongoingRequest;
        LiveHttpResponse response = ongoingResponse;
        httpErrorStatusListener.proxyingFailure(request, response, cause);
        return state;
    }

    /**
     * Returns the state machine's current state.
     *
     * @return the current handler state
     */
    @VisibleForTesting
    State state() {
        State current = (State) stateMachine.currentState();
        return current;
    }

    /**
     * Creates the event processor for this channel, then propagates the activation event.
     *
     * @param ctx the channel handler context
     * @throws Exception if the superclass handler fails
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Object remote = ctx.channel().remoteAddress();
        Object local = ctx.channel().localAddress();
        // Prefix that identifies this connection in the event processor's log output.
        String loggingPrefix = String.format("%s -> %s", remote, local);

        // The error callback is intentionally a no-op.
        EventProcessor fsmProcessor = new FsmEventProcessor(this.stateMachine, (throwable, state) -> {}, loggingPrefix);
        this.eventProcessor = new QueueDrainingEventProcessor(fsmProcessor);

        super.channelActive(ctx);
    }

    /**
     * Queues an incoming request as a {@code RequestReceivedEvent} for the state machine.
     *
     * @param ctx     the channel handler context
     * @param request the decoded request
     */
    protected void channelRead0(ChannelHandlerContext ctx, LiveHttpRequest request) throws Exception {
        RequestReceivedEvent received = new RequestReceivedEvent(request, ctx);
        eventProcessor.submit(received);
    }

    /**
     * Queues a {@code ChannelInactiveEvent} for the state machine, then propagates the event.
     *
     * @param ctx the channel handler context
     * @throws Exception if the superclass handler fails
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ChannelInactiveEvent inactive = new ChannelInactiveEvent();
        eventProcessor.submit(inactive);
        super.channelInactive(ctx);
    }

    /**
     * Queues a channel exception as a {@code ChannelExceptionEvent} for the state machine.
     *
     * @param ctx   the channel handler context
     * @param cause the exception raised on the channel
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ChannelExceptionEvent failure = new ChannelExceptionEvent(ctx, cause);
        eventProcessor.submit(failure);
    }

    /**
     * Handles a request that arrives while another request is still being handled: logs it,
     * counts it, ends tracking of the ongoing request and cancels the response subscription.
     *
     * @param request the unexpected request
     * @param state   the state in which the request arrived (not used)
     * @return {@code TERMINATED}
     */
    private State onSpuriousRequest(LiveHttpRequest request, State state) {
        String description = "message='Spurious request received while handling another request', spuriousRequest=" + request;
        LOGGER.warn(warningMessage(description));
        metrics.counter("requests.cancelled.spuriousRequest").inc();

        LiveHttpRequest abandoned = ongoingRequest;
        statsSink.onTerminate(abandoned.id());
        tracker.endTrack(abandoned);
        cancelSubscription();
        return State.TERMINATED;
    }

    /**
     * Handles a request that arrives while the previous response is still being sent.
     *
     * <p>The first such request is parked in {@code prematureRequest} and the state is kept.
     * A second one is treated as spurious: the exchange is cancelled, the pending write future
     * is cancelled and the channel is closed.
     *
     * @param request the request received ahead of time
     * @param ctx     the channel handler context
     * @return the current state when the request was parked, otherwise {@code TERMINATED}
     */
    private State onPrematureRequest(LiveHttpRequest request, ChannelHandlerContext ctx) {
        if (this.prematureRequest != null) {
            // The text is passed to warningMessage as an argument, not as a format string,
            // so the request is appended directly with no "%s" placeholder.
            LOGGER.warn(this.warningMessage("message='Spurious request received while handling another request', spuriousRequest=" + request));
            this.metrics.counter("requests.cancelled.spuriousRequest").inc();
            this.cancelSubscription();
            this.statsSink.onTerminate(this.ongoingRequest.id());
            this.tracker.endTrack(this.ongoingRequest);
            this.future.cancel(false);
            ctx.close();
            return State.TERMINATED;
        }
        // Park the request; onResponseSent replays it once the current response is done.
        this.prematureRequest = request;
        return this.state();
    }

    /**
     * Starts processing a request received while the handler is accepting requests.
     *
     * <p>Notifies the progress listener and tracker, hands an HTTP/1.1 copy of the request to the
     * pipeline and subscribes to the eventual response. Subscriber callbacks are converted into
     * events and queued on the event processor. If the pipeline call or subscription throws, an
     * error response is generated, reported and, when the channel is still active, written before closing.
     *
     * @param request the incoming request
     * @param ctx     the channel handler context
     * @return {@code WAITING_FOR_RESPONSE} after subscribing, or {@code TERMINATED} on failure
     */
    private State onLegitimateRequest(final LiveHttpRequest request, final ChannelHandlerContext ctx) {
        this.statsSink.onRequest(request.id());
        // The pipeline always receives the request as HTTP/1.1; tracking still uses the original request.
        LiveHttpRequest v11Request = request.newBuilder().version(HttpVersion.HTTP_1_1).build();
        this.tracker.trackRequest(request, () -> this.state().toString());
        this.ongoingRequest = request;
        try {
            Eventual responseEventual = this.httpPipeline.handle(v11Request, (HttpInterceptor.Context)new HttpInterceptorContext(this.secure, HttpPipelineHandler.remoteAddress(ctx), (Executor)ctx.executor()));
            responseEventual.subscribe((Subscriber)new BaseSubscriber<LiveHttpResponse>(){

                // Keep the subscription so it can be cancelled later, and ask for the single response.
                public void hookOnSubscribe(Subscription s) {
                    HttpPipelineHandler.this.subscription = s;
                    s.request(1L);
                }

                public void hookOnComplete() {
                    HttpPipelineHandler.this.eventProcessor.submit((Object)new ResponseObservableCompletedEvent(ctx, request.id()));
                }

                public void hookOnError(Throwable cause) {
                    HttpPipelineHandler.this.eventProcessor.submit((Object)new ResponseObservableErrorEvent(ctx, cause, request.id()));
                }

                public void hookOnNext(LiveHttpResponse response) {
                    HttpPipelineHandler.this.eventProcessor.submit((Object)new ResponseReceivedEvent(response, ctx));
                }
            });
            return State.WAITING_FOR_RESPONSE;
        }
        catch (Throwable cause) {
            // Synchronous failure: answer with a generated error response.
            LiveHttpResponse response = this.exceptionToResponse(cause, request, this.originsHeaderName);
            this.httpErrorStatusListener.proxyErrorOccurred(request, HttpPipelineHandler.remoteAddress(ctx), response.status(), cause);
            this.statsSink.onTerminate(request.id());
            this.tracker.endTrack(this.ongoingRequest);
            if (ctx.channel().isActive()) {
                this.respondAndClose(ctx, response);
            }
            return State.TERMINATED;
        }
    }

    /**
     * Starts writing the received response and arranges for the write outcome to be queued
     * as either a {@code ResponseSentEvent} or a {@code ResponseWriteErrorEvent}.
     *
     * @param response the response produced by the pipeline
     * @param ctx      the channel handler context
     * @return {@code SENDING_RESPONSE}
     */
    private State onResponseReceived(LiveHttpResponse response, ChannelHandlerContext ctx) {
        ongoingResponse = response;

        HttpResponseWriter writer = responseWriterFactory.create(ctx);
        CompletableFuture<Void> writeCompletion = writer.write(responseEnhancer.enhance(ongoingResponse, ongoingRequest));
        future = writeCompletion;

        writeCompletion.handle((ignore, cause) -> {
            if (cause == null) {
                eventProcessor.submit(new ResponseSentEvent(ctx));
            } else {
                eventProcessor.submit(new ResponseWriteErrorEvent(ctx, (Throwable)cause));
            }
            return null;
        });
        return State.SENDING_RESPONSE;
    }

    /**
     * Completes the exchange after the response has been written.
     *
     * <p>For keep-alive requests the handler goes back to accepting requests and replays a parked
     * premature request, if any. Otherwise the channel is closed.
     *
     * @param ctx the channel handler context
     * @return {@code ACCEPTING_REQUESTS} for keep-alive requests, otherwise {@code TERMINATED}
     */
    private State onResponseSent(ChannelHandlerContext ctx) {
        LiveHttpRequest finished = ongoingRequest;
        statsSink.onComplete(finished.id(), ongoingResponse.status().code());
        tracker.endTrack(finished);

        boolean reusable = finished.keepAlive();
        ongoingRequest = null;
        if (!reusable) {
            ctx.close();
            return State.TERMINATED;
        }

        ongoingResponse = null;
        LiveHttpRequest queued = prematureRequest;
        if (queued != null) {
            // Re-submit the parked request so it is processed once this event has been handled.
            eventProcessor.submit(new RequestReceivedEvent(queued, ctx));
            prematureRequest = null;
        }
        return State.ACCEPTING_REQUESTS;
    }

    /**
     * Completes the exchange when the write finishes after the channel became inactive.
     *
     * @param ctx the channel handler context
     * @return {@code TERMINATED}
     */
    private State onResponseSentAfterClientClosed(ChannelHandlerContext ctx) {
        LiveHttpRequest finished = ongoingRequest;
        int statusCode = ongoingResponse.status().code();
        statsSink.onComplete(finished.id(), statusCode);
        tracker.endTrack(finished);
        ongoingRequest = null;
        ctx.close();
        return State.TERMINATED;
    }

    /**
     * Handles a failed response write: cancels the exchange, flushes an empty last-content
     * marker, closes the channel when that write completes, and reports the failure.
     *
     * @param ctx   the channel handler context
     * @param cause the write failure
     * @return {@code TERMINATED}
     */
    private State onResponseWriteError(ChannelHandlerContext ctx, Throwable cause) {
        metrics.counter("requests.cancelled.responseWriteError").inc();
        cancelSubscription();

        LiveHttpRequest failedRequest = ongoingRequest;
        statsSink.onTerminate(failedRequest.id());
        tracker.endTrack(failedRequest);

        ctx.channel()
                .writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT)
                .addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
        httpErrorStatusListener.proxyWriteFailure(failedRequest, ongoingResponse, cause);
        return State.TERMINATED;
    }

    /**
     * Handles the channel becoming inactive while waiting for a response: cancels any write
     * future and the response subscription, and ends tracking of the ongoing request.
     *
     * @return {@code TERMINATED}
     */
    private State onChannelInactive() {
        metrics.counter("requests.cancelled.channelInactive").inc();

        CompletableFuture<Void> pendingWrite = future;
        if (pendingWrite != null) {
            LOGGER.warn(warningMessage("message=onChannelInactive"));
            pendingWrite.cancel(false);
        }

        cancelSubscription();
        LiveHttpRequest abandoned = ongoingRequest;
        statsSink.onTerminate(abandoned.id());
        tracker.endTrack(abandoned);
        return State.TERMINATED;
    }

    /**
     * Handles a channel exception raised while a response is being sent: cancels the exchange,
     * flushes an empty last-content marker, closes the channel when that write completes, and
     * reports the error.
     *
     * @param ctx   the channel handler context
     * @param cause the channel exception
     * @return {@code TERMINATED}
     */
    private State onChannelExceptionWhenSendingResponse(ChannelHandlerContext ctx, Throwable cause) {
        metrics.counter("requests.cancelled.channelExceptionWhileSendingResponse").inc();
        cancelSubscription();

        LiveHttpRequest abandoned = ongoingRequest;
        statsSink.onTerminate(abandoned.id());
        tracker.endTrack(abandoned);

        ctx.channel()
                .writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT)
                .addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
        httpErrorStatusListener.proxyErrorOccurred(cause);
        return State.TERMINATED;
    }

    /**
     * Handles a channel exception raised while waiting for the pipeline's response: ends
     * tracking of the ongoing request, cancels the subscription, then applies the common
     * channel-exception handling.
     *
     * @param ctx   the channel handler context
     * @param cause the channel exception
     * @return the state returned by {@code handleChannelException}
     */
    private State onChannelExceptionWhenWaitingForResponse(ChannelHandlerContext ctx, Throwable cause) {
        metrics.counter("requests.cancelled.channelExceptionWhileWaitingForResponse").inc();

        LiveHttpRequest abandoned = ongoingRequest;
        statsSink.onTerminate(abandoned.id());
        tracker.endTrack(abandoned);
        cancelSubscription();

        State next = handleChannelException(ctx, cause);
        return next;
    }

    /**
     * Handles a channel exception raised while no request is in flight by delegating to the
     * common channel-exception handling.
     *
     * @param ctx   the channel handler context
     * @param cause the channel exception
     * @return the state returned by {@code handleChannelException}
     */
    private State onChannelExceptionWhenAcceptingRequests(ChannelHandlerContext ctx, Throwable cause) {
        State next = handleChannelException(ctx, cause);
        return next;
    }

    /**
     * Common handling for exceptions raised on the channel.
     *
     * <p>SSL handshake failures close the channel and are logged at info level. I/O exceptions
     * are ignored. Anything else is turned into an error response, reported, and written to the
     * client if the channel is still active.
     *
     * @param ctx   the channel handler context
     * @param cause the channel exception
     * @return {@code TERMINATED} in every case
     */
    private State handleChannelException(ChannelHandlerContext ctx, Throwable cause) {
        Throwable handshakeFailure = sslException(cause);
        if (handshakeFailure != null) {
            if (ctx.channel().isActive()) {
                ctx.channel().close();
            }
            LOGGER.info("SSL handshake failure from incoming connection cause=\"{}\", serverAddress={}, clientAddress={}",
                    handshakeFailure.getMessage(), ctx.channel().localAddress(), ctx.channel().remoteAddress());
            return State.TERMINATED;
        }

        if (isIoException(cause)) {
            // I/O errors get no error response and are not reported.
            return State.TERMINATED;
        }

        LiveHttpResponse errorResponse = exceptionToResponse(cause, ongoingRequest, originsHeaderName);
        httpErrorStatusListener.proxyErrorOccurred(errorResponse.status(), cause);
        if (ctx.channel().isActive()) {
            respondAndClose(ctx, errorResponse);
        }
        return State.TERMINATED;
    }

    /**
     * Extracts the SSL handshake failure directly wrapped by {@code cause}, if there is one.
     *
     * @param cause the exception to inspect
     * @return the direct cause when it is an {@link SSLHandshakeException}, otherwise {@code null}
     */
    private static Throwable sslException(Throwable cause) {
        Throwable nested = cause.getCause();
        // instanceof is false for null, so no separate null check is needed.
        return nested instanceof SSLHandshakeException ? nested : null;
    }

    /**
     * Writes {@code response} to the client and closes the channel once the write has finished,
     * logging an error if the write failed.
     *
     * @param ctx      the channel handler context
     * @param response the response to send
     */
    private void respondAndClose(ChannelHandlerContext ctx, LiveHttpResponse response) {
        CompletableFuture<Void> writeCompletion = responseWriterFactory.create(ctx).write(response);
        writeCompletion.handle((ignore, reason) -> {
            boolean failed = writeCompletion.isCompletedExceptionally();
            if (failed) {
                LOGGER.error(warningMessage("message='Unable to send error', response=" + reason));
            }
            ctx.close();
            return null;
        });
    }

    /**
     * Handles an error emitted by the response observable while waiting for a response.
     *
     * <p>Events whose request id does not match the ongoing request are ignored. Otherwise the
     * subscription is cancelled and the error is logged. Unless the cause is a
     * {@code ConsumerDisconnectedException}, an error response is written and the channel is
     * closed once the write finishes.
     *
     * @param ctx       the channel handler context
     * @param cause     the error emitted by the observable
     * @param requestId id of the request the observable belongs to
     * @return the current state for a stale event, otherwise {@code TERMINATED}
     */
    private State onResponseObservableError(ChannelHandlerContext ctx, Throwable cause, Object requestId) {
        // Ignore errors that belong to a request other than the ongoing one.
        if (!this.ongoingRequest.id().equals(requestId)) {
            return this.state();
        }
        this.metrics.counter("requests.cancelled.responseError").inc();
        this.cancelSubscription();
        LOGGER.error(this.warningMessage(String.format("message='Error proxying request', requestId=%s cause=%s", requestId, cause)));
        // No error response is written for a disconnected consumer.
        if (cause instanceof ConsumerDisconnectedException) {
            return State.TERMINATED;
        }
        LiveHttpResponse response = this.exceptionToResponse(cause, this.ongoingRequest, this.originsHeaderName);
        ((CompletableFuture)this.responseWriterFactory.create(ctx).write(response).handle((ignore, exception) -> {
            if (exception != null) {
                // The error response could not be written: report both the original cause and the write failure.
                this.httpErrorStatusListener.proxyErrorOccurred(cause);
                this.httpErrorStatusListener.proxyErrorOccurred((Throwable)exception);
            } else {
                this.httpErrorStatusListener.proxyErrorOccurred(this.ongoingRequest, HttpPipelineHandler.remoteAddress(ctx), response.status(), cause);
                this.statsSink.onComplete(this.ongoingRequest.id(), response.status().code());
                this.tracker.endTrack(this.ongoingRequest);
            }
            ctx.close();
            return null;
        })).handle((ignore, exception) -> {
            // Runs after the stage above in every case, so onTerminate/endTrack are also invoked after a successful onComplete.
            // NOTE(review): presumably a safety net in case the first stage throws — confirm the listeners tolerate the repeated calls.
            this.statsSink.onTerminate(this.ongoingRequest.id());
            this.tracker.endTrack(this.ongoingRequest);
            if (exception != null) {
                LOGGER.error(this.warningMessage("message='Error during write completion handling'"), exception);
            }
            return null;
        });
        return State.TERMINATED;
    }

    /**
     * Handles a response observable that completes without having emitted a response.
     *
     * <p>Completion events for a request other than the ongoing one are ignored and are not
     * counted as cancellations. Otherwise the exchange is cancelled, a 500 response is written
     * and the channel is closed when that write finishes.
     *
     * @param ctx       the channel handler context
     * @param requestId id of the request the observable belongs to
     * @return the current state for a stale event, otherwise {@code TERMINATED}
     */
    private State onResponseObservableCompletedTooSoon(ChannelHandlerContext ctx, Object requestId) {
        // Check the request id first, as onResponseObservableError does, so that a stale
        // completion does not inflate the cancellation counter.
        if (!this.ongoingRequest.id().equals(requestId)) {
            return this.state();
        }
        this.metrics.counter("requests.cancelled.observableCompletedTooSoon").inc();
        this.cancelSubscription();
        this.statsSink.onTerminate(this.ongoingRequest.id());
        this.tracker.endTrack(this.ongoingRequest);
        this.responseWriterFactory.create(ctx).write(LiveHttpResponse.response((HttpResponseStatus)HttpResponseStatus.INTERNAL_SERVER_ERROR).build()).handle((dontCare, ignore) -> ctx.close());
        return State.TERMINATED;
    }

    /**
     * Tells whether the given throwable is an {@link IOException}.
     *
     * @param throwable the throwable to test; may be {@code null}
     * @return {@code true} if it is an {@code IOException} or a subclass of it
     */
    private static boolean isIoException(Throwable throwable) {
        return IOException.class.isInstance(throwable);
    }

    /**
     * Builds the error response sent to the client for a failure.
     *
     * <p>The status is derived from the exception (from its cause for a {@code PluginException}).
     * Statuses of 500 and above use a generic body; other statuses use the status description.
     * The response always carries {@code Content-Length} and {@code Connection: close}. It also
     * carries the origins header when a header name is configured and the exception identifies
     * an origin or application.
     *
     * @param cause             the failure to translate
     * @param request           the request being answered, passed to the response enhancer
     * @param originsHeaderName name of the origins header, or {@code null} to omit it
     * @return the error response
     */
    private LiveHttpResponse exceptionToResponse(Throwable cause, LiveHttpRequest request, CharSequence originsHeaderName) {
        Throwable statusSource = cause instanceof PluginException ? cause.getCause() : cause;
        HttpResponseStatus status = status(statusSource);

        String message;
        if (status.code() >= 500) {
            message = "Site temporarily unavailable.";
        } else {
            message = status.description();
        }

        ByteStream body = new ByteStream((Publisher)Flux.just((Object)new Buffer(message, StandardCharsets.UTF_8)));
        LiveHttpResponse template = LiveHttpResponse.response((HttpResponseStatus)status).body(body).build();
        LiveHttpResponse.Transformer builder = responseEnhancer.enhance(template.newBuilder(), request)
                .header(HttpHeaderNames.CONTENT_LENGTH, (Object)message.getBytes(StandardCharsets.UTF_8).length)
                .header(HttpHeaderNames.CONNECTION, (Object)"close");

        if (originsHeaderName != null) {
            String origin = originFromException(cause);
            if (origin != null) {
                return builder.header(originsHeaderName, (Object)origin).build();
            }
        }
        return builder.build();
    }

    /**
     * Resolves the origin id carried by a {@code StyxException}, falling back to its application id.
     *
     * @param cause the failure to inspect
     * @return the origin id, else the application id, else {@code null} (also {@code null}
     *         when {@code cause} is not a {@code StyxException})
     */
    private String originFromException(Throwable cause) {
        if (!(cause instanceof StyxException)) {
            return null;
        }
        StyxException styxException = (StyxException)cause;
        Optional<String> originName = styxException.origin().map(Id::toString);
        String applicationName = Optional.ofNullable(styxException.application()).map(Id::toString).orElse(null);
        return originName.orElse(applicationName);
    }

    /**
     * Maps an exception to the HTTP status reported to the client.
     *
     * <p>{@code EXCEPTION_STATUSES} is consulted first. If it has no mapping: a
     * {@code DecoderException} caused by a {@code BadRequestException} yields 413 when that in
     * turn was caused by a {@code TooLongFrameException} and 400 otherwise; a
     * {@code TransportLostException} yields 502; everything else yields 500.
     *
     * @param exception the exception to map
     * @return the corresponding status
     */
    private static HttpResponseStatus status(Throwable exception) {
        return EXCEPTION_STATUSES.statusFor(exception).orElseGet(() -> {
            if (exception instanceof DecoderException) {
                Throwable decodeFailure = exception.getCause();
                if (!(decodeFailure instanceof BadRequestException)) {
                    return HttpResponseStatus.INTERNAL_SERVER_ERROR;
                }
                return decodeFailure.getCause() instanceof TooLongFrameException
                        ? HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE
                        : HttpResponseStatus.BAD_REQUEST;
            }
            return exception instanceof TransportLostException
                    ? HttpResponseStatus.BAD_GATEWAY
                    : HttpResponseStatus.INTERNAL_SERVER_ERROR;
        });
    }

    /**
     * Appends the handler's current state and in-flight messages to a log message.
     *
     * @param msg the message text
     * @return {@code msg} followed by state, request, ongoing response and premature request
     */
    private String warningMessage(String msg) {
        String template = "%s, state=%s, request=%s, ongoingResponse=%s, prematureRequest=%s";
        return String.format(template, msg, state(), ongoingRequest, ongoingResponse, prematureRequest);
    }

    /**
     * Cancels the subscription to the response observable, if one has been established.
     */
    private void cancelSubscription() {
        Subscription current = subscription;
        if (current == null) {
            return;
        }
        current.cancel();
    }

    /**
     * Returns the remote address of the channel.
     *
     * @param ctx the channel handler context
     * @return a port-0 address for an {@code EmbeddedChannel}, otherwise the channel's remote
     *         address cast to {@link InetSocketAddress}
     */
    private static InetSocketAddress remoteAddress(ChannelHandlerContext ctx) {
        return ctx.channel() instanceof EmbeddedChannel
                ? new InetSocketAddress(0)
                : (InetSocketAddress)ctx.channel().remoteAddress();
    }

    /**
     * Builder for {@link HttpPipelineHandler}.
     *
     * <p>Only the HTTP pipeline is mandatory. Every other collaborator has a default: responses
     * are not modified, error statuses and request progress are ignored, a no-op tracker is used
     * and a new {@code CodaHaleMetricRegistry} is created when none is supplied.
     */
    public static class Builder {
        private final HttpHandler httpPipeline;

        private ResponseEnhancer responseEnhancer = ResponseEnhancer.DO_NOT_MODIFY_RESPONSE;
        private HttpErrorStatusListener httpErrorStatusListener = HttpErrorStatusListener.IGNORE_ERROR_STATUS;
        private RequestProgressListener progressListener = RequestProgressListener.IGNORE_REQUEST_PROGRESS;
        private HttpResponseWriterFactory responseWriterFactory = HttpResponseWriter::new;
        private Supplier<MetricRegistry> metricRegistry = CodaHaleMetricRegistry::new;
        private RequestTracker tracker = RequestTracker.NO_OP;
        private boolean secure;
        private CharSequence originsHeaderName;

        /**
         * @param httpPipeline the pipeline that handles requests; must not be {@code null}
         */
        public Builder(HttpHandler httpPipeline) {
            Objects.requireNonNull(httpPipeline);
            this.httpPipeline = httpPipeline;
        }

        /**
         * Sets the enhancer applied to responses before they are written.
         *
         * @param responseEnhancer the enhancer; must not be {@code null}
         * @return this builder
         */
        public Builder responseEnhancer(ResponseEnhancer responseEnhancer) {
            Objects.requireNonNull(responseEnhancer);
            this.responseEnhancer = responseEnhancer;
            return this;
        }

        /**
         * Sets the listener notified about proxying errors.
         *
         * @param httpErrorStatusListener the listener; must not be {@code null}
         * @return this builder
         */
        public Builder errorStatusListener(HttpErrorStatusListener httpErrorStatusListener) {
            Objects.requireNonNull(httpErrorStatusListener);
            this.httpErrorStatusListener = httpErrorStatusListener;
            return this;
        }

        /**
         * Sets the listener notified about request progress.
         *
         * @param progressListener the listener; must not be {@code null}
         * @return this builder
         */
        public Builder progressListener(RequestProgressListener progressListener) {
            Objects.requireNonNull(progressListener);
            this.progressListener = progressListener;
            return this;
        }

        /**
         * Sets the factory that creates response writers.
         *
         * @param responseWriterFactory the factory; must not be {@code null}
         * @return this builder
         */
        Builder responseWriterFactory(HttpResponseWriterFactory responseWriterFactory) {
            Objects.requireNonNull(responseWriterFactory);
            this.responseWriterFactory = responseWriterFactory;
            return this;
        }

        /**
         * Sets the metric registry used for the handler's counters.
         *
         * @param metricRegistry the registry; must not be {@code null}
         * @return this builder
         */
        public Builder metricRegistry(MetricRegistry metricRegistry) {
            MetricRegistry registry = Objects.requireNonNull(metricRegistry);
            this.metricRegistry = () -> registry;
            return this;
        }

        /**
         * Sets the secure flag passed to the interceptor context of each request.
         *
         * @param secure the flag value
         * @return this builder
         */
        public Builder secure(boolean secure) {
            this.secure = secure;
            return this;
        }

        /**
         * Sets the tracker that records in-flight requests.
         *
         * @param tracker the tracker; must not be {@code null}
         * @return this builder
         */
        public Builder requestTracker(RequestTracker tracker) {
            Objects.requireNonNull(tracker);
            this.tracker = tracker;
            return this;
        }

        /**
         * Sets the name of the header that carries the origin id on error responses.
         *
         * @param originsHeaderName the header name; {@code null} leaves the header out
         * @return this builder
         */
        public Builder xOriginsHeader(CharSequence originsHeaderName) {
            this.originsHeaderName = originsHeaderName;
            return this;
        }

        /**
         * Creates the handler.
         *
         * @return a new handler configured from this builder
         */
        public HttpPipelineHandler build() {
            return new HttpPipelineHandler(this, tracker);
        }

        /**
         * Creates a handler whose {@code channelRead0} always throws an {@link IOException}.
         *
         * @return a handler for exercising I/O failure handling
         */
        HttpPipelineHandler buildForIoExceptionTest() {
            return new HttpPipelineHandler(this, tracker) {

                @Override
                protected void channelRead0(ChannelHandlerContext ctx, LiveHttpRequest request) throws Exception {
                    throw new IOException("Connection reset by peer");
                }
            };
        }
    }

    /** Event: the response observable of a request has completed. */
    private static class ResponseObservableCompletedEvent {
        private final ChannelHandlerContext ctx;
        private final Object requestId;

        /**
         * @param context the channel handler context of the request
         * @param id      id of the request whose observable completed
         */
        ResponseObservableCompletedEvent(ChannelHandlerContext context, Object id) {
            ctx = context;
            requestId = id;
        }
    }

    private static class ResponseObservableErrorEvent {
        private final ChannelHandlerContext ctx;
        private final Throwable cause;
        private final Object requestId;

        ResponseObservableErrorEvent(ChannelHandlerContext ctx, Throwable cause, Object requestId) {
            this.ctx = ctx;
            this.cause = cause;
            this.requestId = requestId;
        }
    }

    /**
     * Event: an exception was caught on the channel.
     *
     * <p>The fields are {@code final} so the event is immutable once queued, like the other
     * event types of this handler.
     */
    private static class ChannelExceptionEvent {
        final ChannelHandlerContext ctx;
        final Throwable cause;

        /**
         * @param ctx   the channel handler context on which the exception was caught
         * @param cause the exception
         */
        ChannelExceptionEvent(ChannelHandlerContext ctx, Throwable cause) {
            this.ctx = ctx;
            this.cause = cause;
        }
    }

    /** Event: the channel became inactive. Carries no data. */
    private static class ChannelInactiveEvent {
        private ChannelInactiveEvent() {}
    }

    private static class ResponseWriteErrorEvent {
        private final Throwable cause;
        private final ChannelHandlerContext ctx;

        ResponseWriteErrorEvent(ChannelHandlerContext ctx, Throwable cause) {
            this.ctx = ctx;
            this.cause = cause;
        }
    }

    /** Event: the response has been written successfully. */
    private static class ResponseSentEvent {
        private final ChannelHandlerContext ctx;

        /**
         * @param context the channel handler context of the completed write
         */
        ResponseSentEvent(ChannelHandlerContext context) {
            ctx = context;
        }
    }

    /** Event: the pipeline produced a response for the ongoing request. */
    private static class ResponseReceivedEvent {
        private final LiveHttpResponse response;
        private final ChannelHandlerContext ctx;

        /**
         * @param received the response produced by the pipeline
         * @param context  the channel handler context of the request
         */
        ResponseReceivedEvent(LiveHttpResponse received, ChannelHandlerContext context) {
            response = received;
            ctx = context;
        }
    }

    /** Event: a request was read from the channel. */
    private static class RequestReceivedEvent {
        final LiveHttpRequest request;
        final ChannelHandlerContext ctx;

        /**
         * @param received the request that was read
         * @param context  the channel handler context it was read on
         */
        RequestReceivedEvent(LiveHttpRequest received, ChannelHandlerContext context) {
            request = received;
            ctx = context;
        }
    }

    /** Creates the {@link HttpResponseWriter} used to write a response on a channel. */
    @FunctionalInterface
    interface HttpResponseWriterFactory {
        /**
         * @param ctx the channel handler context the writer will write to
         * @return a response writer for {@code ctx}
         */
        HttpResponseWriter create(ChannelHandlerContext ctx);
    }

    /** States of the handler's state machine. */
    enum State {
        /** Initial state; no request is being handled. */
        ACCEPTING_REQUESTS,
        /** A request has been passed to the pipeline and its response is awaited. */
        WAITING_FOR_RESPONSE,
        /** A response is being written to the channel. */
        SENDING_RESPONSE,
        /** A response is being written and the channel has become inactive. */
        SENDING_RESPONSE_CLIENT_CLOSED,
        /** Handling on this channel has finished. */
        TERMINATED
    }
}

