/*
 * Decompiled with CFR 0.152.
 */
package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.FDBException;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.record.IndexState;
import com.apple.foundationdb.record.RecordCoreStorageException;
import com.apple.foundationdb.record.logging.KeyValueLogMessage;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.provider.common.StoreTimer;
import com.apple.foundationdb.record.provider.common.StoreTimerSnapshot;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore;
import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer;
import com.apple.foundationdb.record.provider.foundationdb.IndexingBase;
import com.apple.foundationdb.record.provider.foundationdb.IndexingCommon;
import com.apple.foundationdb.record.provider.foundationdb.SubspaceProvider;
import com.apple.foundationdb.record.provider.foundationdb.runners.ExponentialDelay;
import com.apple.foundationdb.record.util.Result;
import com.apple.foundationdb.util.LoggableException;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@API(value=API.Status.INTERNAL)
public class IndexingThrottle {
    // Class-wide SLF4J logger; also used by the nested Booker.
    @Nonnull
    private static final Logger LOGGER = LoggerFactory.getLogger(IndexingThrottle.class);
    // Shared indexing state: this class reads its config, runner, record store builder,
    // target indexes and total-records-scanned counter.
    @Nonnull
    private final IndexingCommon common;
    // Keeps the records limit and the success/failure counters; most public methods delegate to it.
    @Nonnull
    private final Booker booker;
    // Selects which index-state check expectedIndexStatesOrThrow applies (scrubbing vs. building).
    private final boolean isScrubber;
    // Indexes reported as requiring a merge by completed build functions; handed out and replaced
    // with a fresh set by getAndResetMergeRequiredIndexes().
    private Set<Index> mergeRequiredIndexes = new HashSet<Index>();

    /**
     * Creates a throttle on top of the given shared indexing state.
     *
     * @param common shared indexing state; also handed to the internal {@link Booker}
     * @param isScrubber whether index states should be validated with the scrubber rules
     *                   (see {@code expectedIndexStatesOrThrow})
     */
    IndexingThrottle(@Nonnull IndexingCommon common, boolean isScrubber) {
        this.isScrubber = isScrubber;
        this.common = common;
        this.booker = new Booker(common);
    }

    /**
     * Asks the booker how long the caller should pause before the next transaction.
     *
     * @return the wait time, in milliseconds, computed by {@link Booker#waitTimeMilliseconds()}
     */
    public long waitTimeMilliseconds() {
        final long millisToWait = this.booker.waitTimeMilliseconds();
        return millisToWait;
    }

    /**
     * Returns the booker's current limit and counters as alternating log keys and values.
     *
     * @return the list produced by {@link Booker#logMessageKeyValues()}
     */
    public List<Object> logMessageKeyValues() {
        final List<Object> keysAndValues = this.booker.logMessageKeyValues();
        return keysAndValues;
    }

    /**
     * Calls {@code common.loadConfig()} and, only when that call returns {@code true},
     * has the booker re-check its limit against the configured maximum.
     * Runs under this instance's monitor.
     */
    private synchronized void loadConfig() {
        final boolean refreshNeeded = this.common.loadConfig();
        if (!refreshNeeded) {
            return;
        }
        this.booker.refreshConfigLimits();
    }

    /**
     * Looks for an {@link FDBException} in the given throwable by delegating to
     * {@code IndexingBase.findException}.
     *
     * @param e the throwable to inspect; may be {@code null}
     * @return whatever {@code IndexingBase.findException(e, FDBException.class)} returns
     */
    @Nullable
    @VisibleForTesting
    static FDBException getFDBException(@Nullable Throwable e) {
        final FDBException found = IndexingBase.findException(e, FDBException.class);
        return found;
    }

