/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.symbio.store;

import io.vlingo.actors.Stage;
import io.vlingo.reactivestreams.PublisherConfiguration;
import io.vlingo.reactivestreams.Sink;
import io.vlingo.reactivestreams.Source;
import io.vlingo.reactivestreams.Stream;
import io.vlingo.reactivestreams.StreamPublisher;
import io.vlingo.reactivestreams.StreamSubscriber;
import io.vlingo.reactivestreams.Streams;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.EntryAdapterProvider;
import io.vlingo.symbio.store.EntryReader;
import io.vlingo.symbio.store.EntryReaderSource;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class EntryReaderStream<T extends Entry<?>>
implements Stream {
    private final EntryAdapterProvider entryAdapterProvider;
    private long flowElementsRate;
    private final EntryReader<T> entryReader;
    private Source<T> entryReaderSource;
    private Publisher<T> publisher;
    private final Stage stage;
    private EntryStreamSubscriber<T> subscriber;

    public EntryReaderStream(Stage stage, EntryReader<T> entryReader, EntryAdapterProvider entryAdapterProvider) {
        this.stage = stage;
        this.entryReader = entryReader;
        this.entryAdapterProvider = entryAdapterProvider;
    }

    public void request(long flowElementsRate) {
        this.flowElementsRate = flowElementsRate;
        this.subscriber.subscriptionHook.request(this.flowElementsRate);
    }

    public <S> void flowInto(Sink<S> sink) {
        this.flowInto(sink, 100L, DefaultProbeInterval);
    }

    public <S> void flowInto(Sink<S> sink, long flowElementsRate) {
        this.flowInto(sink, flowElementsRate, DefaultProbeInterval);
    }

    public <S> void flowInto(Sink<S> sink, long flowElementsRate, int probeInterval) {
        this.flowElementsRate = flowElementsRate;
        PublisherConfiguration configuration = PublisherConfiguration.with((int)probeInterval, (int)-1, (int)256, (Streams.OverflowPolicy)Streams.OverflowPolicy.DropCurrent);
        this.entryReaderSource = (Source)this.stage.actorFor(Source.class, EntryReaderSource.class, new Object[]{this.entryReader, this.entryAdapterProvider, flowElementsRate});
        this.publisher = (Publisher)this.stage.actorFor(Publisher.class, StreamPublisher.class, new Object[]{this.entryReaderSource, configuration});
        Subscriber subscriber = (Subscriber)this.stage.actorFor(Subscriber.class, EntryStreamSubscriber.class, new Object[]{sink, flowElementsRate});
        this.publisher.subscribe(subscriber);
    }

    public void stop() {
        this.subscriber.subscriptionHook.cancel();
    }

    public static class EntryStreamSubscriber<T>
    extends StreamSubscriber<T> {
        Subscription subscriptionHook;

        public EntryStreamSubscriber(Sink<T> sink, long requestThreshold) {
            super(sink, requestThreshold);
        }

        public void onComplete() {
            super.onComplete();
        }

        public void onSubscribe(Subscription subscription) {
            this.subscriptionHook = subscription;
            super.onSubscribe(subscription);
        }
    }
}

