/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.reactivestreams;

import io.vlingo.actors.Stoppable;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Scheduled;
import io.vlingo.common.Scheduler;
import io.vlingo.reactivestreams.ControlledSubscription;
import io.vlingo.reactivestreams.PublisherConfiguration;
import io.vlingo.reactivestreams.Source;
import io.vlingo.reactivestreams.SubscriptionController;
import java.util.HashMap;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class StreamPublisherDelegate<T>
implements Publisher<T>,
ControlledSubscription<T> {
    private Cancellable cancellable;
    private final PublisherConfiguration configuration;
    private boolean slow;
    private final Source<T> source;
    private final Map<Integer, SubscriptionController<T>> subscriptions;
    private final ControlledSubscription<T> controlledSubscription;
    private final Scheduler scheduler;
    private final Scheduled<Void> scheduled;
    private final Stoppable stoppable;
    private boolean flushed;

    public StreamPublisherDelegate(Source<T> source, PublisherConfiguration configuration, ControlledSubscription<T> controlledSubscription, Scheduler scheduler, Scheduled<Void> scheduled, Stoppable stoppable) {
        this.source = source;
        this.configuration = configuration;
        this.subscriptions = new HashMap<Integer, SubscriptionController<T>>(2);
        this.controlledSubscription = controlledSubscription;
        this.scheduler = scheduler;
        this.scheduled = scheduled;
        this.stoppable = stoppable;
        this.determineIfSlow();
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        this.schedule(true);
        SubscriptionController<T> controller = new SubscriptionController<T>(subscriber, this.controlledSubscription, this.configuration);
        this.subscriptions.putIfAbsent(controller.id(), controller);
        subscriber.onSubscribe(controller);
    }

    @Override
    public void cancel(SubscriptionController<T> controller) {
        controller.cancelSubscription();
        this.subscriptions.remove(controller.id());
    }

    @Override
    public void request(SubscriptionController<T> controller, long maximum) {
        controller.requestFlow(controller.accumulate(maximum));
        this.publish(controller, null);
    }

    void processNext() {
        if (this.subscriptions.isEmpty()) {
            return;
        }
        try {
            this.source.next().andThen(maybeElements -> {
                if (!maybeElements.terminated) {
                    this.publish(maybeElements.values);
                    this.schedule(false);
                    return maybeElements;
                }
                if (this.flush()) {
                    return maybeElements;
                }
                this.completeAll();
                this.stoppable.stop();
                return maybeElements;
            }).andFinally();
        }
        catch (Throwable t) {
            this.publish(t);
        }
    }

    void publish(T elementOrNull) {
        this.subscriptions.values().forEach(controller -> controller.onNext(elementOrNull));
    }

    void publish(SubscriptionController<T> controller, T elementOrNull) {
        controller.onNext(elementOrNull);
    }

    void publish(Throwable cause) {
        this.subscriptions.values().forEach(controller -> controller.onError(cause));
    }

    void stop() {
        this.cancellable.cancel();
        this.completeAll();
    }

    private void completeAll() {
        this.subscriptions.values().forEach(controller -> controller.subscriber().onComplete());
        this.subscriptions.clear();
    }

    private void determineIfSlow() {
        this.slow = (Boolean)this.source.isSlow().await();
    }

    private boolean flush() {
        this.flushed = false;
        this.subscriptions.values().forEach(controller -> {
            if (controller.hasBufferedElements()) {
                controller.onNext(null);
                this.flushed = true;
            }
        });
        return this.flushed;
    }

    private T[] publish(T[] maybeElements) {
        if (maybeElements.length > 0) {
            for (int idx = 0; idx < maybeElements.length; ++idx) {
                this.publish(maybeElements[idx]);
            }
        }
        return maybeElements;
    }

    private void schedule(boolean isSubscribing) {
        if (this.slow) {
            this.cancellable = this.scheduler.scheduleOnce(this.scheduled, null, 0L, (long)this.configuration.probeInterval);
        } else if (isSubscribing && this.cancellable == null) {
            this.cancellable = this.scheduler.schedule(this.scheduled, null, 0L, (long)this.configuration.probeInterval);
        }
    }
}