    /**
     * Runs {@code buildFunction} inside runner transactions, looping until it succeeds, is asked to
     * finish quietly, or may no longer be retried.
     *
     * <p>Each iteration reloads the config, opens a record store from a copy of the shared store
     * builder, validates the target index states and then applies {@code buildFunction}. The
     * post-transaction hook reports the outcome to the booker. On failure the booker decides whether
     * another attempt is allowed; if so the loop continues after an exponential-delay pause.</p>
     *
     * @param buildFunction the work to run; receives the opened store and a counter of records scanned
     * @param shouldReturnQuietly if non-null, applied to the {@link FDBException} extracted from a
     *        failure (which may be {@code null}); a present result completes the returned future
     *        normally with that value instead of retrying or failing
     * @param additionalLogMessageKeyValues extra log keys/values appended to the indexer's own; may be {@code null}
     * @param adjustLimits forwarded to the booker to control whether limits and counters are adjusted
     * @param <R> the result type of {@code buildFunction}
     * @return a future completed with the build result, the "quiet" value, or exceptionally with the
     *         failure mapped through the database's {@code mapAsyncToSyncException}
     */
    @Nonnull
    public <R> CompletableFuture<R> buildCommitRetryAsync(@Nonnull BiFunction<FDBRecordStore, AtomicLong, CompletableFuture<R>> buildFunction, @Nullable Function<FDBException, Optional<R>> shouldReturnQuietly, @Nullable List<Object> additionalLogMessageKeyValues, boolean adjustLimits) {
        ArrayList<Object> onlineIndexerLogMessageKeyValues = new ArrayList<Object>(this.common.indexLogMessageKeyValues());
        if (additionalLogMessageKeyValues != null) {
            onlineIndexerLogMessageKeyValues.addAll(additionalLogMessageKeyValues);
        }
        AtomicInteger tries = new AtomicInteger(0);
        AtomicLong recordsScanned = new AtomicLong(0L);
        // Completed explicitly from inside the loop; the loop's own future only signals termination.
        CompletableFuture ret = new CompletableFuture();
        this.booker.resetStoreTimerSnapshot();
        ExponentialDelay delay = this.common.getRunner().createExponentialDelay();
        AsyncUtil.whileTrue(() -> {
            this.loadConfig();
            return ((CompletableFuture)this.common.getRunner().runAsync(context -> this.common.getRecordStoreBuilder().copyBuilder().setContext((FDBRecordContext)context).openAsync().thenCompose(store -> {
                this.expectedIndexStatesOrThrow((FDBRecordStore)store, (FDBRecordContext)context);
                return ((CompletableFuture)buildFunction.apply((FDBRecordStore)store, recordsScanned)).thenApply(retVal -> {
                    Set<Index> indexSet = store.getIndexDeferredMaintenanceControl().getMergeRequiredIndexes();
                    if (indexSet != null) {
                        // getAndResetMergeRequiredIndexes() swaps this plain HashSet under the instance
                        // monitor; take the same monitor here so an add cannot race with the swap.
                        synchronized (this) {
                            this.mergeRequiredIndexes.addAll(indexSet);
                        }
                    }
                    return retVal;
                });
            }), (result, exception) -> {
                // Post-transaction hook: let the booker account for the outcome.
                this.booker.handleLimitsPostRunnerTransaction((Throwable)exception, recordsScanned, adjustLimits, additionalLogMessageKeyValues);
                return Result.of(result, exception);
            }, onlineIndexerLogMessageKeyValues).handle((value, e) -> {
                Optional retVal;
                if (e == null) {
                    // Success: publish the scanned count and stop looping.
                    this.common.getTotalRecordsScanned().addAndGet(recordsScanned.get());
                    ret.complete(value);
                    return AsyncUtil.READY_FALSE;
                }
                FDBException fdbE = IndexingThrottle.getFDBException(e);
                if (shouldReturnQuietly != null && (retVal = (Optional)shouldReturnQuietly.apply(fdbE)).isPresent()) {
                    ret.complete(retVal.get());
                    return AsyncUtil.READY_FALSE;
                }
                int currTries = tries.getAndIncrement();
                boolean mayRetry = this.booker.mayRetryAfterHandlingException(fdbE, additionalLogMessageKeyValues, currTries, adjustLimits);
                if (!mayRetry) {
                    return this.completeExceptionally(ret, (Throwable)e, onlineIndexerLogMessageKeyValues);
                }
                if (LOGGER.isWarnEnabled()) {
                    KeyValueLogMessage message = KeyValueLogMessage.build("Retrying Runner Exception", new Object[]{LogMessageKeys.INDEXER_CURR_RETRY, currTries, LogMessageKeys.INDEXER_MAX_RETRIES, this.common.config.getMaxRetries(), LogMessageKeys.DELAY, delay.getNextDelayMillis()}).addKeysAndValues(onlineIndexerLogMessageKeyValues).addKeysAndValues(this.logMessageKeyValues());
                    this.booker.addStoreTimerAtFailureAndReset(message);
                    LOGGER.warn(message.toString(), (Throwable)e);
                }
                // Pause before the next attempt; the resulting `true` keeps the loop going.
                CompletableFuture delayedContinue = delay.delay().thenApply(ignore -> true);
                if (this.common.getRunner().getTimer() != null) {
                    delayedContinue = this.common.getRunner().getTimer().instrument(FDBStoreTimer.Events.RETRY_DELAY, delayedContinue, this.common.getRunner().getExecutor());
                }
                return delayedContinue;
            })).thenCompose(Function.identity());
        }, this.common.getRunner().getExecutor()).whenComplete((ignore, e) -> {
            // If the loop itself ends exceptionally, surface that failure through ret as well.
            if (e != null) {
                this.completeExceptionally(ret, (Throwable)e, (List<Object>)onlineIndexerLogMessageKeyValues);
            }
        });
        return ret;
    }

