/*
 * Decompiled with CFR 0.152.
 */
package org.cqfn.rio.channel;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.cqfn.rio.WriteGreed;
import org.cqfn.rio.channel.ChannelSource;
import org.cqfn.rio.channel.WriteRequest;
import org.cqfn.rio.channel.WriteTaskQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class WritableChannelSubscriber
extends CompletableFuture<Void>
implements Subscriber<ByteBuffer> {
    private final ChannelSource<? extends WritableByteChannel> src;
    private final AtomicReference<Subscription> sub;
    private final ExecutorService exec;
    private final WriteGreed greed;
    private WriteTaskQueue queue;

    WritableChannelSubscriber(ChannelSource<? extends WritableByteChannel> src, WriteGreed greed, ExecutorService exec) {
        this.src = src;
        this.sub = new AtomicReference();
        this.exec = exec;
        this.greed = greed;
    }

    public void acceptAsync(Publisher<ByteBuffer> publisher) {
        this.exec.submit(() -> publisher.subscribe((Subscriber)this));
    }

    public void onSubscribe(Subscription subscription) {
        WritableByteChannel chan;
        if (!this.sub.compareAndSet(null, Objects.requireNonNull(subscription))) {
            subscription.cancel();
            return;
        }
        if (this.isCancelled()) {
            subscription.cancel();
            return;
        }
        try {
            chan = this.src.channel();
        }
        catch (IOException iex) {
            subscription.cancel();
            this.completeExceptionally(iex);
            return;
        }
        this.queue = new WriteTaskQueue(this, chan, this.sub, this.greed, this.exec);
        this.queue.accept(new WriteRequest.Init(this));
    }

    public void onNext(ByteBuffer buf) {
        this.queue.accept(new WriteRequest.Next((CompletableFuture<Void>)this, Objects.requireNonNull(buf)));
    }

    public void onError(Throwable err) {
        this.queue.accept(new WriteRequest.Error((CompletableFuture<Void>)this, Objects.requireNonNull(err)));
    }

    public void onComplete() {
        this.queue.accept(new WriteRequest.Complete(this));
    }
}

