/*
 * Decompiled with CFR 0.152.
 */
package com.apple.foundationdb.record.provider.foundationdb.runners.throttled;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.async.MoreAsyncUtil;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase;
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContextConfig;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.apple.foundationdb.record.provider.foundationdb.runners.FutureAutoClose;
import com.apple.foundationdb.record.provider.foundationdb.runners.TransactionalRunner;
import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.CursorFactory;
import com.apple.foundationdb.record.provider.foundationdb.runners.throttled.ItemHandler;
import com.apple.foundationdb.util.CloseException;
import com.apple.foundationdb.util.CloseableUtils;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@API(value=API.Status.EXPERIMENTAL)
public class ThrottledRetryingIterator<T>
implements AutoCloseable {
    /** Class logger, used for limit-adjustment and abort messages. */
    private static final Logger logger = LoggerFactory.getLogger(ThrottledRetryingIterator.class);
    /**
     * Retry budget constant. NOTE(review): the builder initialises its {@code numOfRetries} to the same
     * literal value, so this is presumably the default - confirm against the original source.
     */
    public static final int NUMBER_OF_RETRIES = 100;
    /**
     * NOTE(review): {@code handleSuccess} considers raising the row limit every 40th consecutive success,
     * which matches this value - presumably the constant was inlined there; confirm.
     */
    private static final int SUCCESS_INCREASE_THRESHOLD = 40;
    /** Runs every range iteration as a transaction; closed by {@link #close()}. */
    @Nonnull
    private final TransactionalRunner transactionalRunner;
    /** Executor handed to the asynchronous while-loops. */
    @Nonnull
    private final Executor executor;
    /** Scheduler used to create the throttling delay between two transactions. */
    @Nonnull
    private final ScheduledExecutorService scheduledExecutor;
    /** Every in-flight future is registered here; closed by {@link #close()}. */
    @Nonnull
    private final FutureAutoClose futureManager;
    /** Time quota for a single transaction in milliseconds; only enforced when greater than zero. */
    private final int transactionTimeQuotaMillis;
    /** Deletes quota for a single transaction; only enforced when greater than zero. */
    private final int maxRecordDeletesPerTransaction;
    /** Scanned-records-per-second throttle; values of zero or less produce no delay. */
    private final int maxRecordScannedPerSec;
    /** Deletes-per-second throttle; values of zero or less produce no delay. */
    private final int maxRecordDeletesPerSec;
    /** Creates the cursor for each range from the store, the previous result and the current row limit. */
    @Nonnull
    private final CursorFactory<T> cursorCreator;
    /** Invoked once for every item returned by the cursor. */
    @Nonnull
    private final ItemHandler<T> singleItemHandler;
    /** Optional callback invoked after a transaction succeeded. */
    @Nullable
    private final Consumer<QuotaManager> transactionSuccessNotification;
    /** Optional callback invoked at the start of every transaction attempt, right after the quota reset. */
    @Nullable
    private final Consumer<QuotaManager> transactionInitNotification;
    /** Number of consecutive failures tolerated before the iteration is aborted. */
    private final int numOfRetries;
    /** Set by {@link #close()}; a closed iterator refuses to start a new iteration. */
    private boolean closed = false;
    /** Wall-clock time (ms) at which the current range's cursor was created; zero until the first range starts. */
    private long rangeIterationStartTimeMilliseconds = 0L;
    /**
     * Row limit passed to the cursor factory. Starts at zero, is lowered after a failure and may be raised
     * after a run of successes. NOTE(review): zero presumably means "no limit" - confirm with CursorFactory.
     */
    private int cursorRowsLimit;
    /** Consecutive failures; reset when a transaction succeeds and more data remains. */
    private int failureRetriesCounter = 0;
    /** Consecutive successes; reset on every retried failure. */
    private int successCounter = 0;

    /**
     * Creates an iterator from the settings collected by a {@link Builder}.
     * The row limit starts at zero and a fresh {@link FutureAutoClose} is created to track in-flight futures.
     *
     * @param builder the builder holding the collaborators, quotas, throttles and callbacks
     */
    public ThrottledRetryingIterator(Builder<T> builder) {
        // Collaborators.
        this.transactionalRunner = builder.transactionalRunner;
        this.executor = builder.executor;
        this.scheduledExecutor = builder.scheduledExecutor;
        this.futureManager = new FutureAutoClose();
        this.cursorCreator = builder.cursorCreator;
        this.singleItemHandler = builder.singleItemHandler;

        // Per-transaction quotas and per-second throttles.
        this.transactionTimeQuotaMillis = builder.transactionTimeQuotaMillis;
        this.maxRecordDeletesPerTransaction = builder.maxRecordDeletesPerTransaction;
        this.maxRecordScannedPerSec = builder.maxRecordScannedPerSec;
        this.maxRecordDeletesPerSec = builder.maxRecordDeletesPerSec;

        // Optional callbacks and the retry budget.
        this.transactionInitNotification = builder.transactionInitNotification;
        this.transactionSuccessNotification = builder.transactionSuccessNotification;
        this.numOfRetries = builder.numOfRetries;

        // Mutable state.
        this.cursorRowsLimit = 0;
    }

    /**
     * Iterates over all items, one transaction per range, until a range reports that nothing is left
     * or the iteration is aborted.
     * <p>
     * After each range the outcome is routed to {@code handleSuccess} or {@code handleFailure}, which decide
     * whether to continue. The cursor result of the last successful range is remembered and used as the
     * starting point of the next range; it is not updated on failure, so a failed range is retried from the
     * same starting point.
     *
     * @param storeBuilder builder used to open the record store inside each transaction
     * @return a future that completes when the iteration is done, or completes exceptionally with
     *         {@link FDBDatabaseRunner.RunnerClosed} if this iterator was already closed, or with the
     *         failure that aborted the iteration
     */
    public CompletableFuture<Void> iterateAll(FDBRecordStore.Builder storeBuilder) {
        if (this.closed) {
            return CompletableFuture.failedFuture(new FDBDatabaseRunner.RunnerClosed());
        }
        final AtomicReference<RecordCursorResult<T>> lastSuccessCont = new AtomicReference<>();
        final QuotaManager singleIterationQuotaManager = new QuotaManager();
        return AsyncUtil.whileTrue(() ->
                this.iterateOneRange(storeBuilder, lastSuccessCont.get(), singleIterationQuotaManager)
                        .handle((continuation, ex) -> {
                            if (ex == null) {
                                // Only advance the starting point once the range has succeeded.
                                lastSuccessCont.set(continuation);
                                return this.handleSuccess(singleIterationQuotaManager);
                            }
                            return this.handleFailure(ex, singleIterationQuotaManager);
                        })
                        // handle() yields a future of a future; flatten it into the loop condition.
                        .thenCompose(ret -> ret), this.executor);
    }

    /**
     * Closes this iterator. The first call marks it closed and closes the future manager and the
     * transactional runner; later calls do nothing.
     *
     * @throws CloseException if closing the underlying resources fails
     */
    @Override
    public void close() throws CloseException {
        if (!this.closed) {
            this.closed = true;
            CloseableUtils.closeAll(this.futureManager, this.transactionalRunner);
        }
    }

    /**
     * Processes one range of items through the transactional runner and returns the last cursor result seen.
     * <p>
     * Inside the runner callback the quota manager is reset, the init callback (if any) is notified, the
     * store is opened, and a cursor is created from {@code cursorStartPoint} with the current row limit.
     * Items are then handled one at a time until the cursor has no next result, the quota manager reports
     * that nothing is left, or a per-transaction quota (time or deletes) is reached.
     * <p>
     * NOTE(review): the raw types and casts below look like decompiler artifacts; the exact generic
     * signatures of the called APIs are not visible here.
     *
     * @param userStoreBuilder builder used to open the record store with the runner's context
     * @param cursorStartPoint the cursor result handed to the cursor factory as starting point (null on the first call from {@code iterateAll})
     * @param singleIterationQuotaManager counters shared with the item handler; reset at the start of every attempt
     * @return a future holding the last cursor result stored during this range
     */
    private CompletableFuture<RecordCursorResult<T>> iterateOneRange(FDBRecordStore.Builder userStoreBuilder, RecordCursorResult<T> cursorStartPoint, QuotaManager singleIterationQuotaManager) {
        // Holds the most recent cursor result so it can be returned once the runner call completes.
        AtomicReference cont = new AtomicReference();
        // NOTE(review): the meaning of the first argument (true) is defined by TransactionalRunner - confirm there.
        return this.transactionalRunner.runAsync(true, transaction -> {
            // Fresh counters for this attempt; a retried range starts counting from zero again.
            singleIterationQuotaManager.init();
            ThrottledRetryingIterator.runUnlessNull(this.transactionInitNotification, singleIterationQuotaManager);
            CompletableFuture storeFuture = userStoreBuilder.setContext((FDBRecordContext)transaction).openAsync();
            return storeFuture.thenCompose(store -> {
                RecordCursor cursor = this.cursorCreator.createCursor((FDBRecordStore)store, cursorStartPoint, this.cursorRowsLimit);
                // Start of the time window used for the time quota and, later, the per-second throttles.
                this.rangeIterationStartTimeMilliseconds = this.nowMillis();
                return AsyncUtil.whileTrue(() -> {
                    CompletableFuture onNext = cursor.onNext();
                    return ((CompletableFuture)this.futureManager.registerFuture(onNext).thenCompose(result -> {
                        cont.set(result);
                        if (!result.hasNext()) {
                            // Only a source-exhausted stop ends the whole iteration; any other
                            // reason just ends this range.
                            if (result.getNoNextReason().isSourceExhausted()) {
                                singleIterationQuotaManager.hasMore = false;
                            }
                            return AsyncUtil.READY_FALSE;
                        }
                        ++singleIterationQuotaManager.scannedCount;
                        CompletableFuture<Void> future = this.singleItemHandler.handleOneItem((FDBRecordStore)store, (RecordCursorResult<T>)result, singleIterationQuotaManager);
                        // The handler may have called markExhausted(); in that case the loop stops here.
                        return this.futureManager.registerFuture(future).thenApply(ignore -> singleIterationQuotaManager.hasMore);
                    })).thenApply(rangeHasMore -> {
                        // End this range early when an enabled (> 0) quota is hit: elapsed time above the
                        // time quota, or deletes count at or above the per-transaction deletes quota.
                        if (rangeHasMore.booleanValue() && (0 < this.transactionTimeQuotaMillis && this.elapsedTimeMillis() > (long)this.transactionTimeQuotaMillis || 0 < this.maxRecordDeletesPerTransaction && singleIterationQuotaManager.deletesCount >= this.maxRecordDeletesPerTransaction)) {
                            return false;
                        }
                        return rangeHasMore;
                    });
                    // The cursor is closed whether the loop finished normally or exceptionally.
                }, this.executor).whenComplete((r, e) -> cursor.close());
            });
        }).thenApply(ignore -> (RecordCursorResult)cont.get());
    }

    /**
     * Handles a successfully completed range: notifies the success callback, possibly raises the row limit,
     * resets the failure counter and computes the throttling delay before the next range.
     * <p>
     * The row limit is only reconsidered on every {@code SUCCESS_INCREASE_THRESHOLD}-th consecutive success,
     * and only when the current limit is below the number of scanned items plus 3.
     *
     * @param quotaManager the counters gathered while processing the range
     * @return a future holding {@code false} when nothing is left to iterate, otherwise a future holding
     *         {@code true} that completes after the throttling delay (immediately if no delay is needed)
     */
    private CompletableFuture<Boolean> handleSuccess(QuotaManager quotaManager) {
        ThrottledRetryingIterator.runUnlessNull(this.transactionSuccessNotification, quotaManager);
        if (!quotaManager.hasMore) {
            return AsyncUtil.READY_FALSE;
        }
        ++this.successCounter;
        if (this.successCounter % ThrottledRetryingIterator.SUCCESS_INCREASE_THRESHOLD == 0 && this.cursorRowsLimit < quotaManager.scannedCount + 3) {
            final int oldLimit = this.cursorRowsLimit;
            this.cursorRowsLimit = ThrottledRetryingIterator.increaseLimit(oldLimit);
            if (logger.isInfoEnabled() && oldLimit != this.cursorRowsLimit) {
                logger.info(KeyValueLogMessage.of("ThrottledIterator: iterate one range success: increase limit", new Object[]{LogMessageKeys.LIMIT, this.cursorRowsLimit, LogMessageKeys.OLD_LIMIT, oldLimit, LogMessageKeys.SUCCESSFUL_TRANSACTIONS_COUNT, this.successCounter}));
            }
        }
        this.failureRetriesCounter = 0;

        // Wait long enough to respect both per-second throttles; the larger of the two delays wins.
        final long rangeProcessingTimeMillis = Math.max(0L, this.elapsedTimeMillis());
        final long deletesDelayMillis = ThrottledRetryingIterator.throttlePerSecGetDelayMillis(rangeProcessingTimeMillis, this.maxRecordDeletesPerSec, quotaManager.deletesCount);
        final long scannedDelayMillis = ThrottledRetryingIterator.throttlePerSecGetDelayMillis(rangeProcessingTimeMillis, this.maxRecordScannedPerSec, quotaManager.scannedCount);
        final long toWaitMillis = Math.max(deletesDelayMillis, scannedDelayMillis);
        if (toWaitMillis > 0L) {
            final CompletableFuture<Void> result = MoreAsyncUtil.delayedFuture(toWaitMillis, TimeUnit.MILLISECONDS, this.scheduledExecutor);
            // Registered so that the pending delay is tracked by the future manager.
            return this.futureManager.registerFuture(result).thenApply(ignore -> true);
        }
        return AsyncUtil.READY_TRUE;
    }

    /**
     * Handles a failed range: either aborts the iteration or lowers the row limit and asks for a retry.
     * <p>
     * The iteration is aborted when the number of consecutive failures exceeds the retry budget (the
     * returned future then fails with {@code ex} as received), or when the failure - after unwrapping one
     * {@link CompletionException} - is a {@link FDBDatabaseRunner.RunnerClosed}.
     *
     * @param ex the failure of the last range
     * @param quotaManager the counters gathered while processing the failed range
     * @return a failed future when aborting, otherwise a future holding {@code true} to retry
     */
    private CompletableFuture<Boolean> handleFailure(Throwable ex, QuotaManager quotaManager) {
        this.failureRetriesCounter += 1;
        final boolean retriesExhausted = this.failureRetriesCounter > this.numOfRetries;
        if (retriesExhausted) {
            if (logger.isWarnEnabled()) {
                logger.warn(KeyValueLogMessage.of("ThrottledIterator: iterate one range failure: will abort", new Object[]{LogMessageKeys.LIMIT, this.cursorRowsLimit, LogMessageKeys.RETRY_COUNT, this.failureRetriesCounter}), ex);
            }
            return CompletableFuture.failedFuture(ex);
        }

        // From here on work with the unwrapped failure (one level of CompletionException removed).
        final Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
        if (cause instanceof FDBDatabaseRunner.RunnerClosed) {
            if (logger.isWarnEnabled()) {
                logger.warn(KeyValueLogMessage.of("ThrottledIterator: runner closed: will abort", new Object[0]), cause);
            }
            return CompletableFuture.failedFuture(cause);
        }

        // Retry with a smaller limit, derived from how many items the failed attempt managed to scan.
        this.successCounter = 0;
        final int previousLimit = this.cursorRowsLimit;
        this.cursorRowsLimit = ThrottledRetryingIterator.decreaseLimit(quotaManager.scannedCount);
        if (logger.isInfoEnabled() && previousLimit != this.cursorRowsLimit) {
            logger.info(KeyValueLogMessage.of("ThrottledIterator: iterate one range failure: will retry", new Object[]{LogMessageKeys.LIMIT, this.cursorRowsLimit, LogMessageKeys.OLD_LIMIT, previousLimit, LogMessageKeys.RETRY_COUNT, this.failureRetriesCounter}), cause);
        }
        return AsyncUtil.READY_TRUE;
    }

    @VisibleForTesting
    static long throttlePerSecGetDelayMillis(long rangeProcessingTimeMillis, int maxPerSec, int eventsCount) {
        // Computes how long to wait so that eventsCount events, spread over the processing time plus the
        // wait, do not exceed maxPerSec events per second. A non-positive maxPerSec disables throttling.
        // Returns the delay in milliseconds, never negative.
        if (maxPerSec > 0) {
            // Minimum wall-clock duration (ms) that eventsCount events are allowed to take at this rate.
            final long minimumDurationMillis = (eventsCount * 1000L) / maxPerSec;
            return Math.max(0L, minimumDurationMillis - rangeProcessingTimeMillis);
        }
        return 0L;
    }

    /**
     * Returns the current time as reported by {@link System#currentTimeMillis()}.
     *
     * @return the current wall-clock time in milliseconds
     */
    private long nowMillis() {
        final long currentMillis = System.currentTimeMillis();
        return currentMillis;
    }

    /**
     * Returns the time elapsed since the current range started.
     *
     * @return milliseconds since the recorded range start, or zero when no start time has been recorded
     */
    private long elapsedTimeMillis() {
        if (this.rangeIterationStartTimeMilliseconds > 0L) {
            return this.nowMillis() - this.rangeIterationStartTimeMilliseconds;
        }
        // No range has started yet.
        return 0L;
    }

    /**
     * Invokes an optional callback with the given quota manager.
     *
     * @param func the callback to invoke; nothing happens when it is {@code null}
     * @param quotaManager the argument passed to the callback
     */
    private static void runUnlessNull(@Nullable Consumer<QuotaManager> func, QuotaManager quotaManager) {
        if (func == null) {
            return;
        }
        func.accept(quotaManager);
    }

    @VisibleForTesting
    static int increaseLimit(int current) {
        // Raises a row limit by 25%, or by 4 when that is the larger step. A limit of zero is returned
        // unchanged. The arithmetic is done in long and clamped to Integer.MAX_VALUE so that a very large
        // limit can no longer wrap around and come back smaller than it went in.
        if (current == 0) {
            return 0;
        }
        final long scaled = current * 5L / 4L;
        final long stepped = current + 4L;
        return (int) Math.min((long) Integer.MAX_VALUE, Math.max(scaled, stepped));
    }

    @VisibleForTesting
    static int decreaseLimit(int lastScanned) {
        // New row limit after a failure: 90% of the number of items scanned by the failed attempt, but
        // never less than 1. Computed in long so that a very large count cannot overflow the multiplication.
        return (int) Math.max(1L, lastScanned * 9L / 10L);
    }

    /**
     * Creates a builder from explicitly supplied collaborators.
     *
     * @param runner the runner used to execute each range as a transaction
     * @param executor the executor for the asynchronous loops
     * @param scheduledExecutor the scheduler used for throttling delays
     * @param cursorCreator the factory creating the cursor of each range
     * @param singleItemHandler the handler invoked for every item
     * @param <T> the type of the iterated items
     * @return a new builder with default quotas
     */
    public static <T> Builder<T> builder(TransactionalRunner runner, Executor executor, ScheduledExecutorService scheduledExecutor, CursorFactory<T> cursorCreator, ItemHandler<T> singleItemHandler) {
        final Builder<T> newBuilder = new Builder<>(runner, executor, scheduledExecutor, cursorCreator, singleItemHandler);
        return newBuilder;
    }

    /**
     * Creates a builder that derives its runner and executors from a database, using a new
     * context configuration builder.
     *
     * @param database the database to run the transactions against
     * @param cursorCreator the factory creating the cursor of each range
     * @param singleItemHandler the handler invoked for every item
     * @param <T> the type of the iterated items
     * @return a new builder with default quotas
     */
    public static <T> Builder<T> builder(FDBDatabase database, CursorFactory<T> cursorCreator, ItemHandler<T> singleItemHandler) {
        final FDBRecordContextConfig.Builder contextConfigBuilder = FDBRecordContextConfig.newBuilder();
        return new Builder<>(database, contextConfigBuilder, cursorCreator, singleItemHandler);
    }

    /**
     * Builder for {@link ThrottledRetryingIterator}.
     * <p>
     * Defaults: a transaction time quota of 4 seconds, no per-transaction deletes quota, no per-second
     * throttles, no callbacks, and {@link ThrottledRetryingIterator#NUMBER_OF_RETRIES} retries.
     * All numeric setters clamp negative values to zero.
     *
     * @param <T> the type of the iterated items
     */
    public static class Builder<T> {
        private final TransactionalRunner transactionalRunner;
        private final Executor executor;
        private final ScheduledExecutorService scheduledExecutor;
        private final CursorFactory<T> cursorCreator;
        private final ItemHandler<T> singleItemHandler;
        private Consumer<QuotaManager> transactionSuccessNotification;
        private Consumer<QuotaManager> transactionInitNotification;
        private int transactionTimeQuotaMillis;
        private int maxRecordDeletesPerTransaction;
        private int maxRecordScannedPerSec;
        private int maxRecordDeletesPerSec;
        private int numOfRetries;

        /**
         * Creates a builder from explicitly supplied collaborators and initialises the defaults.
         */
        private Builder(TransactionalRunner runner, Executor executor, ScheduledExecutorService scheduledExecutor, CursorFactory<T> cursorCreator, ItemHandler<T> singleItemHandler) {
            this.transactionalRunner = runner;
            this.executor = executor;
            this.scheduledExecutor = scheduledExecutor;
            this.cursorCreator = cursorCreator;
            this.singleItemHandler = singleItemHandler;
            this.transactionTimeQuotaMillis = (int)TimeUnit.SECONDS.toMillis(4L);
            this.maxRecordDeletesPerTransaction = 0;
            this.maxRecordScannedPerSec = 0;
            this.maxRecordDeletesPerSec = 0;
            this.numOfRetries = ThrottledRetryingIterator.NUMBER_OF_RETRIES;
        }

        /**
         * Creates a builder whose runner, executor and scheduler are derived from the database and the
         * given context configuration builder.
         */
        private Builder(FDBDatabase database, FDBRecordContextConfig.Builder contextConfigBuilder, CursorFactory<T> cursorCreator, ItemHandler<T> singleItemHandler) {
            this(new TransactionalRunner(database, contextConfigBuilder), database.newContextExecutor(contextConfigBuilder.getMdcContext()), database.getScheduledExecutor(), cursorCreator, singleItemHandler);
        }

        /**
         * Sets the time quota of a single transaction. The iterator only enforces quotas greater than zero.
         *
         * @param transactionTimeQuotaMillis the quota in milliseconds; negative values are treated as zero
         * @return this builder
         */
        public Builder<T> withTransactionTimeQuotaMillis(int transactionTimeQuotaMillis) {
            this.transactionTimeQuotaMillis = Math.max(0, transactionTimeQuotaMillis);
            return this;
        }

        /**
         * Sets the maximum number of records scanned per second. Zero means no throttling.
         *
         * @param maxRecordsScannedPerSec the rate; negative values are treated as zero
         * @return this builder
         */
        public Builder<T> withMaxRecordsScannedPerSec(int maxRecordsScannedPerSec) {
            this.maxRecordScannedPerSec = Math.max(0, maxRecordsScannedPerSec);
            return this;
        }

        /**
         * Sets the maximum number of deletes per second. Zero means no throttling.
         *
         * @param maxRecordsDeletesPerSec the rate; negative values are treated as zero
         * @return this builder
         */
        public Builder<T> withMaxRecordsDeletesPerSec(int maxRecordsDeletesPerSec) {
            this.maxRecordDeletesPerSec = Math.max(0, maxRecordsDeletesPerSec);
            return this;
        }

        /**
         * Sets a callback that is invoked after every successful transaction.
         *
         * @param transactionSuccessNotification the callback, or {@code null} for none
         * @return this builder
         */
        public Builder<T> withTransactionSuccessNotification(Consumer<QuotaManager> transactionSuccessNotification) {
            this.transactionSuccessNotification = transactionSuccessNotification;
            return this;
        }

        /**
         * Sets a callback that is invoked at the start of every transaction attempt.
         *
         * @param transactionInitNotification the callback, or {@code null} for none
         * @return this builder
         */
        public Builder<T> withTransactionInitNotification(Consumer<QuotaManager> transactionInitNotification) {
            this.transactionInitNotification = transactionInitNotification;
            return this;
        }

        /**
         * Sets the deletes quota of a single transaction. The iterator only enforces quotas greater than zero.
         *
         * @param maxRecordsDeletesPerTransaction the quota; negative values are treated as zero
         * @return this builder
         */
        public Builder<T> withMaxRecordsDeletesPerTransaction(int maxRecordsDeletesPerTransaction) {
            this.maxRecordDeletesPerTransaction = Math.max(0, maxRecordsDeletesPerTransaction);
            return this;
        }

        /**
         * Sets how many consecutive failures are retried before the iteration is aborted.
         * With zero, the first failure aborts.
         *
         * @param numOfRetries the retry budget; negative values are treated as zero
         * @return this builder
         */
        public Builder<T> withNumOfRetries(int numOfRetries) {
            this.numOfRetries = Math.max(0, numOfRetries);
            return this;
        }

        /**
         * Builds the iterator from the current settings.
         *
         * @return a new {@link ThrottledRetryingIterator}
         */
        public ThrottledRetryingIterator<T> build() {
            return new ThrottledRetryingIterator<>(this);
        }
    }

    /**
     * Counters for a single transaction attempt, shared between the iterator and the item handler.
     * The iterator resets them with {@link #init()} at the start of every attempt and counts scanned
     * items; the public methods let a handler report deletes and signal that the iteration should stop.
     */
    public static class QuotaManager {
        /** Number of deletes reported since the last reset. */
        int deletesCount;
        /** Number of items scanned since the last reset. */
        int scannedCount;
        /** Whether more items are expected; {@code false} until the first reset. */
        boolean hasMore;

        /**
         * @return the number of deletes reported since the last reset
         */
        public int getDeletesCount() {
            return deletesCount;
        }

        /**
         * @return the number of items scanned since the last reset
         */
        public int getScannedCount() {
            return scannedCount;
        }

        /**
         * Adds to the deletes counter.
         *
         * @param count the number of deletes to add
         */
        public void deleteCountAdd(int count) {
            deletesCount = deletesCount + count;
        }

        /**
         * Adds one to the deletes counter.
         */
        public void deleteCountInc() {
            deletesCount += 1;
        }

        /**
         * Signals that there is nothing more to iterate.
         */
        public void markExhausted() {
            hasMore = false;
        }

        /**
         * Resets both counters to zero and marks that more items are expected.
         */
        void init() {
            hasMore = true;
            scannedCount = 0;
            deletesCount = 0;
        }
    }
}