    /**
     * Verifies that every target index is in the state this throttle expects, and throws otherwise.
     *
     * <p>When scrubbing, all target indexes must be scannable. When building, all target indexes
     * must be write-only; any other combination is rejected with an exception describing it.</p>
     *
     * @param store the opened record store whose index states are inspected
     * @param context the transaction context, used only to render the subspace in the error details
     * @throws IndexingBase.UnexpectedReadableException when scrubbing and some index is not scannable,
     *         or when building and all (flag {@code true}) or some (flag {@code false}) of the
     *         indexes are already scannable while the rest are write-only
     * @throws RecordCoreStorageException when building and the states are any other mix
     */
    private void expectedIndexStatesOrThrow(FDBRecordStore store, FDBRecordContext context) {
        // Typed list: the predicates below need IndexState elements, not raw Objects.
        final List<IndexState> indexStates = this.common.getTargetIndexes().stream()
                .map(store::getIndexState)
                .collect(Collectors.toList());
        final boolean allScannable = indexStates.stream().allMatch(IndexState::isScannable);
        if (this.isScrubber) {
            if (allScannable) {
                return;
            }
            throw new IndexingBase.UnexpectedReadableException(false, "Attempt to scrub a non readable index", new Object[]{LogMessageKeys.INDEX_NAME, this.common.getTargetIndexesNames(), LogMessageKeys.INDEX_STATE, indexStates});
        }
        if (indexStates.stream().allMatch(IndexState::isWriteOnly)) {
            return;
        }
        if (allScannable) {
            throw new IndexingBase.UnexpectedReadableException(true, "All indexes are built", new Object[0]);
        }
        if (indexStates.stream().allMatch(state -> state.isWriteOnly() || state.isScannable())) {
            throw new IndexingBase.UnexpectedReadableException(false, "Some indexes are built", new Object[0]);
        }
        // Anything else (at least one index neither write-only nor scannable) is a storage-level surprise.
        final SubspaceProvider subspaceProvider = this.common.getRecordStoreBuilder().getSubspaceProvider();
        throw new RecordCoreStorageException("Unexpected index state(s)", new Object[]{subspaceProvider == null ? "nullSubspaceProvider" : subspaceProvider.logKey(), subspaceProvider == null ? "" : subspaceProvider.toString(context), LogMessageKeys.INDEX_NAME, this.common.getTargetIndexesNames(), LogMessageKeys.INDEX_STATE, indexStates});
    }

    /**
     * Fails {@code ret} with the given error and returns the "stop looping" signal.
     *
     * <p>If the error is a {@link LoggableException}, the supplied log keys/values are attached to
     * it first. The error is passed through the database's {@code mapAsyncToSyncException} before
     * being used to complete the future.</p>
     *
     * @param ret the future to complete exceptionally
     * @param e the failure
     * @param additionalLogMessageKeyValues log keys/values to attach to loggable failures
     * @param <R> the future's result type
     * @return {@code AsyncUtil.READY_FALSE}
     */
    private <R> CompletableFuture<Boolean> completeExceptionally(CompletableFuture<R> ret, Throwable e, List<Object> additionalLogMessageKeyValues) {
        final boolean loggable = e instanceof LoggableException;
        if (loggable) {
            final LoggableException withLogInfo = (LoggableException)e;
            withLogInfo.addLogInfo(additionalLogMessageKeyValues.toArray());
        }
        final Throwable mapped = this.common.getRunner().getDatabase().mapAsyncToSyncException(e);
        ret.completeExceptionally(mapped);
        return AsyncUtil.READY_FALSE;
    }

