/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.symbio.store.state;

import io.vlingo.actors.Stage;
import io.vlingo.common.Completes;
import io.vlingo.reactivestreams.Elements;
import io.vlingo.reactivestreams.PublisherConfiguration;
import io.vlingo.reactivestreams.Sink;
import io.vlingo.reactivestreams.Source;
import io.vlingo.reactivestreams.Stream;
import io.vlingo.reactivestreams.StreamPublisher;
import io.vlingo.reactivestreams.StreamSubscriber;
import io.vlingo.reactivestreams.Streams;
import io.vlingo.symbio.State;
import io.vlingo.symbio.StateAdapterProvider;
import io.vlingo.symbio.StateBundle;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class StateStream<RS extends State<?>>
implements Stream {
    private long flowElementsRate;
    private Publisher<RS> publisher;
    private final Stage stage;
    private final Map<String, RS> states;
    private final StateAdapterProvider stateAdapterProvider;
    private StateStreamSubscriber<RS> subscriber;

    public StateStream(Stage stage, Map<String, RS> states, StateAdapterProvider stateAdapterProvider) {
        this.stage = stage;
        this.stateAdapterProvider = stateAdapterProvider;
        this.states = states;
    }

    public void request(long flowElementsRate) {
        this.flowElementsRate = flowElementsRate;
        this.subscriber.subscriptionHook.request(this.flowElementsRate);
    }

    public <S> void flowInto(Sink<S> sink) {
        this.flowInto(sink, 100L, DefaultProbeInterval);
    }

    public <S> void flowInto(Sink<S> sink, long flowElementsRate) {
        this.flowInto(sink, flowElementsRate, DefaultProbeInterval);
    }

    public <S> void flowInto(Sink<S> sink, long flowElementsRate, int probeInterval) {
        this.flowElementsRate = flowElementsRate;
        PublisherConfiguration configuration = PublisherConfiguration.with((int)probeInterval, (int)-1, (int)256, (Streams.OverflowPolicy)Streams.OverflowPolicy.DropCurrent);
        this.publisher = (Publisher)this.stage.actorFor(Publisher.class, StreamPublisher.class, new Object[]{new StateSource<RS>(this.states, this.stateAdapterProvider, flowElementsRate), configuration});
        Subscriber subscriber = (Subscriber)this.stage.actorFor(Subscriber.class, StateStreamSubscriber.class, new Object[]{sink, flowElementsRate, this});
        this.publisher.subscribe(subscriber);
    }

    public void stop() {
        this.subscriber.subscriptionHook.cancel();
    }

    public static class StateStreamSubscriber<RS extends State<?>>
    extends StreamSubscriber<RS> {
        Subscription subscriptionHook;

        public StateStreamSubscriber(Sink<RS> sink, long requestThreshold, StateStream<RS> stateStream) {
            super(sink, requestThreshold);
            ((StateStream)stateStream).subscriber = this;
        }

        public void onComplete() {
            super.onComplete();
        }

        public void onSubscribe(Subscription subscription) {
            this.subscriptionHook = subscription;
            super.onSubscribe(subscription);
        }
    }

    private static final class StateSource<RS extends State<?>>
    implements Source<RS> {
        private final long flowElementsRate;
        private final Iterator<String> iterator;
        private final Map<String, RS> states;
        private final StateAdapterProvider stateAdapterProvider;

        public StateSource(Map<String, RS> states, StateAdapterProvider stateAdapterProvider, long flowElementsRate) {
            this.states = states;
            this.iterator = states.keySet().iterator();
            this.stateAdapterProvider = stateAdapterProvider;
            this.flowElementsRate = flowElementsRate;
        }

        public Completes<Elements<RS>> next() {
            if (this.iterator.hasNext()) {
                int count = 0;
                ArrayList<StateBundle> next = new ArrayList<StateBundle>();
                while (this.iterator.hasNext() && (long)count++ < this.flowElementsRate) {
                    String id = this.iterator.next();
                    State state = (State)this.states.get(id);
                    Object object = this.stateAdapterProvider.fromRaw(state);
                    next.add(new StateBundle(state, object));
                }
                Elements elements = Elements.of((Object[])this.arrayFrom(next));
                return Completes.withSuccess((Object)elements);
            }
            return Completes.withSuccess((Object)Elements.terminated());
        }

        public Completes<Elements<RS>> next(int maximumElements) {
            return this.next();
        }

        public Completes<Elements<RS>> next(long index) {
            return this.next();
        }

        public Completes<Elements<RS>> next(long index, int maximumElements) {
            return this.next();
        }

        public Completes<Boolean> isSlow() {
            return Completes.withSuccess((Object)false);
        }

        private StateBundle[] arrayFrom(List<StateBundle> states) {
            return states.toArray(new StateBundle[states.size()]);
        }
    }
}

