/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.rs;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.locks.LockSupport;
import net.pincette.rs.Util;
import net.pincette.util.Pair;
import net.pincette.util.Util;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * An {@link InputStream} whose bytes are supplied by a reactive stream of {@link ByteBuffer}s.
 *
 * <p>The instance subscribes to a publisher and asks for one buffer at a time. A call to
 * {@code read} consumes the current buffer and, when it is exhausted, requests the next one and
 * parks until the publisher signals {@code onNext}, {@code onComplete} or {@code onError}.
 *
 * <p>The publisher signals unpark the thread that <em>constructed</em> this object. A reader on
 * another thread is not unparked by those signals and would depend on {@code Util.parking}
 * returning on its own (presumably after the timeout; confirm against that helper).
 */
public class InputStreamSubscriber
extends InputStream
implements Subscriber<ByteBuffer> {
    /** Passed to {@code Util.parking} on every wait (presumably milliseconds; confirm). */
    private final long timeout;
    /** The thread that publisher signals wake up; captured at construction time. */
    private final Thread thread = Thread.currentThread();
    // The fields below are written by the publisher's thread and read by the reading thread,
    // hence volatile: LockSupport itself documents no memory-ordering guarantees.
    private volatile ByteBuffer buffer;
    private volatile boolean ended;
    private volatile Throwable exception;
    /** True while a {@code request(1)} has been issued that no {@code onNext} has answered yet. */
    private volatile boolean pending;
    private volatile Subscription subscription;

    /** Creates a subscriber with a timeout value of 5000. */
    public InputStreamSubscriber() {
        this(5000L);
    }

    /**
     * Creates a subscriber.
     *
     * @param timeout the value handed to {@code Util.parking} each time the reader has to wait.
     */
    public InputStreamSubscriber(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Waits for data if there is none and returns the current buffer.
     *
     * @return the buffer when it has bytes left, empty when the stream ended without more data.
     */
    private Optional<ByteBuffer> getBuffer() {
        if (this.noData()) {
            this.readBuffer();
        }
        // Decide on the buffer contents, not on "ended": a final buffer that was delivered just
        // before onComplete must still be handed to the reader.
        final ByteBuffer current = this.buffer;
        return current != null && current.hasRemaining() ? Optional.of(current) : Optional.empty();
    }

    /** Tells whether there is currently nothing to read. */
    private boolean noData() {
        final ByteBuffer current = this.buffer;
        return this.subscription == null || current == null || !current.hasRemaining();
    }

    /** Tells whether the reader should keep waiting: no data yet and no terminal signal. */
    private boolean mustWait() {
        return !this.ended && this.noData();
    }

    /** Marks the end of the stream and wakes up the reader. */
    @Override
    public void onComplete() {
        this.ended = true;
        LockSupport.unpark(this.thread);
    }

    /**
     * Records the failure, ends the stream and wakes up the reader. The failure is reported to
     * the reader as an {@link IOException} from {@code read}. The method returns normally, as
     * the Reactive Streams specification requires of subscriber signal methods.
     *
     * @param t the failure signalled by the publisher.
     */
    @Override
    public void onError(Throwable t) {
        // Publish the cause before "ended" so that a reader that observes the end also sees it.
        this.exception = t;
        this.ended = true;
        LockSupport.unpark(this.thread);
    }

    /**
     * Makes the given buffer the current one and wakes up the reader.
     *
     * @param buffer the next chunk of bytes.
     */
    @Override
    public void onNext(ByteBuffer buffer) {
        // Publish the buffer before clearing "pending": requestNext() reads them in the opposite
        // order, so it can never see "no outstanding demand" together with a stale buffer.
        this.buffer = buffer;
        this.pending = false;
        LockSupport.unpark(this.thread);
    }

    /**
     * Stores the subscription and requests the first buffer.
     *
     * @param subscription the subscription; {@code null} is stored without requesting anything.
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            this.subscription = null;
            return;
        }
        // Flag the demand before the subscription becomes visible, so a reader that sees the
        // subscription will not issue a second request on top of this one.
        this.pending = true;
        this.subscription = subscription;
        subscription.request(1L);
    }

    /**
     * Blocks until there is data or the stream has ended, requesting the next buffer whenever
     * no request is outstanding.
     */
    private void park() {
        while (this.mustWait()) {
            this.requestNext();
            // The publisher may have answered synchronously inside request().
            if (this.mustWait()) {
                Util.parking(this, this.timeout);
            }
        }
    }

    /**
     * Reads one byte.
     *
     * @return the byte as a value from 0 to 255, or -1 at the end of the stream.
     * @throws IOException when the publisher signalled an error.
     */
    @Override
    public int read() throws IOException {
        byte[] b = new byte[1];
        return this.read(b, 0, b.length) == -1 ? -1 : 0xFF & b[0];
    }

    /**
     * Reads bytes from the current buffer, waiting for a buffer when there is none. At most
     * the bytes remaining in the current buffer are returned in a single call.
     *
     * @param bytes the destination array.
     * @param offset the position in {@code bytes} where the first byte is stored.
     * @param length the maximum number of bytes to read.
     * @return the number of bytes read, or -1 at the end of the stream.
     * @throws IOException when the publisher signalled an error.
     */
    @Override
    public int read(byte[] bytes, int offset, int length) throws IOException {
        this.throwIfFailed();
        final int count = this.readFromBuffer(bytes, offset, length);
        if (count == -1) {
            // The error may have arrived while this call was waiting for data.
            this.throwIfFailed();
        }
        return count;
    }

    /** Obtains the next buffer from the publisher, or returns when the stream has ended. */
    private void readBuffer() {
        this.park();
    }

    /**
     * Copies bytes from the current buffer into {@code bytes}.
     *
     * @return the number of bytes copied, or -1 when no more data will arrive.
     */
    private int readFromBuffer(byte[] bytes, int offset, int length) {
        return this.getBuffer()
            .map(current -> {
                final int count =
                    Math.min(Math.min(length, bytes.length - offset), current.remaining());
                current.get(bytes, offset, count);
                return count;
            })
            .orElse(-1);
    }

    /**
     * Requests exactly one more buffer, but only when no earlier request is still unanswered
     * and the current buffer has nothing left. Keeping the demand at one prevents the
     * publisher from replacing a buffer that has not been read completely.
     */
    private void requestNext() {
        final Subscription current = this.subscription;
        // "pending" is read before the buffer (inside noData()); see onNext for the ordering.
        if (current != null && !this.pending && this.noData()) {
            this.pending = true;
            current.request(1L);
        }
    }

    /** Throws the failure signalled by the publisher, if any, wrapped in an IOException. */
    private void throwIfFailed() throws IOException {
        final Throwable cause = this.exception;
        if (cause != null) {
            throw new IOException(cause);
        }
    }
}

