/*
 * Decompiled with CFR 0.152.
 */
package com.apple.foundationdb.record.cursors;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.async.MoreAsyncUtil;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.RecordCursorVisitor;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

@API(value=API.Status.UNSTABLE)
public class MapPipelinedCursor<T, V>
implements RecordCursor<V> {
    private static final CompletableFuture<Boolean> ALREADY_CANCELLED = MoreAsyncUtil.alreadyCancelled();
    @Nonnull
    private final RecordCursor<T> inner;
    @Nonnull
    private final Function<T, CompletableFuture<V>> func;
    private final int pipelineSize;
    @Nonnull
    private final Queue<CompletableFuture<RecordCursorResult<V>>> pipeline;
    private boolean innerExhausted = false;
    private volatile boolean closed = false;
    @Nullable
    private CompletableFuture<RecordCursorResult<T>> waitInnerFuture = null;
    @Nullable
    private RecordCursorResult<V> nextResult = null;

    public MapPipelinedCursor(@Nonnull RecordCursor<T> inner, @Nonnull Function<T, CompletableFuture<V>> func, int pipelineSize) {
        this.inner = inner;
        this.func = func;
        this.pipelineSize = pipelineSize;
        this.pipeline = new ArrayDeque<CompletableFuture<RecordCursorResult<V>>>(pipelineSize);
    }

    @Override
    @Nonnull
    public CompletableFuture<RecordCursorResult<V>> onNext() {
        if (this.nextResult != null && !this.nextResult.hasNext()) {
            return CompletableFuture.completedFuture(this.nextResult);
        }
        return ((CompletableFuture)AsyncUtil.whileTrue(this::tryToFillPipeline, this.getExecutor()).thenCompose(vignore -> this.pipeline.peek())).thenApply(result -> {
            if (result.hasNext()) {
                this.pipeline.remove();
            }
            this.nextResult = result;
            return result;
        });
    }

    @Override
    public void close() {
        this.closed = true;
        this.inner.close();
    }

    @Override
    public boolean isClosed() {
        return this.closed;
    }

    @Override
    @Nonnull
    public Executor getExecutor() {
        return this.inner.getExecutor();
    }

    @Override
    public boolean accept(@Nonnull RecordCursorVisitor visitor) {
        if (visitor.visitEnter(this)) {
            this.inner.accept(visitor);
        }
        return visitor.visitLeave(this);
    }

    protected CompletableFuture<Boolean> tryToFillPipeline() {
        if (this.closed) {
            return this.cancellAll();
        }
        while (!this.innerExhausted && this.pipeline.size() < this.pipelineSize) {
            if (this.closed) {
                return this.cancellAll();
            }
            if (this.waitInnerFuture == null) {
                this.waitInnerFuture = this.inner.onNext();
            }
            if (!this.waitInnerFuture.isDone()) {
                CompletableFuture nextEntry = this.pipeline.peek();
                if (nextEntry == null) {
                    return this.waitInnerFuture.thenApply(vignore -> true);
                }
                return CompletableFuture.anyOf(this.waitInnerFuture, nextEntry).thenApply(vignore -> !nextEntry.isDone());
            }
            RecordCursorResult<T> innerResult = this.waitInnerFuture.join();
            this.pipeline.add(innerResult.mapAsync(this.func));
            if (innerResult.hasNext()) {
                this.waitInnerFuture = null;
                if (!this.pipeline.peek().isDone()) continue;
                return AsyncUtil.READY_FALSE;
            }
            this.innerExhausted = true;
            if (innerResult.getNoNextReason() != RecordCursor.NoNextReason.TIME_LIMIT_REACHED || this.nextResult == null) break;
            RecordCursorContinuation lastFinishedContinuation = this.cancelPendingFutures();
            this.pipeline.add(CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(lastFinishedContinuation, RecordCursor.NoNextReason.TIME_LIMIT_REACHED)));
            break;
        }
        return this.pipeline.peek().thenApply(vignore -> false);
    }

    @Nonnull
    private CompletableFuture<Boolean> cancellAll() {
        while (!this.pipeline.isEmpty()) {
            this.pipeline.remove().cancel(false);
        }
        return ALREADY_CANCELLED;
    }

    @Nonnull
    private RecordCursorContinuation cancelPendingFutures() {
        Iterator iter = this.pipeline.iterator();
        RecordCursorContinuation continuation = this.nextResult.getContinuation();
        while (iter.hasNext()) {
            CompletableFuture pendingEntry = (CompletableFuture)iter.next();
            if (!pendingEntry.isDone()) {
                while (true) {
                    iter.remove();
                    pendingEntry.cancel(false);
                    if (!iter.hasNext()) {
                        return continuation;
                    }
                    pendingEntry = (CompletableFuture)iter.next();
                }
            }
            continuation = ((RecordCursorResult)pendingEntry.join()).getContinuation();
        }
        return continuation;
    }
}

