/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.bookie.datainteg;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.pulsar.shade.org.apache.bookkeeper.meta.LedgerManager;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Walks every ledger id exposed by a {@link LedgerManager}, reads the metadata of each
 * ledger and hands it to a caller-supplied asynchronous consumer.
 *
 * <p>The ledger ids are produced lazily from the manager's range iterator on the supplied
 * {@link Scheduler}; {@code maxInFlight} is passed to RxJava as the maximum concurrency of
 * the per-ledger processing stage.
 */
public class MetadataAsyncIterator {
    private static final Logger log = LoggerFactory.getLogger(MetadataAsyncIterator.class);
    private final Scheduler scheduler;
    private final LedgerManager ledgerManager;
    private final long zkTimeoutMs;
    private final int maxInFlight;

    /**
     * @param scheduler      scheduler the ledger-id generator is subscribed on
     * @param ledgerManager  source of ledger ranges and ledger metadata
     * @param maxInFlight    maximum concurrency handed to {@code flatMapCompletable}
     * @param zkTimeout      timeout handed to {@code getLedgerRanges}, in {@code zkTimeoutUnit}
     * @param zkTimeoutUnit  unit of {@code zkTimeout}; the value is stored in milliseconds
     */
    MetadataAsyncIterator(Scheduler scheduler, LedgerManager ledgerManager, int maxInFlight, int zkTimeout, TimeUnit zkTimeoutUnit) {
        this.scheduler = scheduler;
        this.ledgerManager = ledgerManager;
        this.maxInFlight = maxInFlight;
        this.zkTimeoutMs = zkTimeoutUnit.toMillis(zkTimeout);
    }

    /**
     * Applies {@code consumer} to the metadata of every ledger returned by the ledger manager.
     *
     * @param consumer invoked with the ledger id and its metadata; the future it returns
     *                 marks the end of processing for that ledger
     * @return a future completed with {@code null} once all ledgers have been processed, or
     *         completed exceptionally with the unwrapped cause of the first error reported
     *         by the pipeline (errors are not delayed)
     */
    public CompletableFuture<Void> forEach(BiFunction<Long, LedgerMetadata, CompletableFuture<Void>> consumer) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        // Explicit type witnesses keep the lambdas strongly typed instead of relying on casts.
        Disposable disposable = Flowable.<Long, FlatIterator>generate(
                () -> new FlatIterator(this.ledgerManager.getLedgerRanges(this.zkTimeoutMs)),
                (iter, emitter) -> {
                    try {
                        if (iter.hasNext()) {
                            emitter.onNext(iter.next());
                        } else {
                            emitter.onComplete();
                        }
                    } catch (Exception e) {
                        emitter.onError(e);
                    }
                })
            .subscribeOn(this.scheduler)
            .flatMapCompletable(
                ledgerId -> Completable.fromCompletionStage(this.processOne(ledgerId, consumer)),
                false /* delayErrors */,
                this.maxInFlight)
            .subscribe(
                () -> promise.complete(null),
                t -> promise.completeExceptionally(MetadataAsyncIterator.unwrap(t)));
        // Dispose the subscription whenever the promise finishes, including when a caller
        // completes or cancels the returned future itself.
        promise.whenComplete((result, ex) -> disposable.dispose());
        return promise;
    }

    /**
     * Reads the metadata of one ledger and passes it to {@code consumer}.
     *
     * <p>Every failure is logged at WARN. A
     * {@link BKException.BKNoSuchLedgerExistsOnMetadataServerException} is then swallowed
     * (the returned future completes normally with {@code null}); any other failure is
     * rethrown wrapped in a {@link CompletionException}.
     */
    private CompletableFuture<Void> processOne(long ledgerId, BiFunction<Long, LedgerMetadata, CompletableFuture<Void>> consumer) {
        return this.ledgerManager.readLedgerMetadata(ledgerId)
            .thenApply(Versioned::getValue)
            .thenCompose(metadata -> consumer.apply(ledgerId, metadata))
            .exceptionally(e -> {
                Throwable realException = MetadataAsyncIterator.unwrap(e);
                log.warn("Got exception processing ledger {}", ledgerId, realException);
                if (realException instanceof BKException.BKNoSuchLedgerExistsOnMetadataServerException) {
                    return null;
                }
                throw new CompletionException(realException);
            });
    }

    /**
     * Strips {@link CompletionException} and {@link ExecutionException} wrappers.
     *
     * @param e the throwable to unwrap; may be {@code null}
     * @return the innermost throwable that is not one of those wrappers; if a wrapper has no
     *         cause, the wrapper itself is returned so that a non-null input never yields
     *         {@code null}
     */
    static Throwable unwrap(Throwable e) {
        Throwable current = e;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Flattens a {@link LedgerManager.LedgerRangeIterator} into a sequence of ledger ids.
     */
    private static class FlatIterator {
        final LedgerManager.LedgerRangeIterator ranges;
        // Iterator over the ledger ids of the range currently being consumed; null until
        // the first range has been fetched.
        Iterator<Long> range = null;

        FlatIterator(LedgerManager.LedgerRangeIterator ranges) {
            this.ranges = ranges;
        }

        /**
         * @return {@code true} if another ledger id is available
         * @throws IOException if the underlying range iterator fails
         */
        boolean hasNext() throws IOException {
            // Loop rather than advance once: a range with no ledger ids must not be
            // mistaken for the end of the iteration while further ranges remain.
            while ((this.range == null || !this.range.hasNext()) && this.ranges.hasNext()) {
                this.range = this.ranges.next().getLedgers().iterator();
            }
            return this.range != null && this.range.hasNext();
        }

        /**
         * Returns the next ledger id. Callers are expected to have checked
         * {@link #hasNext()} first.
         */
        Long next() throws IOException {
            return this.range.next();
        }
    }
}

