/*
 * Decompiled with CFR 0.152.
 */
package org.cqfn.rio.channel;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.cqfn.rio.Buffers;
import org.cqfn.rio.channel.ChannelSource;
import org.cqfn.rio.channel.ReadSubscriberState;
import org.cqfn.rio.channel.ReadSubscription;
import org.cqfn.rio.channel.ReadTaskQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class ReadableChannelPublisher
implements Publisher<ByteBuffer> {
    private static final Subscription DUMMY = new Subscription(){

        public void request(long count) {
        }

        public void cancel() {
        }
    };
    private final ChannelSource<? extends ReadableByteChannel> src;
    private final Buffers buffers;
    private final ExecutorService exec;

    ReadableChannelPublisher(ReadableByteChannel src, Buffers buffers, ExecutorService exec) {
        this(() -> src, buffers, exec);
    }

    ReadableChannelPublisher(ChannelSource<? extends ReadableByteChannel> src, Buffers buffers, ExecutorService exec) {
        this.src = src;
        this.buffers = buffers;
        this.exec = exec;
    }

    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        ReadableByteChannel chan;
        Objects.requireNonNull(subscriber, "Subscriber can't be null");
        try {
            chan = this.src.channel();
        }
        catch (IOException err) {
            subscriber.onSubscribe(DUMMY);
            subscriber.onError((Throwable)err);
            return;
        }
        ReadSubscriberState<? super ByteBuffer> wrap = new ReadSubscriberState<ByteBuffer>(subscriber);
        wrap.onSubscribe(new ReadSubscription(wrap, this.buffers, new ReadTaskQueue(wrap, chan, this.exec)));
    }
}

