/*
 * Decompiled with CFR 0.152.
 */
package karate.com.linecorp.armeria.server;

import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import karate.com.linecorp.armeria.common.AggregatedHttpResponse;
import karate.com.linecorp.armeria.common.CancellationException;
import karate.com.linecorp.armeria.common.ClosedSessionException;
import karate.com.linecorp.armeria.common.EmptyHttpResponseException;
import karate.com.linecorp.armeria.common.HttpData;
import karate.com.linecorp.armeria.common.HttpHeaderNames;
import karate.com.linecorp.armeria.common.HttpHeaders;
import karate.com.linecorp.armeria.common.HttpObject;
import karate.com.linecorp.armeria.common.ResponseHeaders;
import karate.com.linecorp.armeria.common.annotation.Nullable;
import karate.com.linecorp.armeria.common.logging.RequestLog;
import karate.com.linecorp.armeria.common.logging.RequestLogProperty;
import karate.com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import karate.com.linecorp.armeria.common.stream.ClosedStreamException;
import karate.com.linecorp.armeria.common.util.Exceptions;
import karate.com.linecorp.armeria.common.util.SafeCloseable;
import karate.com.linecorp.armeria.internal.common.Http1ObjectEncoder;
import karate.com.linecorp.armeria.internal.common.HttpHeadersUtil;
import karate.com.linecorp.armeria.internal.common.RequestContextUtil;
import karate.com.linecorp.armeria.internal.server.DefaultServiceRequestContext;
import karate.com.linecorp.armeria.internal.shaded.guava.base.MoreObjects;
import karate.com.linecorp.armeria.server.AbstractHttpResponseHandler;
import karate.com.linecorp.armeria.server.AccessLogWriterUtil;
import karate.com.linecorp.armeria.server.DecodedHttpRequest;
import karate.com.linecorp.armeria.server.HttpResponseException;
import karate.com.linecorp.armeria.server.HttpServerHandler;
import karate.com.linecorp.armeria.server.HttpStatusException;
import karate.com.linecorp.armeria.server.ServerHttpObjectEncoder;
import karate.com.linecorp.armeria.unsafe.PooledObjects;
import karate.io.netty.channel.ChannelFuture;
import karate.io.netty.channel.ChannelFutureListener;
import karate.io.netty.channel.ChannelHandlerContext;
import karate.io.netty.handler.codec.http2.Http2Error;
import karate.io.netty.util.concurrent.Future;
import karate.io.netty.util.concurrent.GenericFutureListener;
import karate.org.reactivestreams.Subscriber;
import karate.org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link Subscriber} that consumes the {@link HttpObject}s published for a response and writes them
 * out through the inherited {@code responseEncoder}.
 *
 * <p>Progress is tracked by a small {@link State} machine: the first object must be a
 * {@link ResponseHeaders} (handled by the subclass via {@link #onResponseHeaders(ResponseHeaders)}, which
 * is expected to move the state forward with {@link #setState(State)}), followed by {@link HttpData}
 * and/or trailers until the response reaches {@link State#DONE}.
 *
 * <p>Exactly one object is requested from the upstream at a time: the initial request is issued in
 * {@link #onSubscribe(Subscription)} and each subsequent one after the previous write completed
 * successfully (see {@link #handleWriteComplete(ChannelFuture, boolean, boolean)}).
 *
 * <p>NOTE(review): the fields are plain (non-volatile), so all callbacks are presumably confined to a
 * single thread (the channel's event loop) - confirm against the callers.
 */
abstract class AbstractHttpResponseSubscriber extends AbstractHttpResponseHandler
        implements Subscriber<HttpObject> {
    static final Logger logger = LoggerFactory.getLogger(AbstractHttpResponseSubscriber.class);

    /** The upstream subscription; {@code null} until {@link #onSubscribe(Subscription)} is called. */
    @Nullable
    private Subscription subscription;
    /** Current position in the response life cycle. */
    private State state = State.NEEDS_HEADERS;
    /** {@code true} once the subscription was cancelled by us or terminated by the publisher. */
    private boolean isSubscriptionCompleted;
    /** Guards {@link #maybeLogFirstResponseBytesTransferred()} so that the event is logged only once. */
    private boolean loggedResponseHeadersFirstBytesTransferred;
    /** Lazily created, reusable listener for header writes that do not end the stream. */
    @Nullable
    private WriteHeadersFutureListener cachedWriteHeadersListener;
    /** Lazily created, reusable listener for non-empty data writes that do not end the stream. */
    @Nullable
    private WriteDataFutureListener cachedWriteDataListener;

    /**
     * Creates a new instance; all arguments are handed to {@link AbstractHttpResponseHandler} unchanged.
     */
    AbstractHttpResponseSubscriber(ChannelHandlerContext ctx, ServerHttpObjectEncoder responseEncoder,
                                   DefaultServiceRequestContext reqCtx, DecodedHttpRequest req,
                                   CompletableFuture<Void> completionFuture) {
        super(ctx, responseEncoder, reqCtx, req, completionFuture);
    }

    /**
     * Stores the subscription and requests the first object. If the response is already
     * {@link State#DONE}, the subscription is cancelled instead (unless it was already completed).
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        assert this.subscription == null;
        this.subscription = subscription;
        if (this.state == State.DONE) {
            // The response finished before we were subscribed; there is nothing left to consume.
            if (!this.isSubscriptionCompleted) {
                this.isSubscriptionCompleted = true;
                subscription.cancel();
            }
            return;
        }
        this.scheduleTimeout();
        subscription.request(1L);
    }

    /**
     * Validates the published object against the current {@link State} and writes it to the encoder.
     *
     * <p>Invalid sequences (e.g. data before headers, a second {@link ResponseHeaders}, trailers
     * carrying a status) abort the response. Every path that writes something ends with a flush of the
     * channel context; the early-return paths do not flush.
     */
    @Override
    public void onNext(HttpObject o) {
        if (!(o instanceof HttpData) && !(o instanceof HttpHeaders)) {
            this.req.abortResponse(new IllegalArgumentException("published an HttpObject that's neither HttpHeaders nor HttpData: " + o + " (service: " + this.service() + ')'), true);
            PooledObjects.close(o);
            return;
        }
        if (this.failIfStreamOrSessionClosed()) {
            PooledObjects.close(o);
            this.setDone(true);
            return;
        }
        switch (this.state) {
            case NEEDS_HEADERS: {
                this.logBuilder().startResponse();
                if (!(o instanceof ResponseHeaders)) {
                    this.req.abortResponse(new IllegalStateException("published an HttpData without a preceding ResponseHeaders: " + o + " (service: " + this.service() + ')'), true);
                    // Release the rejected object like the other rejection paths of this method do;
                    // otherwise a pooled HttpData published out of order would never be closed.
                    PooledObjects.close(o);
                    return;
                }
                if (this.responseEncoder.isResponseHeadersSent(this.req.id(), this.req.streamId())) {
                    // Something else already wrote a response for this request/stream.
                    this.tryComplete(new CancelledSubscriptionException("An HTTP response was sent already. ctx: " + this.reqCtx));
                    this.setDone(true);
                    return;
                }
                this.onResponseHeaders((ResponseHeaders)o);
                break;
            }
            case NEEDS_TRAILERS: {
                if (o instanceof ResponseHeaders) {
                    this.req.abortResponse(new IllegalStateException("published a ResponseHeaders: " + o + " (expected: an HTTP trailers). service: " + this.service()), true);
                    return;
                }
                if (o instanceof HttpData) {
                    // Data is not expected in this state: drop it and ask for the next object.
                    ((HttpData)o).close();
                    assert this.subscription != null;
                    this.subscription.request(1L);
                    return;
                }
                // Plain HttpHeaders (trailers) are handled by the NEEDS_DATA_OR_TRAILERS branch below.
            }
            // fall through
            case NEEDS_DATA_OR_TRAILERS: {
                if (o instanceof HttpHeaders) {
                    HttpHeaders trailers = (HttpHeaders)o;
                    if (trailers.contains(HttpHeaderNames.STATUS)) {
                        this.req.abortResponse(new IllegalArgumentException("published an HTTP trailers with status: " + o + " (service: " + this.service() + ')'), true);
                        return;
                    }
                    // Trailers always terminate the response.
                    this.setDone(false);
                    HttpHeaders merged = HttpHeadersUtil.mergeTrailers(trailers, this.reqCtx.additionalResponseTrailers());
                    this.logBuilder().responseTrailers(merged);
                    this.responseEncoder.writeTrailers(this.req.id(), this.req.streamId(), merged).addListener(this.writeHeadersFutureListener(true));
                    break;
                }
                boolean endOfStream = o.isEndOfStream();
                HttpData data = (HttpData)o;
                data.touch(this.reqCtx);
                // Captured before the write; the listener needs it after the data has been handed off.
                boolean wroteEmptyData = data.isEmpty();
                this.logBuilder().increaseResponseLength(data);
                if (endOfStream) {
                    this.setDone(false);
                }
                HttpHeaders additionalTrailers = this.reqCtx.additionalResponseTrailers();
                if (endOfStream && !additionalTrailers.isEmpty()) {
                    // The last data is written without end-of-stream so that the additional trailers
                    // configured on the request context can close the stream instead.
                    this.responseEncoder.writeData(this.req.id(), this.req.streamId(), data, false).addListener(this.writeDataFutureListener(false, wroteEmptyData));
                    this.logBuilder().responseTrailers(additionalTrailers);
                    this.responseEncoder.writeTrailers(this.req.id(), this.req.streamId(), additionalTrailers).addListener(this.writeHeadersFutureListener(true));
                    break;
                }
                this.responseEncoder.writeData(this.req.id(), this.req.streamId(), data, endOfStream).addListener(this.writeDataFutureListener(endOfStream, wroteEmptyData));
                break;
            }
            case NEEDS_DATA: {
                if (!(o instanceof HttpData)) {
                    this.req.abortResponse(new IllegalStateException(o + " is published. (expected: an HttpData) (service: " + this.service() + ')'), true);
                    return;
                }
                boolean endOfStream = o.isEndOfStream();
                HttpData data = (HttpData)o;
                data.touch(this.reqCtx);
                boolean wroteEmptyData = data.isEmpty();
                this.logBuilder().increaseResponseLength(data);
                if (endOfStream) {
                    this.setDone(false);
                }
                // Unlike NEEDS_DATA_OR_TRAILERS, additional trailers are not consulted in this state.
                this.responseEncoder.writeData(this.req.id(), this.req.streamId(), data, endOfStream).addListener(this.writeDataFutureListener(endOfStream, wroteEmptyData));
                break;
            }
            case DONE: {
                // The response is already finished: stop the publisher and release the object.
                assert this.subscription != null;
                this.isSubscriptionCompleted = true;
                this.subscription.cancel();
                PooledObjects.close(o);
                return;
            }
        }
        this.ctx.flush();
    }

    /**
     * Invoked by {@link #onNext(HttpObject)} with the first, validated {@link ResponseHeaders}.
     * The implementation is responsible for writing them; it presumably also advances the state via
     * {@link #setState(State)} - verify against the subclasses.
     */
    abstract void onResponseHeaders(ResponseHeaders var1);

    /** Replaces the current {@link State}. */
    void setState(State state) {
        this.state = state;
    }

    /** Returns {@code true} once the state is {@link State#DONE}. */
    @Override
    boolean isDone() {
        return this.state == State.DONE;
    }

    /**
     * Moves to {@link State#DONE} and clears the timeout.
     *
     * @param cancel whether to cancel the subscription, if there is one and it is not completed yet
     * @return the state that was current before this call
     */
    State setDone(boolean cancel) {
        if (cancel && this.subscription != null && !this.isSubscriptionCompleted) {
            this.isSubscriptionCompleted = true;
            this.subscription.cancel();
        }
        this.clearTimeout();
        State oldState = this.state;
        this.state = State.DONE;
        return oldState;
    }

    /**
     * Maps the (peeled) failure of the publisher to a response:
     * <ul>
     *   <li>not writable: just {@link #fail(Throwable)};</li>
     *   <li>{@link HttpResponseException}: respond with its aggregated response, or with the internal
     *       server error response if the aggregation itself failed;</li>
     *   <li>{@link HttpStatusException}: respond with its aggregated response, reporting the
     *       exception's cause (or the exception itself when it has none);</li>
     *   <li>stream-cancelling exceptions: reset the stream;</li>
     *   <li>anything else: respond with the internal server error response, logging a warning unless
     *       it is a {@link CancellationException}.</li>
     * </ul>
     */
    @Override
    public void onError(Throwable cause) {
        this.isSubscriptionCompleted = true;
        Throwable peeled = Exceptions.peel(cause);
        if (!this.isWritable()) {
            this.fail(peeled);
            return;
        }
        if (peeled instanceof HttpResponseException) {
            // The aggregation is asynchronous; the continuation runs on the channel's executor.
            this.toAggregatedHttpResponse((HttpResponseException)peeled).handleAsync((res, throwable) -> {
                if (throwable != null) {
                    this.failAndRespond((Throwable)throwable, internalServerErrorResponse, Http2Error.CANCEL, false);
                } else {
                    this.failAndRespond(peeled, (AggregatedHttpResponse)res, Http2Error.CANCEL, false);
                }
                return null;
            }, (Executor)this.ctx.executor());
        } else if (peeled instanceof HttpStatusException) {
            Throwable cause0 = MoreObjects.firstNonNull(peeled.getCause(), peeled);
            AggregatedHttpResponse res2 = this.toAggregatedHttpResponse((HttpStatusException)peeled);
            this.failAndRespond(cause0, res2, Http2Error.CANCEL, false);
        } else if (Exceptions.isStreamCancelling(peeled)) {
            this.failAndReset(peeled);
        } else {
            if (!(peeled instanceof CancellationException)) {
                // The trailing Throwable has no placeholder, so SLF4J logs it as the exception.
                logger.warn("{} Unexpected exception from a service or a response publisher: {}", this.ctx.channel(), this.service(), peeled);
            }
            this.failAndRespond(peeled, internalServerErrorResponse, Http2Error.INTERNAL_ERROR, false);
        }
    }

    /**
     * Finishes the response when the publisher completes.
     *
     * <p>If no headers were ever published, the stream is reset with {@link Http2Error#INTERNAL_ERROR}
     * and the response fails with {@link EmptyHttpResponseException}. If the response was not finished
     * yet, it is terminated with the additional trailers when there are any; otherwise with an empty
     * end-of-stream data when writable; otherwise it succeeds for non-multiplex protocols and fails
     * with {@link ClosedStreamException} for multiplex ones.
     */
    @Override
    public void onComplete() {
        this.isSubscriptionCompleted = true;
        State oldState = this.setDone(false);
        if (oldState == State.NEEDS_HEADERS) {
            this.responseEncoder.writeReset(this.req.id(), this.req.streamId(), Http2Error.INTERNAL_ERROR, false).addListener(future -> {
                try (SafeCloseable ignored = RequestContextUtil.pop()) {
                    this.fail(EmptyHttpResponseException.get());
                }
            });
            this.ctx.flush();
            return;
        }
        if (oldState != State.DONE) {
            HttpHeaders additionalTrailers = this.reqCtx.additionalResponseTrailers();
            if (!additionalTrailers.isEmpty()) {
                this.logBuilder().responseTrailers(additionalTrailers);
                this.responseEncoder.writeTrailers(this.req.id(), this.req.streamId(), additionalTrailers).addListener(this.writeHeadersFutureListener(true));
                this.ctx.flush();
            } else if (this.isWritable()) {
                this.responseEncoder.writeData(this.req.id(), this.req.streamId(), HttpData.empty(), true).addListener(this.writeDataFutureListener(true, true));
                this.ctx.flush();
            } else if (!this.reqCtx.sessionProtocol().isMultiplex()) {
                this.succeed();
            } else {
                this.fail(ClosedStreamException.get());
            }
        }
    }

    /**
     * Completes the response successfully. The logging and access-log steps run only if
     * {@code tryComplete(null)} returns {@code true}; a response cause already available in the
     * request log is passed on to {@code endLogRequestAndResponse}.
     */
    private void succeed() {
        if (this.tryComplete(null)) {
            Throwable cause = null;
            RequestLog requestLog = this.reqCtx.log().getIfAvailable(RequestLogProperty.RESPONSE_CAUSE);
            if (requestLog != null) {
                cause = requestLog.responseCause();
            }
            this.endLogRequestAndResponse(cause);
            AccessLogWriterUtil.maybeWriteAccessLog(this.reqCtx);
        }
    }

    /**
     * Completes the response with {@code cause}. If {@code tryComplete(cause)} returns {@code true},
     * the subscriber is marked done (cancelling the subscription when still active) and the request
     * and access logs are finished; otherwise nothing happens.
     */
    @Override
    void fail(Throwable cause) {
        if (this.tryComplete(cause)) {
            this.setDone(true);
            this.endLogRequestAndResponse(cause);
            AccessLogWriterUtil.maybeWriteAccessLog(this.reqCtx);
        }
    }

    /**
     * Marks the subscriber done and answers the peer: with {@code res} when nothing was sent yet
     * (previous state {@link State#NEEDS_HEADERS}), otherwise with a stream reset carrying
     * {@code error}. The response is failed with {@code cause} once that write has completed.
     */
    private void failAndRespond(Throwable cause, AggregatedHttpResponse res, Http2Error error, boolean cancel) {
        State oldState = this.setDone(cancel);
        int id = this.req.id();
        int streamId = this.req.streamId();
        ChannelFuture future;
        boolean isReset;
        if (oldState == State.NEEDS_HEADERS) {
            future = this.writeAggregatedHttpResponse(res);
            isReset = false;
        } else {
            future = this.responseEncoder.writeReset(id, streamId, error, false);
            isReset = true;
        }
        this.addCallbackAndFlush(cause, oldState, future, isReset);
    }

    /**
     * Marks the subscriber done (without cancelling the subscription) and resets the stream with
     * {@link Http2Error#CANCEL}; the response is failed with {@code cause} once the reset was written.
     */
    private void failAndReset(Throwable cause) {
        State oldState = this.setDone(false);
        ChannelFuture future = this.responseEncoder.writeReset(this.req.id(), this.req.streamId(), Http2Error.CANCEL, false);
        this.addCallbackAndFlush(cause, oldState, future, true);
    }

    /**
     * Flushes the channel context and, unless the response was already done before the write was issued,
     * fails the response with {@code cause} when {@code future} completes.
     *
     * <p>After a successful non-reset write, the first-bytes-transferred event is logged and, if the
     * request asks for it via {@code shouldResetOnlyIfRemoteIsOpen()}, a {@link Http2Error#CANCEL}
     * reset is written as well.
     */
    private void addCallbackAndFlush(Throwable cause, State oldState, ChannelFuture future, boolean isReset) {
        if (oldState != State.DONE) {
            future.addListener(f -> {
                try (SafeCloseable ignored = RequestContextUtil.pop()) {
                    if (f.isSuccess() && !isReset) {
                        this.maybeLogFirstResponseBytesTransferred();
                        if (this.req.shouldResetOnlyIfRemoteIsOpen()) {
                            this.responseEncoder.writeReset(this.req.id(), this.req.streamId(), Http2Error.CANCEL, true);
                        }
                    }
                    this.fail(cause);
                }
            });
        }
        this.ctx.flush();
    }

    /**
     * Returns a listener for a header/trailer write. The non-end-of-stream listener carries no
     * per-write state and is therefore created once and reused; an end-of-stream listener is created
     * per call.
     */
    WriteHeadersFutureListener writeHeadersFutureListener(boolean endOfStream) {
        if (!endOfStream) {
            if (this.cachedWriteHeadersListener == null) {
                this.cachedWriteHeadersListener = new WriteHeadersFutureListener(false);
            }
            return this.cachedWriteHeadersListener;
        }
        return new WriteHeadersFutureListener(true);
    }

    /**
     * Returns a listener for a data write. Only the most common combination (not end-of-stream and
     * non-empty data) is cached and reused; every other combination gets a new listener.
     */
    WriteDataFutureListener writeDataFutureListener(boolean endOfStream, boolean wroteEmptyData) {
        if (!endOfStream && !wroteEmptyData) {
            if (this.cachedWriteDataListener == null) {
                this.cachedWriteDataListener = new WriteDataFutureListener(false, false);
            }
            return this.cachedWriteDataListener;
        }
        return new WriteDataFutureListener(endOfStream, wroteEmptyData);
    }

    /**
     * Common continuation of every write issued by this subscriber.
     *
     * @param future      the completed write future
     * @param endOfStream whether the write was the one that ends the response
     * @param isSuccess   whether the write should be treated as successful; this may be {@code true}
     *                    even for a failed {@code future} (see {@link WriteDataFutureListener})
     */
    void handleWriteComplete(ChannelFuture future, boolean endOfStream, boolean isSuccess) throws Exception {
        if (isSuccess) {
            this.maybeLogFirstResponseBytesTransferred();
            if (endOfStream) {
                this.succeed();
            }
            if (!this.isSubscriptionCompleted) {
                assert this.subscription != null;
                // Requested even after an end-of-stream write: the publisher still has to deliver its
                // terminal signal (or be cancelled from the DONE branch of onNext).
                this.subscription.request(1L);
            }
            return;
        }
        this.fail(future.cause());
        // NOTE(review): presumably closes the channel on a failed write - confirm in HttpServerHandler.
        HttpServerHandler.CLOSE_ON_FAILURE.operationComplete(future);
    }

    /** Logs the "response first bytes transferred" event at most once per response. */
    private void maybeLogFirstResponseBytesTransferred() {
        if (!this.loggedResponseHeadersFirstBytesTransferred) {
            this.loggedResponseHeadersFirstBytesTransferred = true;
            this.logBuilder().responseFirstBytesTransferred();
        }
    }

    /**
     * Life-cycle states of a response, named after what the subscriber expects to receive next.
     *
     * <p>NOTE(review): keep the declaration order - this file was decompiled and other classes may
     * still depend on the ordinals.
     */
    static enum State {
        NEEDS_HEADERS,
        NEEDS_DATA,
        NEEDS_DATA_OR_TRAILERS,
        NEEDS_TRAILERS,
        DONE;

    }

    /**
     * Listener for header and trailer writes; forwards the plain outcome of the write to
     * {@link #handleWriteComplete(ChannelFuture, boolean, boolean)}.
     */
    private class WriteHeadersFutureListener implements ChannelFutureListener {
        private final boolean endOfStream;

        WriteHeadersFutureListener(boolean endOfStream) {
            this.endOfStream = endOfStream;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            try (SafeCloseable ignored = RequestContextUtil.pop()) {
                AbstractHttpResponseSubscriber.this.handleWriteComplete(future, this.endOfStream, future.isSuccess());
            }
        }
    }

    /**
     * Listener for data writes. A failed write is still reported as a success when it was the final,
     * empty data of the response, the encoder is an {@link Http1ObjectEncoder} and the failure is a
     * {@link ClosedChannelException} or {@link ClosedSessionException}.
     *
     * <p>NOTE(review): presumably because nothing of the response was lost when the connection was
     * closed at that point - confirm the rationale upstream.
     */
    private class WriteDataFutureListener implements ChannelFutureListener {
        private final boolean endOfStream;
        private final boolean wroteEmptyData;

        WriteDataFutureListener(boolean endOfStream, boolean wroteEmptyData) {
            this.endOfStream = endOfStream;
            this.wroteEmptyData = wroteEmptyData;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            try (SafeCloseable ignored = RequestContextUtil.pop()) {
                boolean isSuccess;
                if (future.isSuccess()) {
                    isSuccess = true;
                } else {
                    Throwable cause = future.cause();
                    isSuccess = this.endOfStream && this.wroteEmptyData && AbstractHttpResponseSubscriber.this.responseEncoder instanceof Http1ObjectEncoder && (cause instanceof ClosedChannelException || cause instanceof ClosedSessionException);
                }
                AbstractHttpResponseSubscriber.this.handleWriteComplete(future, this.endOfStream, isSuccess);
            }
        }
    }
}