    /**
     * Returns the booker's current records limit as an {@code int}.
     *
     * <p>The limit is tracked as a {@code long}; values outside the {@code int} range are clamped
     * to {@link Integer#MIN_VALUE}/{@link Integer#MAX_VALUE} rather than silently wrapped by a
     * narrowing cast. In-range values are returned unchanged.</p>
     *
     * @return the current records limit, saturated to the {@code int} range
     */
    public int getLimit() {
        final long limit = this.booker.getRecordsLimit();
        final long clamped = Math.max((long)Integer.MIN_VALUE, Math.min((long)Integer.MAX_VALUE, limit));
        return (int)clamped;
    }

    /**
     * Returns the booker's running total of records scanned by successful transactions.
     *
     * @return the current value of the booker's {@code totalRecordsScannedSuccess} counter
     */
    public long getTotalRecordsScannedSuccessfully() {
        final Booker counters = this.booker;
        return counters.totalRecordsScannedSuccess;
    }

    /**
     * Hands out the indexes collected so far as requiring a merge and starts a fresh, empty
     * collection. The swap runs under this instance's monitor.
     *
     * @return the previously accumulated set (possibly empty); the caller becomes its only holder
     *         as far as this class is concerned
     */
    public synchronized Set<Index> getAndResetMergeRequiredIndexes() {
        final Set<Index> drained = this.mergeRequiredIndexes;
        this.mergeRequiredIndexes = new HashSet<>();
        return drained;
    }

    static class Booker {
        // Shared indexing state; read for config values, the runner's timer and index log keys.
        @Nonnull
        private final IndexingCommon common;
        // Current records limit: seeded from config.getInitialLimit(), raised by increaseLimit(),
        // lowered by decreaseLimit() and refreshConfigLimits().
        private long recordsLimit;
        // Records scanned by the most recent failed runner transaction; input to decreaseLimit().
        private long lastFailureRecordsScanned;
        // Sum of records scanned by transactions reported as successful.
        private long totalRecordsScannedSuccess = 0L;
        // Sum of records scanned by transactions reported as failed (limit-adjusting path only).
        private long totalRecordsScannedFailure = 0L;
        // Number of successful transactions seen by handleLimitsPostRunnerTransaction().
        private long countSuccessfulTransactions = 0L;
        // Number of decreaseLimit() calls.
        private long countFailedTransactions = 0L;
        // Number of failures seen by handleLimitsPostRunnerTransaction().
        private long countRunnerFailedTransactions = 0L;
        // Successes counted towards config.getIncreaseLimitAfter(); reset to 0 when that threshold is reached.
        private int consecutiveSuccessCount = 0;
        // System.currentTimeMillis()-based time at which the last computed wait ends (now + wait).
        private long forcedDelayTimestampMilliSeconds = 0L;
        // Records scanned successfully since the last waitTimeMilliseconds() call
        // (despite the name, this is a record count, not a duration).
        private long recordsScannedSinceForcedDelayMilliSeconds = 0L;
        // decreaseLimit() calls since the last success; selects the shrink factor.
        private long consecutiveFailureCount = 0L;
        // Baseline for reporting timer metrics as a difference in failure logs; null until first taken.
        private StoreTimerSnapshot storeTimerSnapshot = null;

        Booker(@Nonnull IndexingCommon common) {
            this.common = common;
            this.recordsLimit = common.config.getInitialLimit();
        }

        /**
         * Returns the records limit currently in force.
         *
         * @return the current value of {@code recordsLimit}
         */
        long getRecordsLimit() {
            return recordsLimit;
        }

