/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.server;

import com.linecorp.armeria.common.AggregatedHttpMessage;
import com.linecorp.armeria.common.DefaultHttpHeaders;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpObject;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.HttpStatusClass;
import com.linecorp.armeria.common.logging.RequestLogAvailability;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.internal.Http1ObjectEncoder;
import com.linecorp.armeria.internal.HttpObjectEncoder;
import com.linecorp.armeria.server.DecodedHttpRequest;
import com.linecorp.armeria.server.DefaultServiceRequestContext;
import com.linecorp.armeria.server.HttpResponseException;
import com.linecorp.armeria.server.HttpServerHandler;
import com.linecorp.armeria.server.HttpStatusException;
import com.linecorp.armeria.server.RequestTimeoutChangeListener;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.logging.AccessLogWriter;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.util.ReferenceCountUtil;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class HttpResponseSubscriber
implements RequestTimeoutChangeListener,
Subscriber<HttpObject> {
    private static final Logger logger = LoggerFactory.getLogger(HttpResponseSubscriber.class);
    private static final AggregatedHttpMessage INTERNAL_SERVER_ERROR_MESSAGE = AggregatedHttpMessage.of(HttpStatus.INTERNAL_SERVER_ERROR);
    private static final AggregatedHttpMessage SERVICE_UNAVAILABLE_MESSAGE = AggregatedHttpMessage.of(HttpStatus.SERVICE_UNAVAILABLE);
    private final ChannelHandlerContext ctx;
    private final HttpObjectEncoder responseEncoder;
    private final DecodedHttpRequest req;
    private final DefaultServiceRequestContext reqCtx;
    private final AccessLogWriter accessLogWriter;
    private final long startTimeNanos;
    @Nullable
    private Subscription subscription;
    @Nullable
    private ScheduledFuture<?> timeoutFuture;
    private State state = State.NEEDS_HEADERS;
    private boolean isComplete;

    HttpResponseSubscriber(ChannelHandlerContext ctx, HttpObjectEncoder responseEncoder, DefaultServiceRequestContext reqCtx, DecodedHttpRequest req, AccessLogWriter accessLogWriter) {
        this.ctx = ctx;
        this.responseEncoder = responseEncoder;
        this.req = req;
        this.reqCtx = reqCtx;
        this.accessLogWriter = accessLogWriter;
        this.startTimeNanos = System.nanoTime();
    }

    private Service<?, ?> service() {
        return this.reqCtx.service();
    }

    private RequestLogBuilder logBuilder() {
        return this.reqCtx.logBuilder();
    }

    @Override
    public void onRequestTimeoutChange(long newRequestTimeoutMillis) {
        this.cancelTimeout();
        if (newRequestTimeoutMillis > 0L && this.state != State.DONE) {
            long passedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.startTimeNanos);
            if (passedTimeMillis < newRequestTimeoutMillis) {
                this.timeoutFuture = this.ctx.channel().eventLoop().schedule(this::onTimeout, newRequestTimeoutMillis - passedTimeMillis, TimeUnit.MILLISECONDS);
            } else {
                this.onTimeout();
            }
        }
    }

    private void onTimeout() {
        if (this.state != State.DONE) {
            this.reqCtx.setTimedOut();
            Runnable requestTimeoutHandler = this.reqCtx.requestTimeoutHandler();
            if (requestTimeoutHandler != null) {
                requestTimeoutHandler.run();
            } else {
                this.failAndRespond(RequestTimeoutException.get(), SERVICE_UNAVAILABLE_MESSAGE, Http2Error.INTERNAL_ERROR);
            }
        }
    }

    public void onSubscribe(Subscription subscription) {
        assert (this.subscription == null);
        this.subscription = subscription;
        this.onRequestTimeoutChange(this.reqCtx.requestTimeoutMillis());
        subscription.request(1L);
    }

    public void onNext(HttpObject o) {
        if (!(o instanceof HttpData) && !(o instanceof HttpHeaders)) {
            throw this.newIllegalStateException("published an HttpObject that's neither HttpHeaders nor HttpData: " + o + " (service: " + this.service() + ')');
        }
        boolean endOfStream = o.isEndOfStream();
        block0 : switch (this.state) {
            case NEEDS_HEADERS: {
                this.logBuilder().startResponse();
                if (!(o instanceof HttpHeaders)) {
                    throw this.newIllegalStateException("published an HttpData without a preceding Http2Headers: " + o + " (service: " + this.service() + ')');
                }
                HttpHeaders headers = (HttpHeaders)o;
                HttpStatus status = headers.status();
                if (status == null) {
                    throw this.newIllegalStateException("published an HttpHeaders without status: " + o + " (service: " + this.service() + ')');
                }
                if (status.codeClass() == HttpStatusClass.INFORMATIONAL) break;
                HttpHeaders additionalHeaders = this.reqCtx.additionalResponseHeaders();
                if (!additionalHeaders.isEmpty()) {
                    if (headers.isImmutable()) {
                        HttpHeaders temp = headers;
                        headers = new DefaultHttpHeaders(false, temp.size() + additionalHeaders.size());
                        headers.set(temp);
                        o = headers;
                    }
                    headers.setAllIfAbsent(additionalHeaders);
                }
                this.logBuilder().responseHeaders(headers);
                if (this.req.method() == HttpMethod.HEAD) {
                    endOfStream = true;
                    break;
                }
                int statusCode = status.code();
                switch (statusCode) {
                    case 204: 
                    case 205: 
                    case 304: {
                        endOfStream = true;
                        break block0;
                    }
                }
                this.state = State.NEEDS_DATA_OR_TRAILING_HEADERS;
                break;
            }
            case NEEDS_DATA_OR_TRAILING_HEADERS: {
                if (!(o instanceof HttpHeaders)) break;
                HttpHeaders trailingHeaders = (HttpHeaders)o;
                if (trailingHeaders.status() != null) {
                    throw this.newIllegalStateException("published a trailing HttpHeaders with status: " + o + " (service: " + this.service() + ')');
                }
                endOfStream = true;
                break;
            }
            case DONE: {
                ReferenceCountUtil.safeRelease((Object)o);
                return;
            }
        }
        this.write(o, endOfStream);
    }

    public void onError(Throwable cause) {
        if (cause instanceof HttpResponseException) {
            ((HttpResponseException)cause).httpResponse().aggregate(this.ctx.executor()).whenCompleteAsync((message, throwable) -> {
                if (throwable != null) {
                    this.failAndRespond((Throwable)throwable, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.CANCEL);
                } else {
                    this.failAndRespond(cause, (AggregatedHttpMessage)message, Http2Error.CANCEL);
                }
            }, (Executor)this.ctx.executor());
        } else if (cause instanceof HttpStatusException) {
            this.failAndRespond(cause, AggregatedHttpMessage.of(((HttpStatusException)cause).httpStatus()), Http2Error.CANCEL);
        } else if (cause instanceof AbortedStreamException) {
            this.failAndReset((AbortedStreamException)cause);
        } else {
            logger.warn("{} Unexpected exception from a service or a response publisher: {}", new Object[]{this.ctx.channel(), this.service(), cause});
            this.failAndRespond(cause, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.INTERNAL_ERROR);
        }
    }

    public void onComplete() {
        if (!this.cancelTimeout() && this.reqCtx.requestTimeoutHandler() == null) {
            return;
        }
        if (HttpResponseSubscriber.wroteNothing(this.state)) {
            logger.warn("{} Published nothing (or only informational responses): {}", (Object)this.ctx.channel(), this.service());
            this.responseEncoder.writeReset(this.req.id(), this.req.streamId(), Http2Error.INTERNAL_ERROR);
            return;
        }
        if (this.state != State.DONE) {
            this.write(HttpData.EMPTY_DATA, true);
        }
    }

    private void write(HttpObject o, boolean endOfStream) {
        ChannelFuture future;
        boolean wroteEmptyData;
        if (endOfStream) {
            this.setDone();
        }
        if (o instanceof HttpData) {
            HttpData data = (HttpData)o;
            wroteEmptyData = data.isEmpty();
            future = this.responseEncoder.writeData(this.req.id(), this.req.streamId(), data, endOfStream);
            this.logBuilder().increaseResponseLength(data.length());
        } else if (o instanceof HttpHeaders) {
            wroteEmptyData = false;
            future = this.responseEncoder.writeHeaders(this.req.id(), this.req.streamId(), (HttpHeaders)o, endOfStream);
        } else {
            throw new Error();
        }
        future.addListener(f -> {
            boolean isSuccess;
            if (f.isSuccess()) {
                isSuccess = true;
            } else {
                boolean bl = isSuccess = endOfStream && wroteEmptyData && f.cause() instanceof ClosedChannelException && this.responseEncoder instanceof Http1ObjectEncoder;
            }
            if (isSuccess) {
                if (endOfStream && this.tryComplete()) {
                    this.logBuilder().endResponse();
                    this.reqCtx.log().addListener(this.accessLogWriter::log, RequestLogAvailability.COMPLETE);
                }
                if (this.state != State.DONE) {
                    this.subscription.request(1L);
                }
                return;
            }
            if (this.tryComplete()) {
                this.setDone();
                this.logBuilder().endResponse(f.cause());
                this.subscription.cancel();
                this.reqCtx.log().addListener(this.accessLogWriter::log, RequestLogAvailability.COMPLETE);
            }
            HttpServerHandler.CLOSE_ON_FAILURE.operationComplete(f);
        });
        this.ctx.flush();
    }

    private State setDone() {
        this.cancelTimeout();
        State oldState = this.state;
        this.state = State.DONE;
        return oldState;
    }

    private void failAndRespond(Throwable cause, AggregatedHttpMessage message, Http2Error error) {
        ChannelFuture future;
        HttpHeaders headers = message.headers();
        HttpData content = message.content();
        this.logBuilder().responseHeaders(headers);
        this.logBuilder().increaseResponseLength(content.length());
        State oldState = this.setDone();
        this.subscription.cancel();
        int id = this.req.id();
        int streamId = this.req.streamId();
        if (HttpResponseSubscriber.wroteNothing(oldState)) {
            if (content.isEmpty()) {
                future = this.responseEncoder.writeHeaders(id, streamId, headers, true);
            } else {
                this.responseEncoder.writeHeaders(id, streamId, headers, false);
                future = this.responseEncoder.writeData(id, streamId, content, true);
            }
        } else {
            future = this.responseEncoder.writeReset(id, streamId, error);
        }
        this.addCallbackAndFlush(cause, oldState, future);
    }

    private void failAndReset(AbortedStreamException cause) {
        State oldState = this.setDone();
        this.subscription.cancel();
        ChannelFuture future = this.responseEncoder.writeReset(this.req.id(), this.req.streamId(), Http2Error.CANCEL);
        this.addCallbackAndFlush(cause, oldState, future);
    }

    private void addCallbackAndFlush(Throwable cause, State oldState, ChannelFuture future) {
        if (oldState != State.DONE) {
            future.addListener(unused -> {
                if (this.tryComplete()) {
                    this.logBuilder().endResponse(cause);
                    this.reqCtx.log().addListener(this.accessLogWriter::log, RequestLogAvailability.COMPLETE);
                }
            });
        }
        this.ctx.flush();
    }

    private boolean tryComplete() {
        if (this.isComplete) {
            return false;
        }
        this.isComplete = true;
        return true;
    }

    private static boolean wroteNothing(State state) {
        return state == State.NEEDS_HEADERS;
    }

    private boolean cancelTimeout() {
        ScheduledFuture<?> timeoutFuture = this.timeoutFuture;
        if (timeoutFuture == null) {
            return true;
        }
        this.timeoutFuture = null;
        return timeoutFuture.cancel(false);
    }

    private IllegalStateException newIllegalStateException(String msg) {
        IllegalStateException cause = new IllegalStateException(msg);
        this.failAndRespond(cause, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.INTERNAL_ERROR);
        return cause;
    }

    static enum State {
        NEEDS_HEADERS,
        NEEDS_DATA_OR_TRAILING_HEADERS,
        DONE;

    }
}

