/*
 * Decompiled with CFR 0.152.
 */
package monix.reactive.internal.rstreams;

import monix.execution.Ack;
import monix.execution.Scheduler;
import monix.reactive.observers.Subscriber;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import scala.MatchError;
import scala.Predef$;

public final class SyncSubscriberAsReactiveSubscriber<A>
implements Subscriber<A> {
    private final Subscriber.Sync<A> target;
    private final int requestCount;
    private Subscription subscription;
    private long expectingCount;
    private volatile boolean isCanceled;

    public <A> SyncSubscriberAsReactiveSubscriber(Subscriber.Sync<A> target, int requestCount) {
        this.target = target;
        this.requestCount = requestCount;
        Predef$.MODULE$.require(requestCount > 0, SyncSubscriberAsReactiveSubscriber::$init$$$anonfun$1);
        Scheduler s = target.scheduler();
        this.subscription = null;
        this.expectingCount = 0L;
        this.isCanceled = false;
    }

    public void onSubscribe(Subscription s) {
        if (this.subscription == null && !this.isCanceled) {
            this.subscription = s;
            this.expectingCount = this.requestCount;
            s.request((long)this.requestCount);
        } else {
            s.cancel();
        }
    }

    public void onNext(A elem) {
        if (this.subscription == null) {
            throw new NullPointerException("onSubscription never happened, see rule 2.13 in the Reactive Streams spec");
        }
        if (elem == null) {
            throw new NullPointerException("onNext(null) is forbidden, see rule 2.13 in the Reactive Streams spec");
        }
        if (!this.isCanceled) {
            Ack ack;
            if (this.expectingCount > 0L) {
                --this.expectingCount;
            }
            if (Ack.Continue$.MODULE$.equals(ack = this.target.onNext(elem))) {
                if (this.expectingCount == 0L) {
                    this.expectingCount = this.requestCount;
                    this.subscription.request((long)this.requestCount);
                }
            } else if (Ack.Stop$.MODULE$.equals(ack)) {
                this.isCanceled = true;
                this.subscription.cancel();
            } else {
                throw new MatchError((Object)ack);
            }
        }
    }

    public void onError(Throwable ex) {
        if (ex == null) {
            throw new NullPointerException("onError(null) is forbidden, see rule 2.13 in the Reactive Streams spec");
        }
        if (!this.isCanceled) {
            this.isCanceled = true;
            this.target.onError(ex);
        }
    }

    public void onComplete() {
        if (!this.isCanceled) {
            this.isCanceled = true;
            this.target.onComplete();
        }
    }

    private static final String $init$$$anonfun$1() {
        return "requestCount must be strictly positive, according to the Reactive Streams contract";
    }
}

