/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.http.server.undertow;

import io.undertow.io.IoCallback;
import io.undertow.io.Sender;
import io.undertow.server.DirectByteBufferDeallocator;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderMap;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import jakarta.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.IoUtils;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.common.util.flow.LazySingleSubscription;
import ru.tinkoff.kora.common.util.flow.SingleSubscription;
import ru.tinkoff.kora.http.common.body.HttpBodyOutput;
import ru.tinkoff.kora.http.common.header.HttpHeaders;
import ru.tinkoff.kora.http.common.header.MutableHttpHeaders;
import ru.tinkoff.kora.http.server.common.HttpServer;
import ru.tinkoff.kora.http.server.common.HttpServerResponse;
import ru.tinkoff.kora.http.server.common.router.PublicApiHandler;
import ru.tinkoff.kora.http.server.common.router.PublicApiRequest;
import ru.tinkoff.kora.http.server.common.router.PublicApiResponse;
import ru.tinkoff.kora.http.server.common.telemetry.HttpServerTracer;
import ru.tinkoff.kora.http.server.undertow.UndertowContext;
import ru.tinkoff.kora.http.server.undertow.request.UndertowPublicApiRequest;

public class UndertowExchangeProcessor
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(HttpServer.class);
    private static final Class<?> REACTOR_NON_BLOCKING;
    private static final Class<?> FAST_THREAD_LOCAL;
    private final HttpServerExchange exchange;
    private final PublicApiHandler publicApiHandler;
    private final Context context;
    @Nullable
    private final HttpServerTracer tracer;

    public UndertowExchangeProcessor(HttpServerExchange exchange, PublicApiHandler publicApiHandler, Context context, @Nullable HttpServerTracer tracer) {
        this.exchange = exchange;
        this.publicApiHandler = publicApiHandler;
        this.context = context;
        this.tracer = tracer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        HttpServerExchange exchange = this.exchange;
        Context context = this.context;
        UndertowContext.set(context, exchange);
        context.inject();
        try {
            UndertowPublicApiRequest request = new UndertowPublicApiRequest(exchange, context);
            PublicApiResponse response = this.publicApiHandler.process(context, (PublicApiRequest)request);
            if (response.response().isDone()) {
                try {
                    HttpServerResponse httpResponse = (HttpServerResponse)response.response().join();
                    if (httpResponse == null) {
                        this.sendResponse(exchange, response, HttpServerResponse.of((int)500), new IllegalStateException("Illegal state: response future is empty"));
                    }
                    this.sendResponse(exchange, response, httpResponse, null);
                }
                catch (CompletionException e) {
                    this.sendException(response, Objects.requireNonNullElse(e.getCause(), e));
                }
                catch (Throwable e) {
                    this.sendException(response, e);
                }
                return;
            }
            response.response().whenComplete((httpServerResponse, throwable) -> {
                CompletionException ce;
                if (httpServerResponse != null) {
                    this.sendResponse(exchange, response, (HttpServerResponse)httpServerResponse, null);
                } else if (throwable instanceof CompletionException && (ce = (CompletionException)throwable).getCause() != null) {
                    this.sendException(response, ce.getCause());
                } else if (throwable != null) {
                    this.sendException(response, (Throwable)throwable);
                } else {
                    this.sendResponse(exchange, response, HttpServerResponse.of((int)500), new IllegalStateException("Illegal state: response future is empty"));
                }
            });
        }
        catch (Throwable exception) {
            log.warn("Error dropped", exception);
            exchange.setStatusCode(500);
            exchange.getResponseSender().send(StandardCharsets.UTF_8.encode(Objects.requireNonNullElse(exception.getMessage(), "no message")));
        }
        finally {
            UndertowContext.clear(context);
        }
    }

    private void sendResponse(HttpServerExchange exchange, PublicApiResponse response, HttpServerResponse httpResponse, @Nullable Throwable error) {
        ByteBuffer full;
        MutableHttpHeaders headers = httpResponse.headers();
        exchange.setStatusCode(httpResponse.code());
        HttpServerTracer tracer = this.tracer;
        if (tracer != null) {
            tracer.inject(this.context, (Object)exchange.getResponseHeaders(), (carrier, key, value) -> carrier.add(HttpString.tryFromString((String)key), value));
        }
        exchange.getResponseHeaders().put(Headers.SERVER, "kora/undertow");
        HttpBodyOutput body = httpResponse.body();
        if (body == null) {
            this.setHeaders(exchange.getResponseHeaders(), (HttpHeaders)headers, null);
            exchange.addExchangeCompleteListener((e, nextListener) -> {
                response.closeSendResponseSuccess(e.getStatusCode(), (HttpHeaders)httpResponse.headers(), error);
                nextListener.proceed();
            });
            exchange.endExchange();
            return;
        }
        this.setHeaders(exchange.getResponseHeaders(), (HttpHeaders)headers, body.contentType());
        String contentType = body.contentType();
        if (contentType != null) {
            exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, contentType);
        }
        if ((full = body.getFullContentIfAvailable()) != null) {
            this.sendFullBody(response, httpResponse, full, error);
            return;
        }
        long contentLength = body.contentLength();
        if (contentLength >= 0L) {
            exchange.setResponseContentLength(contentLength);
        }
        if (this.isInBlockingThread()) {
            if (!exchange.isBlocking()) {
                exchange.startBlocking();
            }
            try (OutputStream os = exchange.getOutputStream();){
                body.write(os);
            }
            catch (IOException e2) {
                response.closeConnectionError(exchange.getStatusCode(), (Throwable)e2);
                if (!exchange.isResponseStarted()) {
                    exchange.setStatusCode(500);
                    exchange.endExchange();
                }
                return;
            }
            catch (Exception e3) {
                response.closeBodyError(exchange.getStatusCode(), (Throwable)e3);
                if (!exchange.isResponseStarted()) {
                    exchange.setStatusCode(500);
                    exchange.endExchange();
                }
                return;
            }
            if (exchange.isComplete()) {
                response.closeSendResponseSuccess(exchange.getStatusCode(), (HttpHeaders)httpResponse.headers(), error);
            }
        } else {
            this.sendStreamingBody(response, (HttpHeaders)headers, body, error);
        }
    }

    private void setHeaders(HeaderMap responseHeaders, HttpHeaders headers, @Nullable String contentType) {
        for (Map.Entry header : headers) {
            String key = (String)header.getKey();
            if (key.equals("server") || key.equals("content-type") && contentType != null || key.equals("content-length") || key.equals("transfer-encoding")) continue;
            responseHeaders.addAll(HttpString.tryFromString((String)key), (Collection)header.getValue());
        }
    }

    private void sendFullBody(final PublicApiResponse response, HttpServerResponse httpResponse, @Nullable ByteBuffer body, final @Nullable Throwable error) {
        HttpServerExchange exchange = this.exchange;
        final MutableHttpHeaders headers = httpResponse.headers();
        if (body == null || body.remaining() == 0) {
            exchange.setResponseContentLength(0L);
            exchange.addExchangeCompleteListener((e, nextListener) -> {
                response.closeSendResponseSuccess(e.getStatusCode(), (HttpHeaders)headers, error);
                nextListener.proceed();
            });
            exchange.endExchange();
        } else {
            exchange.setResponseContentLength((long)body.remaining());
            exchange.getResponseSender().send(body, new IoCallback(){

                public void onComplete(HttpServerExchange exchange, Sender sender) {
                    sender.close(new IoCallback(){

                        public void onComplete(HttpServerExchange exchange, Sender sender) {
                            if (exchange.isComplete()) {
                                response.closeSendResponseSuccess(exchange.getStatusCode(), (HttpHeaders)headers, error);
                            } else {
                                exchange.addExchangeCompleteListener((e, nextListener) -> {
                                    response.closeSendResponseSuccess(e.getStatusCode(), (HttpHeaders)headers, error);
                                    nextListener.proceed();
                                });
                                exchange.endExchange();
                            }
                        }

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        public void onException(HttpServerExchange exchange, Sender sender, IOException exception) {
                            PublicApiResponse publicApiResponse;
                            try {
                                exchange.endExchange();
                                publicApiResponse = response;
                            }
                            catch (Throwable throwable) {
                                response.closeConnectionError(exchange.getStatusCode(), error == null ? error : exception);
                                throw throwable;
                            }
                            publicApiResponse.closeConnectionError(exchange.getStatusCode(), error == null ? error : exception);
                        }
                    });
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void onException(HttpServerExchange exchange, Sender sender, IOException exception) {
                    try {
                        exchange.endExchange();
                    }
                    catch (Throwable throwable) {
                        IoUtils.safeClose((Closeable)exchange.getConnection());
                        response.closeConnectionError(exchange.getStatusCode(), error == null ? error : exception);
                        throw throwable;
                    }
                    IoUtils.safeClose((Closeable)exchange.getConnection());
                    response.closeConnectionError(exchange.getStatusCode(), error == null ? error : exception);
                }
            });
        }
    }

    private void sendException(final PublicApiResponse response, final Throwable error) {
        ByteBuffer full;
        if (!(error instanceof HttpServerResponse)) {
            this.exchange.setStatusCode(500);
            this.exchange.getResponseSender().send(Objects.requireNonNullElse(error.getMessage(), "Unknown error"), StandardCharsets.UTF_8, new IoCallback(){

                public void onComplete(HttpServerExchange exchange, Sender sender) {
                    response.closeSendResponseSuccess(500, null, error);
                    IoCallback.END_EXCHANGE.onComplete(exchange, sender);
                }

                public void onException(HttpServerExchange exchange, Sender sender, IOException exception) {
                    error.addSuppressed(exception);
                    response.closeConnectionError(500, error);
                    IoCallback.END_EXCHANGE.onException(exchange, sender, exception);
                }
            });
            return;
        }
        HttpServerResponse rs = (HttpServerResponse)error;
        this.exchange.setStatusCode(rs.code());
        HttpBodyOutput body = rs.body();
        if (body == null) {
            this.setHeaders(this.exchange.getRequestHeaders(), (HttpHeaders)rs.headers(), null);
            this.exchange.addExchangeCompleteListener((exchange, nextListener) -> {
                response.closeSendResponseSuccess(exchange.getStatusCode(), (HttpHeaders)rs.headers(), error);
                nextListener.proceed();
            });
            this.exchange.setResponseContentLength(0L);
            this.exchange.endExchange();
            return;
        }
        this.setHeaders(this.exchange.getResponseHeaders(), (HttpHeaders)rs.headers(), body.contentType());
        String contentType = body.contentType();
        if (contentType != null) {
            this.exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, body.contentType());
        }
        if ((full = body.getFullContentIfAvailable()) != null) {
            this.sendFullBody(response, rs, full, error);
            return;
        }
        if (this.isInBlockingThread()) {
            if (!this.exchange.isBlocking()) {
                this.exchange.startBlocking();
            }
            try (OutputStream os = this.exchange.getOutputStream();){
                body.write(os);
            }
            catch (IOException e) {
                if (!this.exchange.isResponseStarted()) {
                    this.exchange.setStatusCode(500);
                } else {
                    try {
                        this.exchange.getConnection().close();
                    }
                    catch (IOException ex) {
                        e.addSuppressed(ex);
                    }
                }
                response.closeBodyError(this.exchange.getStatusCode(), (Throwable)e);
                this.exchange.endExchange();
                return;
            }
            response.closeSendResponseSuccess(this.exchange.getStatusCode(), (HttpHeaders)rs.headers(), error);
            return;
        }
        this.sendStreamingBody(response, HttpHeaders.empty(), body, error);
    }

    private boolean isInBlockingThread() {
        return !this.isInIoThread();
    }

    private boolean isInIoThread() {
        Thread t = Thread.currentThread();
        if (this.exchange.getIoThread() == t) {
            return true;
        }
        if (REACTOR_NON_BLOCKING != null && REACTOR_NON_BLOCKING.isInstance(t)) {
            return true;
        }
        return FAST_THREAD_LOCAL != null && FAST_THREAD_LOCAL.isInstance(t);
    }

    private void sendStreamingBody(PublicApiResponse response, HttpHeaders headers, HttpBodyOutput body, @Nullable Throwable error) {
        body.subscribe((Flow.Subscriber)new HttpResponseBodySubscriber(this.exchange, response, headers, error));
    }

    static {
        Class<?> reactorNonBlocking = null;
        Class<?> fastThreadLocal = null;
        try {
            reactorNonBlocking = Thread.currentThread().getContextClassLoader().loadClass("reactor.core.scheduler.NonBlocking");
        }
        catch (ClassNotFoundException classNotFoundException) {
            // empty catch block
        }
        try {
            fastThreadLocal = Thread.currentThread().getContextClassLoader().loadClass("io.netty.util.concurrent.FastThreadLocalThread");
        }
        catch (ClassNotFoundException classNotFoundException) {
            // empty catch block
        }
        REACTOR_NON_BLOCKING = reactorNonBlocking;
        FAST_THREAD_LOCAL = fastThreadLocal;
    }

    private static class HttpResponseBodySubscriber
    implements Flow.Subscriber<ByteBuffer> {
        private final HttpServerExchange exchange;
        private final PublicApiResponse response;
        private final HttpHeaders headers;
        private final Throwable error;
        private volatile Flow.Subscription subscription;
        private final AtomicInteger state = new AtomicInteger(0);

        private HttpResponseBodySubscriber(HttpServerExchange exchange, PublicApiResponse response, HttpHeaders headers, @Nullable Throwable error) {
            this.exchange = exchange;
            this.response = response;
            this.headers = headers;
            this.error = error;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1L);
        }

        @Override
        public void onNext(final ByteBuffer byteBuffer) {
            int newState = this.state.incrementAndGet();
            if ((newState & 0x1000000) != 0) {
                DirectByteBufferDeallocator.free((ByteBuffer)byteBuffer);
                return;
            }
            if (this.subscription instanceof SingleSubscription || this.subscription instanceof LazySingleSubscription) {
                this.exchange.setResponseContentLength((long)byteBuffer.remaining());
                this.exchange.getResponseSender().send(byteBuffer, new IoCallback(){

                    public void onComplete(HttpServerExchange exchange, Sender sender) {
                        if (exchange.isComplete()) {
                            response.closeSendResponseSuccess(exchange.getStatusCode(), headers, error);
                        } else {
                            exchange.addExchangeCompleteListener((e, nextListener) -> {
                                response.closeSendResponseSuccess(e.getStatusCode(), headers, error);
                                nextListener.proceed();
                            });
                            exchange.endExchange();
                        }
                    }

                    public void onException(HttpServerExchange exchange, Sender sender, IOException exception) {
                        response.closeBodyError(exchange.getStatusCode(), (Throwable)exception);
                    }
                });
                return;
            }
            this.exchange.getResponseSender().send(byteBuffer, new IoCallback(){

                public void onComplete(HttpServerExchange exchange, Sender sender) {
                    int newState = state.decrementAndGet();
                    DirectByteBufferDeallocator.free((ByteBuffer)byteBuffer);
                    if ((newState & 0x1000000) != 0) {
                        exchange.addExchangeCompleteListener((ex, nextListener) -> {
                            response.closeSendResponseSuccess(ex.getStatusCode(), headers, null);
                            nextListener.proceed();
                        });
                        exchange.endExchange();
                    } else {
                        subscription.request(1L);
                    }
                }

                public void onException(HttpServerExchange exchange, Sender sender, IOException exception) {
                    DirectByteBufferDeallocator.free((ByteBuffer)byteBuffer);
                    subscription.cancel();
                    exchange.getResponseSender().close();
                    response.closeConnectionError(exchange.getStatusCode(), error == null ? exception : error);
                }
            });
        }

        @Override
        public void onError(Throwable t) {
            HttpServerExchange exchange = this.exchange;
            if (exchange.isResponseStarted()) {
                exchange.getResponseSender().close();
                this.response.closeBodyError(exchange.getStatusCode(), this.error == null ? t : this.error);
                exchange.endExchange();
            } else {
                exchange.setStatusCode(500);
                exchange.getResponseHeaders().remove(Headers.CONTENT_LENGTH);
                exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
                exchange.getResponseSender().send(t.getMessage());
                exchange.endExchange();
                this.response.closeBodyError(exchange.getStatusCode(), this.error == null ? t : this.error);
            }
        }

        @Override
        public void onComplete() {
            if (this.subscription instanceof SingleSubscription || this.subscription instanceof LazySingleSubscription) {
                return;
            }
            int newState = this.state.updateAndGet(oldState -> oldState | 0x1000000);
            if (newState == 0x1000000) {
                this.exchange.addExchangeCompleteListener((exchange, nextListener) -> {
                    this.response.closeSendResponseSuccess(exchange.getStatusCode(), null, this.error);
                    nextListener.proceed();
                });
                this.exchange.endExchange();
            }
        }
    }
}

