/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.reactivestreams;

import io.vlingo.actors.Logger;
import io.vlingo.reactivestreams.Sink;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class StreamSubscriberDelegate<T>
implements Subscriber<T> {
    private boolean completed;
    private long count;
    private boolean errored;
    private final Logger logger;
    private final long requestThreshold;
    private final Sink<T> sink;
    private Subscription subscription;

    public StreamSubscriberDelegate(Sink<T> sink, long requestThreshold, Logger logger) {
        this.sink = sink;
        this.requestThreshold = requestThreshold;
        this.logger = logger;
        this.count = 0L;
        this.completed = false;
        this.errored = false;
    }

    public void onSubscribe(Subscription subscription) {
        if (this.subscription == null) {
            this.subscription = subscription;
            this.sink.ready();
            this.subscription.request(this.requestThreshold);
        } else {
            subscription.cancel();
        }
    }

    public void onNext(T value) {
        if (!this.isFinalized()) {
            this.sink.whenValue(value);
            if (++this.count >= this.requestThreshold) {
                this.subscription.request(this.requestThreshold);
            }
        }
    }

    public void onComplete() {
        this.completed = true;
        this.terminate();
        this.logger.info("Subscriber with " + this.sink + " is completed.");
    }

    public void onError(Throwable cause) {
        this.errored = true;
        this.terminate();
        this.logger.error("Subscriber with " + this.sink + " is terminating because: " + cause.getMessage(), cause);
    }

    public boolean isFinalized() {
        return this.completed || this.errored;
    }

    public void cancelSubscription() {
        this.subscription.cancel();
    }

    private void terminate() {
        this.sink.terminate();
    }
}

