/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.http.server.undertow.request;

import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.Connectors;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.SameThreadExecutor;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import org.xnio.channels.StreamSourceChannel;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.common.util.FlowUtils;
import ru.tinkoff.kora.common.util.flow.ErrorSubscription;
import ru.tinkoff.kora.http.common.body.HttpBodyInput;
import ru.tinkoff.kora.http.server.undertow.request.UndertowRequestHttpBodySubscription;

public final class UndertowRequestHttpBody
implements HttpBodyInput {
    private final Context context;
    private final HttpServerExchange exchange;
    @Nullable
    private Queue<byte[]> prefetchedData;

    public UndertowRequestHttpBody(Context context, HttpServerExchange exchange, @Nullable Queue<byte[]> prefetchedData) {
        this.context = context;
        this.exchange = exchange;
        this.prefetchedData = prefetchedData;
    }

    public long contentLength() {
        String contentLengthStr = this.exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH);
        return contentLengthStr == null ? -1L : Long.parseLong(contentLengthStr);
    }

    @Nullable
    public String contentType() {
        return this.exchange.getRequestHeaders().getFirst(Headers.CONTENT_TYPE);
    }

    public void subscribe(Flow.Subscriber<? super ByteBuffer> s) {
        HttpServerExchange exchange = this.exchange;
        Queue<byte[]> prefetched = this.prefetchedData;
        if (prefetched != null && !prefetched.isEmpty()) {
            UndertowRequestHttpBodySubscription subscription = new UndertowRequestHttpBodySubscription(s, exchange, prefetched);
            this.prefetchedData = null;
            s.onSubscribe(subscription);
            return;
        }
        try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().allocate();){
            ByteBuffer buffer = pooled.getBuffer();
            buffer.clear();
            Connectors.resetRequestChannel((HttpServerExchange)exchange);
            StreamSourceChannel channel = exchange.getRequestChannel();
            Connectors.resetRequestChannel((HttpServerExchange)exchange);
            prefetched = new LinkedList<byte[]>();
            int res = channel.read(buffer);
            if (res == -1) {
                FlowUtils.empty((Context)this.context).subscribe(s);
                return;
            }
            while (res > 0) {
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                buffer.clear();
                prefetched.add(data);
                res = channel.read(buffer);
            }
            UndertowRequestHttpBodySubscription subscription = new UndertowRequestHttpBodySubscription(s, exchange, prefetched);
            s.onSubscribe(subscription);
        }
        catch (IOException e) {
            ErrorSubscription subscription = new ErrorSubscription(s, this.context, (Throwable)e);
            s.onSubscribe((Flow.Subscription)subscription);
        }
    }

    public InputStream asInputStream() {
        if (this.exchange.isInIoThread()) {
            return null;
        }
        if (this.exchange.isBlocking()) {
            return this.exchange.getInputStream();
        }
        this.exchange.startBlocking();
        return this.exchange.getInputStream();
    }

    public void close() {
        this.exchange.getRequestReceiver().receivePartialBytes((exchange, message, last) -> {});
        this.exchange.getRequestReceiver().resume();
    }

    public CompletionStage<ByteBuffer> asBufferStage() {
        HttpServerExchange exchange = this.exchange;
        CompletableFuture<ByteBuffer> future = new CompletableFuture<ByteBuffer>();
        exchange.getRequestReceiver().receiveFullBytes((ex, message) -> ex.dispatch(SameThreadExecutor.INSTANCE, () -> {
            Queue<byte[]> prefetched = this.prefetchedData;
            if (prefetched == null || this.prefetchedData.isEmpty()) {
                future.complete(ByteBuffer.wrap(message));
                return;
            }
            this.prefetchedData = null;
            byte[] result = this.buildArray(prefetched, message);
            future.complete(ByteBuffer.wrap(result));
        }), (ex, error) -> ex.dispatch(SameThreadExecutor.INSTANCE, () -> future.completeExceptionally(error)));
        return future;
    }

    public CompletionStage<byte[]> asArrayStage() {
        HttpServerExchange exchange = this.exchange;
        CompletableFuture<byte[]> future = new CompletableFuture<byte[]>();
        exchange.getRequestReceiver().receiveFullBytes((ex, message) -> ex.dispatch(SameThreadExecutor.INSTANCE, () -> {
            Queue<byte[]> prefetched = this.prefetchedData;
            if (prefetched == null || this.prefetchedData.isEmpty()) {
                future.complete(message);
                return;
            }
            this.prefetchedData = null;
            byte[] result = this.buildArray(prefetched, message);
            future.complete(result);
        }), (ex, error) -> ex.dispatch(SameThreadExecutor.INSTANCE, () -> future.completeExceptionally(error)));
        return future;
    }

    private byte[] buildArray(Queue<byte[]> prefetched, byte[] message) {
        ArrayList<byte[]> all = new ArrayList<byte[]>();
        int size = message.length;
        while (!prefetched.isEmpty()) {
            byte[] data = prefetched.poll();
            all.add(data);
            size += data.length;
        }
        all.add(message);
        byte[] result = new byte[size];
        int pos = 0;
        for (byte[] bytes : all) {
            System.arraycopy(bytes, 0, result, pos, bytes.length);
            pos += bytes.length;
        }
        return result;
    }
}