        /**
         * Computes how long to pause so that the records scanned since the previous call stay within
         * the configured records-per-second rate, then resets that scanned count.
         *
         * <p>A configured rate of {@link Integer#MAX_VALUE} disables throttling: the bookkeeping is
         * cleared and 0 is returned. Otherwise the wait is the time those records "should" have taken
         * at the configured rate, minus the time elapsed since the previous wait was due to end,
         * bounded to the range 0..999 ms. The division uses the configured rate directly, so a rate
         * of zero would raise an {@link ArithmeticException}.</p>
         *
         * @return the number of milliseconds to wait, between 0 and 999 inclusive
         */
        long waitTimeMilliseconds() {
            final int targetRate = this.common.config.getRecordsPerSecond();
            final boolean unthrottled = targetRate == Integer.MAX_VALUE;
            if (unthrottled) {
                this.forcedDelayTimestampMilliSeconds = 0L;
                this.recordsScannedSinceForcedDelayMilliSeconds = 0L;
                return 0L;
            }
            final long currentMillis = System.currentTimeMillis();
            final long elapsed = Math.max(0L, currentMillis - this.forcedDelayTimestampMilliSeconds);
            // Milliseconds the scanned records are "worth" at the configured rate.
            final long budgetMillis = 1000L * this.recordsScannedSinceForcedDelayMilliSeconds / (long)targetRate;
            long pause = budgetMillis - elapsed;
            if (pause < 0L) {
                pause = 0L;
            }
            if (pause > 999L) {
                pause = 999L;
            }
            this.recordsScannedSinceForcedDelayMilliSeconds = 0L;
            this.forcedDelayTimestampMilliSeconds = currentMillis + pause;
            return pause;
        }

        /**
         * Snapshots the current limit, configured rate and transaction/record counters as a
         * fixed-size list of alternating log keys and values.
         *
         * @return key/value pairs in the order: limit, records per second, successful transactions,
         *         failed transactions, runner-failed transactions, records scanned, records scanned
         *         during failures
         */
        public List<Object> logMessageKeyValues() {
            return Arrays.asList(
                    LogMessageKeys.LIMIT, this.recordsLimit,
                    LogMessageKeys.RECORDS_PER_SECOND, this.common.config.getRecordsPerSecond(),
                    LogMessageKeys.SUCCESSFUL_TRANSACTIONS_COUNT, this.countSuccessfulTransactions,
                    LogMessageKeys.FAILED_TRANSACTIONS_COUNT, this.countFailedTransactions,
                    LogMessageKeys.FAILED_TRANSACTIONS_COUNT_IN_RUNNER, this.countRunnerFailedTransactions,
                    LogMessageKeys.TOTAL_RECORDS_SCANNED, this.totalRecordsScannedSuccess,
                    LogMessageKeys.TOTAL_RECORDS_SCANNED_DURING_FAILURES, this.totalRecordsScannedFailure);
        }

        /**
         * Decides whether a failed attempt may be retried and, if so, optionally lowers the limit.
         *
         * <p>A retry is refused once {@code currTries} has reached the configured maximum, or when
         * {@code IndexingBase.shouldLessenWork} rejects the exception (that check is skipped when the
         * retry budget is already exhausted).</p>
         *
         * @param fdbException the FDB exception extracted from the failure; may be {@code null}
         * @param additionalLogMessageKeyValues extra log keys/values for the limit-decrease message; may be {@code null}
         * @param currTries the number of retries already performed
         * @param adjustLimits whether to lower the records limit before retrying
         * @return {@code true} if the caller may retry
         */
        boolean mayRetryAfterHandlingException(@Nullable FDBException fdbException, @Nullable List<Object> additionalLogMessageKeyValues, int currTries, boolean adjustLimits) {
            final boolean retriesExhausted = currTries >= this.common.config.getMaxRetries();
            if (retriesExhausted) {
                return false;
            }
            final boolean lessenable = IndexingBase.shouldLessenWork(fdbException);
            if (!lessenable) {
                return false;
            }
            if (adjustLimits) {
                this.decreaseLimit(fdbException, additionalLogMessageKeyValues);
            }
            return true;
        }

