/*
 * Decompiled with CFR 0.152.
 */
package io.pravega.segmentstore.server.reading;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.segmentstore.contracts.ReadResult;
import io.pravega.segmentstore.contracts.ReadResultEntry;
import io.pravega.segmentstore.contracts.ReadResultEntryContents;
import io.pravega.segmentstore.contracts.ReadResultEntryType;
import io.pravega.segmentstore.server.reading.AsyncReadResultHandler;
import java.beans.ConstructorProperties;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Walks a {@link ReadResult} one entry at a time and hands each entry to an
 * {@link AsyncReadResultHandler}, requesting entry contents when they are not yet available
 * and the handler agrees to it.
 *
 * <p>Instances are created via {@link #process} (or indirectly via {@link #processAll});
 * closing the processor closes the underlying {@link ReadResult} and notifies the handler
 * exactly once.
 */
public class AsyncReadResultProcessor
implements AutoCloseable {
    private final ReadResult readResult;
    private final AsyncReadResultHandler entryHandler;
    /** Set on the first close; guards against closing the result / notifying the handler twice. */
    private final AtomicBoolean closed;

    /**
     * Creates a new processor. Processing does not start until {@link #processResult} is invoked.
     *
     * @param readResult   The result to iterate over. Must not be null.
     * @param entryHandler The handler to notify of entries, completion and errors. Must not be null.
     */
    private AsyncReadResultProcessor(ReadResult readResult, AsyncReadResultHandler entryHandler) {
        Preconditions.checkNotNull(readResult, "readResult");
        Preconditions.checkNotNull(entryHandler, "entryHandler");
        this.readResult = readResult;
        this.entryHandler = entryHandler;
        this.closed = new AtomicBoolean();
    }

    /**
     * Creates a processor for the given {@link ReadResult} and immediately begins processing it.
     *
     * @param readResult   The result to process.
     * @param entryHandler The handler that receives each entry, as well as completion/error callbacks.
     * @param executor     The executor handed to the asynchronous processing loop. Must not be null.
     * @return The processor; closing it stops further processing and closes {@code readResult}.
     */
    public static AsyncReadResultProcessor process(ReadResult readResult, AsyncReadResultHandler entryHandler, Executor executor) {
        Preconditions.checkNotNull(executor, "executor");
        AsyncReadResultProcessor processor = new AsyncReadResultProcessor(readResult, entryHandler);
        processor.processResult(executor);
        return processor;
    }

    /**
     * Processes the given {@link ReadResult} to the end, always requesting contents, and
     * concatenates the data of every processed entry into a single {@link InputStream}.
     *
     * @param readResult            The result to process.
     * @param executor              The executor handed to the asynchronous processing loop.
     * @param requestContentTimeout The timeout passed along whenever entry contents are requested.
     * @return A future that completes with the concatenated data once processing completes, or
     * completes exceptionally with the (unwrapped) cause if processing fails.
     */
    public static CompletableFuture<InputStream> processAll(ReadResult readResult, Executor executor, Duration requestContentTimeout) {
        ProcessAllHandler handler = new ProcessAllHandler(requestContentTimeout);
        AsyncReadResultProcessor.process(readResult, handler, executor);
        return handler.result;
    }

    /**
     * Closes the processor without a failure cause: closes the underlying {@link ReadResult} and
     * invokes {@link AsyncReadResultHandler#processResultComplete()}. Has no effect if the
     * processor was already closed.
     */
    @Override
    public void close() {
        this.close(null);
    }

    /**
     * Closes the processor, if not already closed, and notifies the handler of the outcome.
     *
     * @param failureCause The reason processing stopped; null means it ended without an error.
     */
    private void close(Throwable failureCause) {
        if (!this.closed.getAndSet(true)) {
            this.readResult.close();
            if (failureCause == null) {
                this.entryHandler.processResultComplete();
            } else {
                this.entryHandler.processError(Exceptions.unwrap(failureCause));
            }
        }
    }

    /**
     * Starts the asynchronous loop that fetches and dispatches entries. The loop condition holds
     * while the processor is open and neither the fetch step nor the handler has asked to stop.
     * When the loop's future completes (normally or exceptionally) the processor is closed.
     *
     * @param executor The executor handed to {@code Futures.loop}.
     */
    private void processResult(Executor executor) {
        AtomicBoolean shouldContinue = new AtomicBoolean(true);
        Futures.loop(() -> !this.closed.get() && shouldContinue.get(), () -> {
            CompletableFuture<ReadResultEntry> resultEntryFuture = this.fetchNextEntry();
            // A null future means there is nothing more to process; flag the loop to stop.
            shouldContinue.set(resultEntryFuture != null);
            return resultEntryFuture != null ? resultEntryFuture : CompletableFuture.completedFuture(null);
        }, resultEntry -> {
            if (resultEntry != null) {
                // The handler's return value decides whether another entry is fetched.
                shouldContinue.set(this.entryHandler.processEntry(resultEntry));
            }
        }, executor).whenComplete((r, ex) -> this.close(ex));
    }

    /**
     * Fetches the next entry from the {@link ReadResult} and makes sure its contents are
     * available (or on their way) before handing it back.
     *
     * @return A future that completes with the entry once its contents are available, or null if
     * processing should stop: the result returned no entry, the entry is of type
     * {@link ReadResultEntryType#EndOfStreamSegment}, or its contents are not yet available and
     * the handler declined to request them.
     */
    private CompletableFuture<ReadResultEntry> fetchNextEntry() {
        // Cast retained: the element type declared by ReadResult is not visible from this file.
        ReadResultEntry currentEntry = (ReadResultEntry) this.readResult.next();
        if (currentEntry != null && currentEntry.getType() != ReadResultEntryType.EndOfStreamSegment) {
            // Wildcard instead of a raw type; only completion matters here, not the content value.
            CompletableFuture<?> entryContentsFuture = currentEntry.getContent();
            if (entryContentsFuture.isDone()) {
                // Contents already available; nothing to request.
                return CompletableFuture.completedFuture(currentEntry);
            }
            if (this.entryHandler.shouldRequestContents(currentEntry.getType(), currentEntry.getStreamSegmentOffset())) {
                currentEntry.requestContent(this.entryHandler.getRequestContentTimeout());
                return entryContentsFuture.thenApply(v -> currentEntry);
            }
        }
        return null;
    }

    /**
     * Handler used by {@link AsyncReadResultProcessor#processAll}: always requests contents,
     * collects the data of every entry, and exposes the outcome through {@link #result}.
     */
    private static class ProcessAllHandler
    implements AsyncReadResultHandler {
        private final Duration requestContentTimeout;
        /** Data of each processed entry, in processing order. */
        private final List<InputStream> parts = Collections.synchronizedList(new ArrayList<>());
        /** Completed with the concatenated data on success, or exceptionally on error. */
        private final CompletableFuture<InputStream> result = new CompletableFuture<>();

        @Override
        public boolean shouldRequestContents(ReadResultEntryType entryType, long streamSegmentOffset) {
            return true;
        }

        @Override
        public boolean processEntry(ReadResultEntry entry) {
            // Cast retained: the declared type parameter of getContent() is not visible from this file.
            this.parts.add(((ReadResultEntryContents) entry.getContent().join()).getData());
            return true;
        }

        @Override
        public void processError(Throwable cause) {
            this.result.completeExceptionally(cause);
        }

        @Override
        public void processResultComplete() {
            // Snapshot under the list's monitor: iterators of a synchronizedList are not protected
            // by its lock, and the SequenceInputStream consumes its enumeration lazily. Without the
            // copy, an add() racing with completion could break the consumer's iteration.
            List<InputStream> snapshot;
            synchronized (this.parts) {
                snapshot = new ArrayList<>(this.parts);
            }
            this.result.complete(new SequenceInputStream(Iterators.asEnumeration(snapshot.iterator())));
        }

        @ConstructorProperties(value={"requestContentTimeout"})
        @SuppressFBWarnings(justification="generated code")
        public ProcessAllHandler(Duration requestContentTimeout) {
            this.requestContentTimeout = requestContentTimeout;
        }

        @Override
        @SuppressFBWarnings(justification="generated code")
        public Duration getRequestContentTimeout() {
            return this.requestContentTimeout;
        }
    }
}

