/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.reactivestreams;

import io.vlingo.actors.Actor;
import io.vlingo.actors.Stoppable;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduled;
import io.vlingo.reactivestreams.ControlledSubscription;
import io.vlingo.reactivestreams.Elements;
import io.vlingo.reactivestreams.Operator;
import io.vlingo.reactivestreams.PublisherConfiguration;
import io.vlingo.reactivestreams.Source;
import io.vlingo.reactivestreams.StreamPublisherDelegate;
import io.vlingo.reactivestreams.StreamSubscriberDelegate;
import io.vlingo.reactivestreams.SubscriptionController;
import io.vlingo.reactivestreams.sink.ConsumerSink;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * An {@link Actor} that implements the Reactive Streams {@link Processor} contract:
 * it receives elements of type {@code T} as a subscriber, passes each one through an
 * {@link Operator}, buffers the resulting {@code R} values, and exposes them to its own
 * subscribers as a publisher.
 *
 * <p>The subscriber-side work is handed to a {@link StreamSubscriberDelegate} and the
 * publisher-side work to a {@link StreamPublisherDelegate}; this class wires the two
 * together through an internal queue-backed {@link Source}.
 *
 * @param <T> the type of element received from upstream
 * @param <R> the type of element published downstream
 */
public class StreamProcessor<T, R>
extends Actor
implements Processor<T, R>,
ControlledSubscription<R>,
Scheduled<Void>,
Stoppable {
    private final StreamPublisherDelegate<R> publisherDelegate;
    private final PublisherSource publisherSource;
    private final long requestThreshold;
    private final StreamSubscriberDelegate<T> subscriberDelegate;

    /**
     * Constructs the processor and both of its delegates.
     *
     * @param operator the operator applied to every received value; whatever it emits is queued for publishing
     * @param requestThreshold passed to the subscriber delegate, and also used as the default
     *        batch size when the internal source is asked for elements without an explicit maximum
     * @param configuration the configuration handed to the publisher delegate
     */
    @SuppressWarnings("unchecked") // selfAs(Class) can only yield the raw ControlledSubscription/Scheduled proxy types
    public StreamProcessor(Operator<T, R> operator, long requestThreshold, PublisherConfiguration configuration) {
        this.requestThreshold = requestThreshold;
        this.subscriberDelegate = new StreamSubscriberDelegate<>(new ConsumerSink<>(new ConsumerOperator(operator)), requestThreshold, this.logger());
        this.publisherSource = new PublisherSource();
        this.publisherDelegate = new StreamPublisherDelegate<R>(this.publisherSource, configuration, this.selfAs(ControlledSubscription.class), this.scheduler(), this.selfAs(Scheduled.class), this.selfAs(Stoppable.class));
    }

    /**
     * Forwards the upstream subscription to the subscriber delegate.
     *
     * @param subscription the subscription provided by the upstream publisher
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscriberDelegate.onSubscribe(subscription);
    }

    /**
     * Forwards a received element to the subscriber delegate.
     *
     * @param value the element received from upstream
     */
    @Override
    public void onNext(T value) {
        this.subscriberDelegate.onNext(value);
    }

    /**
     * Forwards completion to the subscriber delegate and marks the internal source as
     * terminated, so that it reports termination once its queue has been drained.
     */
    @Override
    public void onComplete() {
        this.subscriberDelegate.onComplete();
        this.publisherSource.terminate();
    }

    /**
     * Publishes the failure through the publisher delegate, forwards it to the subscriber
     * delegate, and then marks the internal source as terminated.
     *
     * @param cause the failure signalled by upstream
     */
    @Override
    public void onError(Throwable cause) {
        this.publisherDelegate.publish(cause);
        this.subscriberDelegate.onError(cause);
        this.publisherSource.terminate();
    }

    /**
     * Registers a downstream subscriber with the publisher delegate.
     *
     * @param subscriber the subscriber to register
     */
    @Override
    public void subscribe(Subscriber<? super R> subscriber) {
        this.publisherDelegate.subscribe(subscriber);
    }

    /**
     * Scheduler callback; asks the publisher delegate to process the next elements.
     *
     * @param scheduled the scheduled instance that was signalled (unused)
     * @param data always unused
     */
    @Override
    public void intervalSignal(Scheduled<Void> scheduled, Void data) {
        this.publisherDelegate.processNext();
    }

    /**
     * Cancels this processor's own upstream subscription and then cancels the given
     * downstream subscription through the publisher delegate.
     *
     * @param controller the downstream subscription being cancelled
     */
    @Override
    public void cancel(SubscriptionController<R> controller) {
        this.subscriberDelegate.cancelSubscription();
        this.publisherDelegate.cancel(controller);
    }

    /**
     * Passes a downstream demand request to the publisher delegate.
     *
     * @param controller the downstream subscription making the request
     * @param maximum the number of elements requested
     */
    @Override
    public void request(SubscriptionController<R> controller, long maximum) {
        this.publisherDelegate.request(controller, maximum);
    }

    /**
     * Marks the internal source as terminated and then stops this actor.
     */
    @Override
    public void stop() {
        this.publisherSource.terminate();
        super.stop();
    }

    /**
     * Queue-backed {@link Source} that feeds the publisher delegate with the values
     * produced by the operator.
     */
    private final class PublisherSource
    implements Source<R> {
        private boolean terminated = false;
        private final Queue<R> values = new ArrayDeque<>();

        PublisherSource() {
        }

        /**
         * Answers the next batch using the request threshold as the batch size.
         */
        @Override
        public Completes<Elements<R>> next() {
            return this.next(this.defaultBatchSize());
        }

        /**
         * Answers up to {@code maximumElements} queued values. When the queue is empty,
         * answers a terminated marker if the subscriber delegate is finalized or this source
         * has been terminated, and an empty marker otherwise.
         *
         * @param maximumElements the largest number of values to dequeue; values below zero are treated as zero
         */
        @Override
        public Completes<Elements<R>> next(int maximumElements) {
            if (this.values.isEmpty()) {
                if (StreamProcessor.this.subscriberDelegate.isFinalized() || this.terminated) {
                    return Completes.withSuccess(Elements.terminated());
                }
                return Completes.withSuccess(Elements.empty());
            }
            return Completes.withSuccess(Elements.of(this.nextValues(maximumElements)));
        }

        /**
         * Same as {@link #next()}; the {@code index} is ignored because values are only
         * ever consumed from the head of the queue.
         */
        @Override
        public Completes<Elements<R>> next(long index) {
            return this.next(this.defaultBatchSize());
        }

        /**
         * Same as {@link #next(int)}; the {@code index} is ignored.
         */
        @Override
        public Completes<Elements<R>> next(long index, int maximumElements) {
            return this.next(maximumElements);
        }

        /**
         * Always answers {@code false}.
         */
        @Override
        public Completes<Boolean> isSlow() {
            return Completes.withSuccess(false);
        }

        /**
         * Appends a value to the tail of the queue.
         */
        void enqueue(R value) {
            this.values.add(value);
        }

        /**
         * Flags this source as terminated; values already queued are still served first.
         */
        void terminate() {
            this.terminated = true;
        }

        /**
         * Narrows the request threshold to an {@code int} batch size. The threshold is
         * clamped rather than cast, because a bare cast of a value above
         * {@code Integer.MAX_VALUE} (such as {@code Long.MAX_VALUE}) yields a negative size.
         */
        private int defaultBatchSize() {
            return (int) Math.min(StreamProcessor.this.requestThreshold, (long) Integer.MAX_VALUE);
        }

        /**
         * Dequeues at most {@code maximum} values, in queue order, into a new array.
         *
         * @param maximum the upper bound on the number of values; values below zero yield an empty array
         */
        @SuppressWarnings("unchecked") // generic array creation; R erases to Object so the cast is unchecked but harmless here
        private R[] nextValues(int maximum) {
            // Never let a non-positive maximum turn into a negative array length.
            int count = Math.max(0, Math.min(this.values.size(), maximum));
            R[] batch = (R[]) new Object[count];
            for (int idx = 0; idx < count; ++idx) {
                batch[idx] = this.values.poll();
            }
            return batch;
        }
    }

    /**
     * Adapts the {@link Operator} to a {@link Consumer} so it can be used as the sink of
     * the subscriber delegate: each accepted value is run through the operator and every
     * value the operator emits is queued on the publisher source.
     */
    private final class ConsumerOperator
    implements Consumer<T> {
        private final Operator<T, R> operator;

        ConsumerOperator(Operator<T, R> operator) {
            this.operator = operator;
        }

        /**
         * Applies the operator to {@code value}. Any exception thrown while doing so is
         * published through the publisher delegate instead of propagating to the caller.
         */
        @Override
        public void accept(T value) {
            try {
                this.operator.performInto(value, StreamProcessor.this.publisherSource::enqueue);
            }
            catch (Exception e) {
                StreamProcessor.this.publisherDelegate.publish(e);
            }
        }
    }
}

