/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.rs;

import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.LockSupport;
import net.pincette.rs.Util;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class OutputStreamPublisher
extends OutputStream
implements Publisher<ByteBuffer> {
    private final Thread thread = Thread.currentThread();
    private final long timeout;
    private long requested;
    private Subscriber<? super ByteBuffer> subscriber;

    public OutputStreamPublisher() {
        this(5000L);
    }

    public OutputStreamPublisher(long timeout) {
        this.timeout = timeout;
    }

    @Override
    public void close() {
        if (this.subscriber != null) {
            this.subscriber.onComplete();
        }
    }

    private void park() {
        while (!this.requested()) {
            Util.parking(this, this.timeout);
        }
    }

    private boolean requested() {
        return this.subscriber != null && this.requested > 0L;
    }

    @Override
    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        this.subscriber = subscriber;
        if (subscriber != null) {
            subscriber.onSubscribe(new StreamSubscription());
        }
    }

    @Override
    public void write(byte[] bytes) {
        this.write(bytes, 0, bytes.length);
    }

    @Override
    public void write(byte[] bytes, int offset, int length) {
        if (!this.requested()) {
            this.park();
        }
        --this.requested;
        this.subscriber.onNext(ByteBuffer.wrap(bytes, offset, length));
    }

    @Override
    public void write(int b) {
        this.write(new byte[]{(byte)b}, 0, 1);
    }

    private class StreamSubscription
    implements Subscription {
        private StreamSubscription() {
        }

        @Override
        public void cancel() {
        }

        @Override
        public void request(long n) {
            OutputStreamPublisher.this.requested += n;
            LockSupport.unpark(OutputStreamPublisher.this.thread);
        }
    }
}