        /**
         * Records a failed transaction and shrinks the records limit based on how many records the
         * last failed transaction scanned.
         *
         * <p>The new limit is the smaller of "one fewer than last time" and "last time scaled by
         * {@code oneToNineFactor(consecutiveFailureCount) / 10}", but never below 1. When info
         * logging is enabled, the change is logged together with timer metrics.</p>
         *
         * @param fdbException the exception that triggered the decrease; its message and code are logged
         * @param additionalLogMessageKeyValues extra log keys/values; may be {@code null}
         */
        void decreaseLimit(@Nonnull FDBException fdbException, @Nullable List<Object> additionalLogMessageKeyValues) {
            this.countFailedTransactions++;
            this.consecutiveFailureCount++;
            final long previousLimit = this.recordsLimit;
            final long scannedAtFailure = this.lastFailureRecordsScanned;
            final long scaled = scannedAtFailure * Booker.oneToNineFactor(this.consecutiveFailureCount) / 10L;
            final long candidate = Math.min(scannedAtFailure - 1L, scaled);
            this.recordsLimit = Math.max(1L, candidate);
            if (!LOGGER.isInfoEnabled()) {
                return;
            }
            final KeyValueLogMessage message = KeyValueLogMessage.build("Lessening limit of online index build", new Object[]{LogMessageKeys.ERROR, fdbException.getMessage(), LogMessageKeys.ERROR_CODE, fdbException.getCode(), LogMessageKeys.OLD_LIMIT, previousLimit}).addKeysAndValues(this.logMessageKeyValues()).addKeysAndValues(this.common.indexLogMessageKeyValues());
            if (additionalLogMessageKeyValues != null) {
                message.addKeysAndValues(additionalLogMessageKeyValues);
            }
            this.addStoreTimerAtFailureAndReset(message);
            LOGGER.info(message.toString(), fdbException);
        }

        /**
         * Maps a consecutive-failure count to a tenths multiplier between 1 and 9.
         *
         * <p>Counts up to 3 give {@code 10 - max(1, count)} (so 9, 8, 7 for counts 1, 2, 3, and 9 for
         * anything below 1); counts 4 through 7 give 5; larger counts give 1.</p>
         *
         * @param count the number of consecutive failures
         * @return the multiplier, to be divided by 10 by the caller
         */
        private static long oneToNineFactor(long count) {
            if (count <= 3L) {
                return 10L - Math.max(1L, count);
            }
            return count <= 7L ? 5L : 1L;
        }

        /**
         * Accounts for the outcome of one runner transaction.
         *
         * <p>When {@code adjustLimits} is {@code false}, only the successful-records total is updated
         * (under this booker's monitor) and nothing else is touched. Otherwise a success updates the
         * counters, possibly raises the limit once enough successes have accumulated, clears the
         * failure streak and re-baselines the timer snapshot; a failure records how many records
         * were scanned and resets {@code recordsScanned} to zero for the next attempt.</p>
         *
         * <p>NOTE(review): the decompiler that produced this source warned that it removed a
         * self-catching try block in this method (possible behaviour change); the synchronized
         * section below is the likely origin.</p>
         *
         * @param exception the transaction's failure, or {@code null} on success
         * @param recordsScanned counter of records scanned by the transaction; zeroed on failure when adjusting limits
         * @param adjustLimits whether limits and the detailed counters should be maintained
         * @param additionalLogMessageKeyValues extra log keys/values for a limit-increase message; may be {@code null}
         */
        void handleLimitsPostRunnerTransaction(@Nullable Throwable exception, @Nonnull AtomicLong recordsScanned, boolean adjustLimits, @Nullable List<Object> additionalLogMessageKeyValues) {
            final long scannedNow = recordsScanned.get();
            final boolean succeeded = exception == null;
            if (!adjustLimits) {
                if (succeeded) {
                    synchronized (this) {
                        this.totalRecordsScannedSuccess += scannedNow;
                    }
                }
                return;
            }
            if (!succeeded) {
                this.countRunnerFailedTransactions++;
                this.lastFailureRecordsScanned = scannedNow;
                this.totalRecordsScannedFailure += scannedNow;
                recordsScanned.set(0L);
                return;
            }
            this.countSuccessfulTransactions++;
            this.totalRecordsScannedSuccess += scannedNow;
            this.recordsScannedSinceForcedDelayMilliSeconds += scannedNow;
            if (this.consecutiveSuccessCount < this.common.config.getIncreaseLimitAfter()) {
                this.consecutiveSuccessCount++;
            } else {
                List<Object> extraLogInfo = additionalLogMessageKeyValues;
                if (extraLogInfo == null) {
                    extraLogInfo = new ArrayList<>();
                }
                this.increaseLimit(extraLogInfo);
                this.consecutiveSuccessCount = 0;
            }
            this.consecutiveFailureCount = 0L;
            this.resetStoreTimerSnapshot();
        }

