/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.symbio.store;

import io.vlingo.actors.Actor;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduled;
import io.vlingo.reactivestreams.Elements;
import io.vlingo.reactivestreams.Stream;
import io.vlingo.symbio.BaseEntry;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.EntryAdapterProvider;
import io.vlingo.symbio.EntryBundle;
import io.vlingo.symbio.Source;
import io.vlingo.symbio.store.EntryReader;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

/**
 * Stream source backed by an {@link EntryReader}.
 *
 * <p>On construction the actor schedules itself as a {@link Scheduled} with an
 * interval of {@code Stream.FastProbeInterval}. Each interval signal refills an
 * internal cache from the reader when the cache is empty and no read is already
 * in flight. {@link #next()} drains up to {@code flowElementsRate} cached entries
 * per call and answers them wrapped as {@link EntryBundle} instances.
 *
 * @param <T> the concrete {@link Entry} type produced by the reader
 */
public class EntryReaderSource<T extends Entry<?>>
        extends Actor
        implements io.vlingo.reactivestreams.Source<T>, Scheduled<Object> {
    /** Entries already read from the reader and not yet handed out by {@link #next()}. */
    private final Deque<T> cache;
    /** Handle of the periodic probe created in the constructor; cancelled in {@link #stop()}. */
    private final Cancellable cancellable;
    private final EntryReader<T> entryReader;
    /** Upper bound on both the per-read batch size and the per-{@code next()} element count. */
    private final long flowElementsRate;
    /** True while a {@code readNext} request is outstanding, so that reads never overlap. */
    private boolean reading;
    private final EntryAdapterProvider entryAdapterProvider;

    /**
     * Creates the source and starts the periodic probe that fills the cache.
     *
     * @param entryReader the reader from which entries are pulled
     * @param entryAdapterProvider the provider used to turn each entry into its source
     * @param flowElementsRate the maximum number of elements read, and answered, at once
     */
    public EntryReaderSource(EntryReader<T> entryReader, EntryAdapterProvider entryAdapterProvider, long flowElementsRate) {
        this.entryReader = entryReader;
        this.entryAdapterProvider = entryAdapterProvider;
        this.flowElementsRate = flowElementsRate;
        this.cache = new ArrayDeque<T>();
        this.cancellable = this.scheduler().schedule((Scheduled)this.selfAs(Scheduled.class), null, 0L, (long)Stream.FastProbeInterval);
    }

    /**
     * Answers up to {@code flowElementsRate} cached entries, each paired with the
     * source produced by the {@link EntryAdapterProvider}, or empty elements when
     * nothing is cached.
     *
     * @return the eventual elements; their values are {@link EntryBundle} instances
     */
    public Completes<Elements<T>> next() {
        if (this.cache.isEmpty()) {
            return this.completes().with(Elements.empty());
        }

        final List<EntryBundle> bundles = new ArrayList<EntryBundle>();
        for (int count = 0; count < this.flowElementsRate && !this.cache.isEmpty(); ++count) {
            final T entry = this.cache.poll();
            // Entries that are not BaseEntry instances are re-created through withId()
            // before being handed to the adapter provider.
            final Entry<?> normalized = entry instanceof BaseEntry ? entry : entry.withId(entry.id());
            // The cast is retained because asSource's declared return type is not visible here.
            final Object source = this.entryAdapterProvider.asSource(normalized);
            // The bundle carries the entry as polled, not the normalized copy.
            bundles.add(new EntryBundle(entry, (Source<?>) source));
        }

        final Elements<?> elements = Elements.of(this.arrayFrom(bundles));
        return this.completes().with(elements);
    }

    /**
     * Same as {@link #next()}; {@code maximumElements} is not consulted.
     *
     * @param maximumElements ignored
     * @return the eventual elements
     */
    public Completes<Elements<T>> next(int maximumElements) {
        return this.next();
    }

    /**
     * Same as {@link #next()}; {@code index} is not consulted.
     *
     * @param index ignored
     * @return the eventual elements
     */
    public Completes<Elements<T>> next(long index) {
        return this.next();
    }

    /**
     * Same as {@link #next()}; neither argument is consulted.
     *
     * @param index ignored
     * @param maximumElements ignored
     * @return the eventual elements
     */
    public Completes<Elements<T>> next(long index, int maximumElements) {
        return this.next();
    }

    /**
     * Always reports this source as not slow.
     *
     * @return an outcome of {@code false}
     */
    public Completes<Boolean> isSlow() {
        return this.completes().with(Boolean.FALSE);
    }

    /**
     * Periodic probe: when the cache is empty and no read is outstanding, requests
     * the next batch (at most {@code flowElementsRate}, clamped to
     * {@code Integer.MAX_VALUE}) from the reader and caches the result.
     *
     * @param scheduled the scheduled instance being signalled (unused)
     * @param data the data registered with the schedule (unused)
     */
    public void intervalSignal(Scheduled<Object> scheduled, Object data) {
        if (this.reading || !this.cache.isEmpty()) {
            return;
        }

        this.reading = true;
        final int max = this.flowElementsRate > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) this.flowElementsRate;
        // NOTE(review): if the reader's Completes never delivers an outcome (for
        // example on a failed read), reading stays true; consider a failure
        // handler once the Completes failure semantics are confirmed.
        this.entryReader.readNext(max).andThenConsume(entries -> {
            try {
                // ArrayDeque rejects null elements, and a null batch would also throw;
                // either way the flag below must still be cleared.
                if (entries != null) {
                    this.cache.addAll((Collection<T>) entries);
                }
            } finally {
                // Always release the in-flight marker, otherwise an exception while
                // caching would block every subsequent read.
                this.reading = false;
            }
        });
    }

    /**
     * Cancels the periodic probe and then stops the actor.
     */
    public void stop() {
        this.cancellable.cancel();
        super.stop();
    }

    /** Copies the given bundles into a new array. */
    private EntryBundle[] arrayFrom(List<EntryBundle> sources) {
        return sources.toArray(new EntryBundle[0]);
    }
}

