/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.media.common;

import io.helidon.common.http.DataChunk;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
 * An {@link InputStream} view over a {@link Flow.Publisher} of {@link DataChunk}s.
 *
 * <p>The publisher is subscribed to lazily, on the first call to
 * {@link #read(byte[], int, int)}. Chunks are requested one at a time: a new chunk is
 * requested only after the current one has been fully read and released.
 *
 * <p>Reads block the calling thread (via {@link CompletableFuture#get()}) until the next
 * chunk, an error, or completion is signalled by the publisher. When constructed with
 * {@code validate == true}, a read attempted from a thread with the same name and id as
 * the thread that constructed the stream fails with {@link IllegalStateException}.
 */
public class DataChunkInputStream extends InputStream {
    private static final Logger LOGGER = Logger.getLogger(DataChunkInputStream.class.getName());

    /** {@code name:id} of the thread that constructed this stream; used by {@link #validate()}. */
    private final String originalThreadID;
    /** Whether reads from the constructing thread should be rejected. */
    private final boolean validate;
    private final Flow.Publisher<DataChunk> originalPublisher;
    /** Index into the current chunk's {@code data()} array of the buffer being read. */
    private int bufferIndex;
    /** Future of the chunk currently being read; {@code null} once the stream is closed. */
    private CompletableFuture<DataChunk> current = new CompletableFuture<>();
    /**
     * Future the subscriber will complete with the next signal (chunk, error, or {@code null}
     * on completion). Initially the same future as {@link #current}.
     * NOTE(review): this field is written by the subscriber and read by the reading thread
     * without being volatile; visibility presumably relies on the completion of the previous
     * future - confirm.
     */
    private CompletableFuture<DataChunk> next = this.current;
    private volatile Flow.Subscription subscription;
    /** Lazily allocated scratch buffer for the single-byte {@link #read()}. */
    private byte[] oneByte;
    /** Guards against subscribing to the publisher more than once. */
    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    /**
     * Creates a stream over the given publisher with thread validation disabled.
     *
     * @param originalPublisher the source of data chunks
     */
    public DataChunkInputStream(Flow.Publisher<DataChunk> originalPublisher) {
        this(originalPublisher, false);
    }

    /**
     * Creates a stream over the given publisher.
     *
     * @param originalPublisher the source of data chunks
     * @param validate          if {@code true}, reads performed on a thread with the same
     *                          name and id as the constructing thread throw
     *                          {@link IllegalStateException}
     */
    public DataChunkInputStream(Flow.Publisher<DataChunk> originalPublisher, boolean validate) {
        this.originalPublisher = originalPublisher;
        this.originalThreadID = this.getCurrentThreadIdent();
        this.validate = validate;
    }

    /**
     * Releases the chunk if it is non-null and not already released.
     * The signature matches {@code BiConsumer<DataChunk, Throwable>} so it can be passed to
     * {@link CompletableFuture#whenComplete}.
     *
     * @param chunk the chunk to release, may be {@code null}
     * @param th    ignored
     */
    private static void releaseChunk(DataChunk chunk, Throwable th) {
        if (chunk != null && !chunk.isReleased()) {
            LOGGER.finest(() -> "Releasing chunk: " + chunk.id());
            chunk.release();
        }
    }

    /**
     * Closes the stream. The chunk held by the current future (if any, now or once it
     * completes) is released, and every subsequent read fails with {@link IOException}.
     * Calling this method more than once has no further effect.
     */
    @Override
    public void close() {
        CompletableFuture<DataChunk> pending = this.current;
        if (pending != null) {
            // Release now if already completed, otherwise as soon as the chunk arrives.
            pending.whenComplete(DataChunkInputStream::releaseChunk);
        }
        this.current = null;
        this.bufferIndex = 0;
    }

    /**
     * Reads a single byte by delegating to {@link #read(byte[], int, int)}.
     *
     * @return the byte as an unsigned value in {@code 0..255}, or a negative value at end of stream
     * @throws IOException if the stream is closed, the publisher signalled an error, or the
     *                     waiting thread was interrupted
     */
    @Override
    public int read() throws IOException {
        if (this.oneByte == null) {
            this.oneByte = new byte[1];
        }
        int r = this.read(this.oneByte, 0, 1);
        if (r < 0) {
            return r;
        }
        return this.oneByte[0] & 0xFF;
    }

    /**
     * Reads up to {@code len} bytes from the current chunk into {@code buf}, blocking until a
     * chunk, an error, or completion is available. At most one chunk is read per call.
     *
     * @param buf destination array
     * @param off offset in {@code buf} at which to start writing
     * @param len maximum number of bytes to read
     * @return the number of bytes read, {@code 0} if {@code len} is zero, or {@code -1} at
     *         end of stream
     * @throws NullPointerException      if {@code buf} is {@code null}
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or
     *                                   {@code len > buf.length - off}
     * @throws IllegalStateException     if validation is enabled and the call is made from the
     *                                   constructing thread
     * @throws IOException               if the stream is closed, the publisher signalled an
     *                                   error, or the waiting thread was interrupted
     */
    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        this.validate();
        // Validate the destination range before touching any chunk: a late failure inside the
        // copy loop could otherwise drop bytes already consumed from an earlier buffer.
        if (off < 0 || len < 0 || len > buf.length - off) {
            throw new IndexOutOfBoundsException(
                    "off=" + off + ", len=" + len + ", buf.length=" + buf.length);
        }
        if (this.subscribed.compareAndSet(false, true)) {
            this.originalPublisher.subscribe(new DataChunkSubscriber());
        }
        if (this.current == null) {
            throw new IOException("The input stream has been closed");
        }
        if (len == 0) {
            // InputStream contract: a zero-length read returns 0 and must not block.
            return 0;
        }
        try {
            DataChunk chunk = this.current.get();
            if (chunk == null) {
                // The publisher completed: end of stream.
                return -1;
            }
            ByteBuffer[] currentBuffers = chunk.data();
            int count = 0;
            while (this.bufferIndex < currentBuffers.length) {
                ByteBuffer buffer = currentBuffers[this.bufferIndex];
                if (this.bufferIndex == 0 && buffer.position() == 0) {
                    LOGGER.finest(() -> "Reading chunk ID: " + chunk.id());
                }
                int rem = buffer.remaining();
                int blen = Math.min(len, rem);
                buffer.get(buf, off, blen);
                off += blen;
                count += blen;
                len -= blen;
                if (rem > blen) {
                    // Destination is full while this buffer still holds data.
                    break;
                }
                if (this.bufferIndex == currentBuffers.length - 1) {
                    // Last buffer drained: release the chunk and ask for the next one.
                    DataChunkInputStream.releaseChunk(chunk, null);
                    this.current = this.next;
                    this.bufferIndex = 0;
                    this.subscription.request(1L);
                    break;
                }
                ++this.bufferIndex;
            }
            return count;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            // Unwrap so callers see the error signalled by the publisher as the cause.
            throw new IOException(e.getCause());
        }
    }

    /**
     * Builds an identifier for the calling thread.
     *
     * @return the current thread's name and id joined by {@code ':'}
     */
    private String getCurrentThreadIdent() {
        Thread thread = Thread.currentThread();
        return thread.getName() + ":" + thread.getId();
    }

    /**
     * Rejects the call when validation is enabled and the calling thread's identifier equals
     * that of the thread that constructed this stream.
     *
     * @throws IllegalStateException if the check fails
     */
    private void validate() {
        if (this.validate && this.originalThreadID.equals(this.getCurrentThreadIdent())) {
            throw new IllegalStateException("DataChunkInputStream needs to be handled in separate thread to prevent deadlock.");
        }
    }

    /**
     * Subscriber that hands each signal from the publisher to the reading side by completing
     * the enclosing stream's {@code next} future.
     */
    private class DataChunkSubscriber implements Flow.Subscriber<DataChunk> {
        private DataChunkSubscriber() {
        }

        /** Stores the subscription and requests the first chunk. */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            DataChunkInputStream.this.subscription = subscription;
            subscription.request(1L);
        }

        /**
         * Publishes a non-empty chunk to the reader; an empty chunk is released immediately
         * and another one is requested in its place.
         */
        @Override
        public void onNext(DataChunk item) {
            LOGGER.finest(() -> "Processing chunk: " + item.id());
            if (item.remaining() > 0) {
                // Install the follow-up future before completing the previous one, so it is
                // already in place when the reader observes the completed chunk.
                CompletableFuture<DataChunk> prev = DataChunkInputStream.this.next;
                DataChunkInputStream.this.next = new CompletableFuture<>();
                prev.complete(item);
            } else {
                DataChunkInputStream.releaseChunk(item, null);
                DataChunkInputStream.this.subscription.request(1L);
            }
        }

        /** Fails the pending future so the next read throws an {@link IOException}. */
        @Override
        public void onError(Throwable throwable) {
            DataChunkInputStream.this.next.completeExceptionally(throwable);
        }

        /** Completes the pending future with {@code null}, which the reader treats as end of stream. */
        @Override
        public void onComplete() {
            DataChunkInputStream.this.next.complete(null);
        }
    }
}