        /**
         * Raises the records limit by at least one, up to the configured maximum, and logs the
         * change at info level. Does nothing when the limit is already at or above the maximum.
         *
         * @param additionalLogMessageKeyValues extra log keys/values to include in the message
         */
        private void increaseLimit(@Nonnull List<Object> additionalLogMessageKeyValues) {
            final long ceiling = this.common.config.getMaxLimit();
            final long previousLimit = this.recordsLimit;
            if (previousLimit >= ceiling) {
                return;
            }
            final long atLeastOneMore = previousLimit + 1L;
            final long stepped = this.getIncreasedLimit(previousLimit);
            this.recordsLimit = Math.min(ceiling, Math.max(atLeastOneMore, stepped));
            if (!LOGGER.isInfoEnabled()) {
                return;
            }
            LOGGER.info(KeyValueLogMessage.build("Re-increasing limit of online index build", new Object[]{LogMessageKeys.OLD_LIMIT, previousLimit}).addKeysAndValues(additionalLogMessageKeyValues).addKeysAndValues(this.logMessageKeyValues()).addKeysAndValues(this.common.indexLogMessageKeyValues()).toString());
        }

        /**
         * Proposes the next, larger limit for a given current limit.
         *
         * <p>Limits below 5 grow by 5, limits from 5 up to 99 double, and limits of 100 or more grow
         * by one third (integer arithmetic).</p>
         *
         * @param oldLimit the current limit
         * @return the proposed limit, before any capping by the caller
         */
        private long getIncreasedLimit(long oldLimit) {
            if (oldLimit >= 100L) {
                return 4L * oldLimit / 3L;
            }
            return oldLimit >= 5L ? oldLimit * 2L : oldLimit + 5L;
        }

        /**
         * Pulls the records limit down to the configured maximum when it currently exceeds it,
         * logging the change at info level before applying it. Otherwise does nothing.
         */
        void refreshConfigLimits() {
            final long ceiling = this.common.config.getMaxLimit();
            if (this.recordsLimit <= ceiling) {
                return;
            }
            // Log first so that the message still carries the old limit.
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(KeyValueLogMessage.build("Decreasing the limit to the new max limit.", new Object[]{LogMessageKeys.OLD_LIMIT, this.recordsLimit, LogMessageKeys.LIMIT, ceiling, LogMessageKeys.MAX_LIMIT, ceiling}).addKeysAndValues(this.logMessageKeyValues()).addKeysAndValues(this.common.indexLogMessageKeyValues()).toString());
            }
            this.recordsLimit = ceiling;
        }

        /**
         * Adds the runner timer's metrics to a log message and re-baselines the snapshot.
         *
         * <p>If a snapshot exists, only the difference since that snapshot is added; otherwise the
         * timer's full metrics are. Does nothing when the runner has no timer.</p>
         *
         * <p>NOTE(review): the declared type of {@code metricsDiff} is taken from the decompiled
         * source; confirm it against the return type of {@code StoreTimer.getDifference}.</p>
         *
         * @param message the log message to extend
         */
        private void addStoreTimerAtFailureAndReset(KeyValueLogMessage message) {
            final FDBStoreTimer runnerTimer = this.common.getRunner().getTimer();
            if (runnerTimer == null) {
                return;
            }
            final StoreTimerSnapshot baseline = this.storeTimerSnapshot;
            FDBStoreTimer metricsDiff;
            if (baseline == null) {
                metricsDiff = runnerTimer;
            } else {
                metricsDiff = StoreTimer.getDifference(runnerTimer, baseline);
            }
            this.storeTimerSnapshot = StoreTimerSnapshot.from(runnerTimer);
            message.addKeysAndValues(metricsDiff.getKeysAndValues());
        }

        /**
         * Takes a fresh snapshot of the runner's timer, if the runner has one, to serve as the
         * baseline for later metric differences. Leaves the existing snapshot untouched otherwise.
         */
        private void resetStoreTimerSnapshot() {
            final FDBStoreTimer runnerTimer = this.common.getRunner().getTimer();
            if (runnerTimer == null) {
                return;
            }
            this.storeTimerSnapshot = StoreTimerSnapshot.from(runnerTimer);
        }
    }
}

