/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.http.nio.netty.internal;

import com.typesafe.netty.http.HttpStreamsClientHandler;
import com.typesafe.netty.http.StreamedHttpResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.WriteTimeoutException;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;
import software.amazon.awssdk.http.nio.netty.internal.LastHttpContentSwallower;
import software.amazon.awssdk.http.nio.netty.internal.RequestContext;
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription;
import software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils;
import software.amazon.awssdk.utils.FunctionalUtils;
import software.amazon.awssdk.utils.async.DelegatingSubscription;

@ChannelHandler.Sharable
@SdkInternalApi
public class ResponseHandler
extends SimpleChannelInboundHandler<HttpObject> {
    private static final Logger log = LoggerFactory.getLogger(ResponseHandler.class);
    private static final ResponseHandler INSTANCE = new ResponseHandler();
    private static final AttributeKey<Boolean> KEEP_ALIVE = AttributeKey.newInstance((String)"KeepAlive");

    private ResponseHandler() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void channelRead0(ChannelHandlerContext channelContext, HttpObject msg) throws Exception {
        RequestContext requestContext = (RequestContext)channelContext.channel().attr(ChannelAttributeKey.REQUEST_CONTEXT_KEY).get();
        if (msg instanceof HttpResponse) {
            HttpResponse response = (HttpResponse)msg;
            SdkHttpFullResponse sdkResponse = SdkHttpFullResponse.builder().headers(ResponseHandler.fromNettyHeaders(response.headers())).statusCode(response.status().code()).statusText(response.status().reasonPhrase()).build();
            channelContext.channel().attr(KEEP_ALIVE).set((Object)HttpUtil.isKeepAlive((HttpMessage)response));
            requestContext.handler().onHeaders((SdkHttpResponse)sdkResponse);
        }
        CompletableFuture<Void> ef = ResponseHandler.executeFuture(channelContext);
        if (msg instanceof StreamedHttpResponse) {
            requestContext.handler().onStream((Publisher)new PublisherAdapter((StreamedHttpResponse)msg, channelContext, requestContext, ef));
        } else if (msg instanceof FullHttpResponse) {
            ByteBuf fullContent = null;
            try {
                channelContext.pipeline().replace(HttpStreamsClientHandler.class, channelContext.name() + "-LastHttpContentSwallower", (ChannelHandler)LastHttpContentSwallower.getInstance());
                fullContent = ((FullHttpResponse)msg).content();
                ByteBuffer bb = ResponseHandler.copyToByteBuffer(fullContent);
                requestContext.handler().onStream((Publisher)new FullResponseContentPublisher(channelContext, bb, ef));
                ResponseHandler.finalizeResponse(requestContext, channelContext);
            }
            finally {
                Optional.ofNullable(fullContent).ifPresent(ReferenceCounted::release);
            }
        }
    }

    private static void finalizeResponse(RequestContext requestContext, ChannelHandlerContext channelContext) {
        channelContext.channel().attr(ChannelAttributeKey.RESPONSE_COMPLETE_KEY).set((Object)true);
        ResponseHandler.executeFuture(channelContext).complete(null);
        if (!((Boolean)channelContext.channel().attr(KEEP_ALIVE).get()).booleanValue()) {
            ResponseHandler.closeAndRelease(channelContext);
        } else {
            requestContext.channelPool().release(channelContext.channel());
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        RequestContext requestContext = (RequestContext)ctx.channel().attr(ChannelAttributeKey.REQUEST_CONTEXT_KEY).get();
        log.debug("Exception processing request: {}", (Object)requestContext.executeRequest().request(), (Object)cause);
        Throwable throwable = this.wrapException(cause);
        ResponseHandler.executeFuture(ctx).completeExceptionally(throwable);
        ResponseHandler.runAndLogError("Fail to execute SdkAsyncHttpResponseHandler#onError", () -> requestContext.handler().onError(throwable));
        ResponseHandler.runAndLogError("Could not release channel back to the pool", () -> ResponseHandler.closeAndRelease(ctx));
    }

    public void channelInactive(ChannelHandlerContext handlerCtx) throws Exception {
        this.notifyIfResponseNotCompleted(handlerCtx);
    }

    public void channelUnregistered(ChannelHandlerContext handlerCtx) throws Exception {
        this.notifyIfResponseNotCompleted(handlerCtx);
    }

    public static ResponseHandler getInstance() {
        return INSTANCE;
    }

    private static void closeAndRelease(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        RequestContext requestContext = (RequestContext)channel.attr(ChannelAttributeKey.REQUEST_CONTEXT_KEY).get();
        channel.close().addListener(i -> requestContext.channelPool().release(channel));
    }

    private static void runAndLogError(String errorMsg, FunctionalUtils.UnsafeRunnable runnable) {
        try {
            runnable.run();
        }
        catch (Exception e) {
            log.error(errorMsg, (Throwable)e);
        }
    }

    private static Map<String, List<String>> fromNettyHeaders(HttpHeaders headers) {
        return headers.entries().stream().collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    private static ByteBuffer copyToByteBuffer(ByteBuf byteBuf) {
        ByteBuffer bb = ByteBuffer.allocate(byteBuf.readableBytes());
        byteBuf.getBytes(byteBuf.readerIndex(), bb);
        bb.flip();
        return bb;
    }

    private static CompletableFuture<Void> executeFuture(ChannelHandlerContext ctx) {
        return (CompletableFuture)ctx.channel().attr(ChannelAttributeKey.EXECUTE_FUTURE_KEY).get();
    }

    private Throwable wrapException(Throwable originalCause) {
        if (originalCause instanceof ReadTimeoutException) {
            return new IOException("Read timed out", originalCause);
        }
        if (originalCause instanceof WriteTimeoutException) {
            return new IOException("Write timed out", originalCause);
        }
        return originalCause;
    }

    private void notifyIfResponseNotCompleted(ChannelHandlerContext handlerCtx) {
        RequestContext requestCtx = (RequestContext)handlerCtx.channel().attr(ChannelAttributeKey.REQUEST_CONTEXT_KEY).get();
        boolean responseCompleted = (Boolean)handlerCtx.channel().attr(ChannelAttributeKey.RESPONSE_COMPLETE_KEY).get();
        if (!responseCompleted) {
            IOException err = new IOException("Server failed to send complete response");
            ResponseHandler.runAndLogError("Fail to execute SdkAsyncHttpResponseHandler#onError", () -> requestCtx.handler().onError((Throwable)err));
            ResponseHandler.executeFuture(handlerCtx).completeExceptionally(err);
            ResponseHandler.runAndLogError("Could not release channel", () -> ResponseHandler.closeAndRelease(handlerCtx));
        }
    }

    static class FullResponseContentPublisher
    implements Publisher<ByteBuffer> {
        private final ChannelHandlerContext channelContext;
        private final ByteBuffer fullContent;
        private final CompletableFuture<Void> executeFuture;
        private boolean running = true;
        private Subscriber<? super ByteBuffer> subscriber;

        FullResponseContentPublisher(ChannelHandlerContext channelContext, ByteBuffer fullContent, CompletableFuture<Void> executeFuture) {
            this.channelContext = channelContext;
            this.fullContent = fullContent;
            this.executeFuture = executeFuture;
        }

        public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
            if (this.subscriber != null) {
                subscriber.onComplete();
                return;
            }
            this.subscriber = subscriber;
            this.channelContext.channel().attr(ChannelAttributeKey.SUBSCRIBER_KEY).set(subscriber);
            subscriber.onSubscribe(new Subscription(){

                public void request(long l) {
                    if (running) {
                        running = false;
                        if (l <= 0L) {
                            subscriber.onError((Throwable)new IllegalArgumentException("Demand must be positive!"));
                        } else {
                            subscriber.onNext((Object)fullContent);
                            subscriber.onComplete();
                            executeFuture.complete(null);
                        }
                    }
                }

                public void cancel() {
                    running = false;
                }
            });
        }
    }

    private static class OnCancelSubscription
    extends DelegatingSubscription {
        private final Runnable onCancel;

        private OnCancelSubscription(Subscription subscription, Runnable onCancel) {
            super(subscription);
            this.onCancel = onCancel;
        }

        public void cancel() {
            this.onCancel.run();
            super.cancel();
        }
    }

    private static class PublisherAdapter
    implements Publisher<ByteBuffer> {
        private final StreamedHttpResponse response;
        private final ChannelHandlerContext channelContext;
        private final RequestContext requestContext;
        private final CompletableFuture<Void> executeFuture;
        private final AtomicBoolean isDone = new AtomicBoolean(false);

        private PublisherAdapter(StreamedHttpResponse response, ChannelHandlerContext channelContext, RequestContext requestContext, CompletableFuture<Void> executeFuture) {
            this.response = response;
            this.channelContext = channelContext;
            this.requestContext = requestContext;
            this.executeFuture = executeFuture;
        }

        public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
            this.response.subscribe((Subscriber)new Subscriber<HttpContent>(){

                public void onSubscribe(Subscription subscription) {
                    subscriber.onSubscribe((Subscription)new OnCancelSubscription(this.resolveSubscription(subscription), this::onCancel));
                }

                private Subscription resolveSubscription(Subscription subscription) {
                    if (Protocol.HTTP2.equals((Object)ChannelAttributeKey.getProtocolNow(channelContext.channel()))) {
                        return new Http2ResetSendingSubscription(channelContext, subscription);
                    }
                    return subscription;
                }

                private void onCancel() {
                    if (!isDone.compareAndSet(false, true)) {
                        return;
                    }
                    try {
                        SdkCancellationException e = new SdkCancellationException("Subscriber cancelled before all events were published");
                        requestContext.handler().onError((Throwable)e);
                        executeFuture.completeExceptionally((Throwable)e);
                    }
                    finally {
                        ResponseHandler.runAndLogError("Could not release channel back to the pool", () -> ResponseHandler.closeAndRelease(channelContext));
                    }
                }

                public void onNext(HttpContent httpContent) {
                    if (isDone.get()) {
                        return;
                    }
                    ByteBuffer byteBuffer = ExceptionHandlingUtils.tryCatchFinally(() -> ResponseHandler.copyToByteBuffer(httpContent.content()), this::onError, () -> ((HttpContent)httpContent).release());
                    if (byteBuffer != null) {
                        ExceptionHandlingUtils.tryCatch(() -> subscriber.onNext((Object)byteBuffer), this::notifyError);
                        ExceptionHandlingUtils.tryCatch(() -> ((ChannelHandlerContext)channelContext).read(), this::onError);
                    }
                }

                public void onError(Throwable t) {
                    if (!isDone.compareAndSet(false, true)) {
                        return;
                    }
                    try {
                        ResponseHandler.runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber.toString()), () -> subscriber.onError(t));
                        this.notifyError(t);
                    }
                    finally {
                        ResponseHandler.runAndLogError("Could not release channel back to the pool", () -> ResponseHandler.closeAndRelease(channelContext));
                    }
                }

                public void onComplete() {
                    if (!isDone.compareAndSet(false, true)) {
                        return;
                    }
                    try {
                        ResponseHandler.runAndLogError(String.format("Subscriber %s threw an exception in onComplete.", subscriber.toString()), () -> ((Subscriber)subscriber).onComplete());
                    }
                    finally {
                        ResponseHandler.finalizeResponse(requestContext, channelContext);
                    }
                }

                private void notifyError(Throwable throwable) {
                    SdkAsyncHttpResponseHandler handler = requestContext.handler();
                    ResponseHandler.runAndLogError(String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError.", handler), () -> handler.onError(throwable));
                    executeFuture.completeExceptionally(throwable);
                }
            });
        }
    }
}

