/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.http.server.undertow.request;

import io.undertow.io.Receiver;
import io.undertow.server.Connectors;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.SameThreadExecutor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import ru.tinkoff.kora.http.server.undertow.request.Operators;

/**
 * {@link Flow.Subscription} that delivers the body of an Undertow request to a single
 * {@link Flow.Subscriber} as {@link ByteBuffer} chunks.
 *
 * <p>Chunks come from two places: first the {@code prefetchedData} queue handed to the
 * constructor, then the exchange's request receiver, for which this object acts as both the
 * partial-bytes callback ({@link #handle}) and the error callback ({@link #error}).
 *
 * <p>Outstanding demand is tracked in {@link #demand}. When a delivered chunk brings the demand
 * down to zero (or below) the receiver is paused; a later {@link #request(long)} resumes it.
 *
 * <p>The {@code done} flag marks that a terminal signal ({@code onComplete}/{@code onError}) has
 * been emitted or that the subscription was cancelled. Every terminal signal is guarded by a
 * compare-and-set on that flag so the subscriber sees at most one of them, and no
 * {@code onNext} is emitted once the flag is set.
 */
final class UndertowRequestHttpBodySubscription
implements Flow.Subscription,
Receiver.PartialBytesCallback,
Receiver.ErrorCallback {
    /** Atomic accessor for {@link #demand}; the field name is looked up reflectively, so keep both in sync. */
    static final AtomicLongFieldUpdater<UndertowRequestHttpBodySubscription> REQUESTED = AtomicLongFieldUpdater.newUpdater(UndertowRequestHttpBodySubscription.class, "demand");
    /** Number of chunks requested by the subscriber and not yet delivered from the receiver. */
    volatile long demand = 0L;
    private final Flow.Subscriber<? super ByteBuffer> s;
    private final HttpServerExchange exchange;
    private final Queue<byte[]> prefetchedData;
    /** Set once the receiver callbacks have been registered; atomic because worker tasks may race. */
    private final AtomicBoolean subscribed = new AtomicBoolean(false);
    /** Set once a terminal signal was emitted or the subscription was cancelled. */
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Lock lock = new ReentrantLock();

    /**
     * Creates the subscription and registers an exchange-complete listener that signals
     * {@code onError} to the subscriber if the exchange completes before the body stream
     * reached a terminal state.
     *
     * @param s subscriber receiving the body chunks
     * @param exchange exchange whose request body is being read
     * @param prefetchedData chunks that were already read; they are emitted before the receiver is used
     */
    UndertowRequestHttpBodySubscription(Flow.Subscriber<? super ByteBuffer> s, HttpServerExchange exchange, Queue<byte[]> prefetchedData) {
        this.s = s;
        this.exchange = exchange;
        this.prefetchedData = prefetchedData;
        this.exchange.addExchangeCompleteListener((ex, next) -> {
            next.proceed();
            if (this.done.compareAndSet(false, true)) {
                try {
                    this.s.onError(new IllegalStateException("Response send before request body is fully read"));
                }
                catch (Exception ignored) {
                    // best effort: a failing subscriber must not break exchange completion
                }
            }
        });
    }

    /**
     * Adds {@code n} to the demand. Prefetched chunks are emitted synchronously first (each one
     * consumes one unit of {@code n}); any remainder is added to {@link #demand} and, if the
     * previous demand was not positive, the receiver is started or resumed on the connection's worker.
     *
     * <p>A non-positive {@code n} is answered with {@code onError(IllegalArgumentException)}
     * instead of being applied: a zero or negative value would otherwise never hit the
     * {@code n == 0} exit of the prefetch loop and would corrupt the demand counter.
     * Calls made after a terminal signal or after {@link #cancel()} are ignored.
     *
     * @param n number of additional chunks the subscriber is ready to receive; must be positive
     */
    @Override
    public void request(long n) {
        if (this.done.get()) {
            return;
        }
        if (n <= 0L) {
            if (this.done.compareAndSet(false, true)) {
                try {
                    this.s.onError(new IllegalArgumentException("Requested amount must be positive, but was " + n));
                }
                catch (Exception ignored) {
                    // best effort: nothing more can be done for a subscriber that fails in onError
                }
            }
            return;
        }
        this.lock.lock();
        try {
            while (!this.prefetchedData.isEmpty()) {
                if (this.done.get()) {
                    // a terminal signal was emitted concurrently; stop emitting
                    return;
                }
                --n;
                try {
                    byte[] bytes = this.prefetchedData.poll();
                    this.s.onNext(ByteBuffer.wrap(bytes));
                }
                catch (Exception ignored) {
                    // best effort: a failing onNext does not stop delivery of the remaining chunks
                }
                if (n == 0L) {
                    return;
                }
            }
            // presumably returns the demand value before the addition, capped on overflow; see Operators.addCap
            long oldDemand = Operators.addCap(REQUESTED, this, n);
            if (oldDemand <= 0L) {
                this.exchange.getConnection().getWorker().execute(() -> this.startOrResumeReceiver());
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Registers this object as the receiver's callbacks on the first call and resumes the
     * receiver on every later call. The compare-and-set guarantees the callbacks are registered
     * only once even if two worker tasks run concurrently.
     */
    private void startOrResumeReceiver() {
        if (this.subscribed.compareAndSet(false, true)) {
            Connectors.executeRootHandler(ex -> ex.dispatch(SameThreadExecutor.INSTANCE, () -> ex.getRequestReceiver().receivePartialBytes(this, this)), this.exchange);
        } else {
            this.exchange.getRequestReceiver().resume();
        }
    }

    /**
     * Cancels the subscription: marks the stream as finished so no further signal reaches the
     * subscriber, then resumes the receiver on the connection's worker and replaces the
     * callbacks with no-ops so the rest of the body is read and discarded.
     */
    @Override
    public void cancel() {
        this.done.set(true);
        this.exchange.getConnection().getWorker().execute(() -> {
            this.exchange.getRequestReceiver().resume();
            this.exchange.getRequestReceiver().receiveFullBytes((ex, message) -> {}, (ex, e) -> {});
        });
    }

    /**
     * Receiver error callback: forwards the I/O failure to the subscriber as {@code onError},
     * unless a terminal signal was already emitted or the subscription was cancelled.
     *
     * @param exchange exchange the failure belongs to
     * @param e failure reported by the receiver
     */
    public void error(HttpServerExchange exchange, IOException e) {
        if (!this.done.compareAndSet(false, true)) {
            return;
        }
        exchange.dispatch(SameThreadExecutor.INSTANCE, () -> this.s.onError(e));
    }

    /**
     * Receiver data callback: consumes one unit of demand and emits {@code message} to the
     * subscriber, followed by {@code onComplete} when {@code last} is set.
     *
     * <p>If the demand is exhausted by this chunk the receiver is paused and the emission is
     * dispatched; otherwise the chunk is emitted directly and only the completion is dispatched.
     *
     * @param exchange exchange the data belongs to
     * @param message chunk of body bytes; an empty array is not emitted
     * @param last whether this is the final chunk of the body
     */
    public void handle(HttpServerExchange exchange, byte[] message, boolean last) {
        this.lock.lock();
        try {
            long newDemand = REQUESTED.decrementAndGet(this);
            if (newDemand <= 0L) {
                exchange.getRequestReceiver().pause();
                exchange.dispatch(SameThreadExecutor.INSTANCE, () -> {
                    this.emitChunk(message);
                    if (last && this.done.compareAndSet(false, true)) {
                        this.emitComplete();
                    }
                });
                return;
            }
        }
        finally {
            this.lock.unlock();
        }
        this.emitChunk(message);
        if (last && this.done.compareAndSet(false, true)) {
            exchange.dispatch(SameThreadExecutor.INSTANCE, () -> this.emitComplete());
        }
    }

    /** Emits a non-empty chunk via {@code onNext} unless the stream is already finished. */
    private void emitChunk(byte[] message) {
        if (message.length == 0 || this.done.get()) {
            return;
        }
        try {
            this.s.onNext(ByteBuffer.wrap(message));
        }
        catch (Throwable ignored) {
            // best effort: a failing subscriber must not propagate into the receiver callback
        }
    }

    /** Calls {@code onComplete}; the caller must already have won the compare-and-set on {@code done}. */
    private void emitComplete() {
        try {
            this.s.onComplete();
        }
        catch (Throwable ignored) {
            // best effort: a failing subscriber must not propagate into the receiver callback
        }
    }
}

