/*
 * Decompiled with CFR 0.152.
 */
package com.azure.search.documents.implementation.batching;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JsonSerializer;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.search.documents.implementation.SearchIndexClientImpl;
import com.azure.search.documents.implementation.batching.IndexBatchResponse;
import com.azure.search.documents.implementation.batching.TryTrackingIndexAction;
import com.azure.search.documents.implementation.converters.IndexActionConverter;
import com.azure.search.documents.implementation.models.IndexAction;
import com.azure.search.documents.implementation.util.Utility;
import com.azure.search.documents.models.IndexBatchException;
import com.azure.search.documents.models.IndexDocumentsResult;
import com.azure.search.documents.models.IndexingResult;
import com.azure.search.documents.options.OnActionAddedOptions;
import com.azure.search.documents.options.OnActionErrorOptions;
import com.azure.search.documents.options.OnActionSentOptions;
import com.azure.search.documents.options.OnActionSucceededOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class SearchIndexingPublisher<T> {
    private static final double JITTER_FACTOR = 0.05;
    private static final String BATCH_SIZE_SCALED_DOWN = "Scaling down batch size due to 413 (Payload too large) response.{}Scaled down from {} to {}";
    private static final ClientLogger LOGGER = new ClientLogger(SearchIndexingPublisher.class);
    private final SearchIndexClientImpl restClient;
    private final JsonSerializer serializer;
    private final boolean autoFlush;
    private int batchActionCount;
    private final int maxRetries;
    private final Duration throttlingDelay;
    private final Duration maxThrottlingDelay;
    private final Consumer<OnActionAddedOptions<T>> onActionAddedConsumer;
    private final Consumer<OnActionSentOptions<T>> onActionSentConsumer;
    private final Consumer<OnActionSucceededOptions<T>> onActionSucceededConsumer;
    private final Consumer<OnActionErrorOptions<T>> onActionErrorConsumer;
    private final Function<T, String> documentKeyRetriever;
    private final Function<Integer, Integer> scaleDownFunction = size -> size / 2;
    private final Deque<TryTrackingIndexAction<T>> actions = new ConcurrentLinkedDeque<TryTrackingIndexAction<T>>();
    private final Deque<TryTrackingIndexAction<T>> inFlightActions = new ConcurrentLinkedDeque<TryTrackingIndexAction<T>>();
    private final Semaphore actionsSemaphore = new Semaphore(1);
    private final Semaphore processingSemaphore = new Semaphore(1, true);
    volatile AtomicInteger backoffCount = new AtomicInteger();
    volatile Duration currentRetryDelay = Duration.ZERO;

    public SearchIndexingPublisher(SearchIndexClientImpl restClient, JsonSerializer serializer, Function<T, String> documentKeyRetriever, boolean autoFlush, int initialBatchActionCount, int maxRetriesPerAction, Duration throttlingDelay, Duration maxThrottlingDelay, Consumer<OnActionAddedOptions<T>> onActionAddedConsumer, Consumer<OnActionSucceededOptions<T>> onActionSucceededConsumer, Consumer<OnActionErrorOptions<T>> onActionErrorConsumer, Consumer<OnActionSentOptions<T>> onActionSentConsumer) {
        this.documentKeyRetriever = Objects.requireNonNull(documentKeyRetriever, "'documentKeyRetriever' cannot be null");
        this.restClient = restClient;
        this.serializer = serializer;
        this.autoFlush = autoFlush;
        this.batchActionCount = initialBatchActionCount;
        this.maxRetries = maxRetriesPerAction;
        this.throttlingDelay = throttlingDelay;
        this.maxThrottlingDelay = maxThrottlingDelay.compareTo(this.throttlingDelay) < 0 ? this.throttlingDelay : maxThrottlingDelay;
        this.onActionAddedConsumer = onActionAddedConsumer;
        this.onActionSentConsumer = onActionSentConsumer;
        this.onActionSucceededConsumer = onActionSucceededConsumer;
        this.onActionErrorConsumer = onActionErrorConsumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Collection<com.azure.search.documents.models.IndexAction<T>> getActions() {
        this.acquireActionsSemaphore();
        try {
            ArrayList<com.azure.search.documents.models.IndexAction<T>> actions = new ArrayList<com.azure.search.documents.models.IndexAction<T>>();
            for (TryTrackingIndexAction<T> inFlightAction : this.inFlightActions) {
                actions.add(inFlightAction.getAction());
            }
            for (TryTrackingIndexAction<T> action : this.actions) {
                actions.add(action.getAction());
            }
            ArrayList<com.azure.search.documents.models.IndexAction<T>> arrayList = actions;
            return arrayList;
        }
        finally {
            this.actionsSemaphore.release();
        }
    }

    public int getBatchActionCount() {
        return this.batchActionCount;
    }

    public Duration getCurrentRetryDelay() {
        return this.currentRetryDelay;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Mono<Void> addActions(Collection<com.azure.search.documents.models.IndexAction<T>> actions, Context context, Runnable rescheduleFlush) {
        try {
            this.actionsSemaphore.acquire();
        }
        catch (InterruptedException ex) {
            return Mono.error((Throwable)ex);
        }
        try {
            actions.stream().map(action -> new TryTrackingIndexAction(action, this.documentKeyRetriever.apply(action.getDocument()))).forEach(action -> {
                if (this.onActionAddedConsumer != null) {
                    this.onActionAddedConsumer.accept(new OnActionAddedOptions(action.getAction()));
                }
                this.actions.add((TryTrackingIndexAction<T>)action);
            });
        }
        finally {
            this.actionsSemaphore.release();
        }
        LOGGER.verbose("Actions added, new pending queue size: {}.", new Object[]{this.actions.size()});
        if (this.autoFlush && this.batchAvailableForProcessing()) {
            rescheduleFlush.run();
            LOGGER.verbose("Adding documents triggered batch size limit, sending documents for indexing.");
            return this.flush(false, false, context);
        }
        return Mono.empty();
    }

    public Mono<Void> flush(boolean awaitLock, boolean isClose, Context context) {
        if (awaitLock) {
            try {
                this.processingSemaphore.acquire();
            }
            catch (InterruptedException ex) {
                return Mono.error((Throwable)ex);
            }
            return Mono.using(() -> this.processingSemaphore, ignored -> {
                try {
                    return this.flushLoop(isClose, context);
                }
                catch (RuntimeException ex) {
                    return Mono.error((Throwable)ex);
                }
            }, Semaphore::release);
        }
        if (this.processingSemaphore.tryAcquire()) {
            return Mono.using(() -> this.processingSemaphore, ignored -> {
                try {
                    return this.flushLoop(isClose, context);
                }
                catch (RuntimeException ex) {
                    return Mono.error((Throwable)ex);
                }
            }, Semaphore::release);
        }
        LOGGER.verbose("Batch already in-flight and not waiting for completion. Performing no-op.");
        return Mono.empty();
    }

    private Mono<Void> flushLoop(boolean isClosed, Context context) {
        return this.createAndProcessBatch(context).expand(ignored -> Flux.defer(() -> this.batchAvailableForProcessing() || isClosed ? this.createAndProcessBatch(context) : Flux.empty())).then();
    }

    private Mono<IndexBatchResponse> createAndProcessBatch(Context context) {
        List<TryTrackingIndexAction<T>> batchActions = this.createBatch();
        if (CoreUtils.isNullOrEmpty(batchActions)) {
            return Mono.empty();
        }
        List<IndexAction> convertedActions = batchActions.stream().map(action -> IndexActionConverter.map(action.getAction(), (ObjectSerializer)this.serializer)).collect(Collectors.toList());
        return this.sendBatch(convertedActions, batchActions, context).map(response -> {
            this.handleResponse(batchActions, (IndexBatchResponse)response);
            return response;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<TryTrackingIndexAction<T>> createBatch() {
        this.acquireActionsSemaphore();
        try {
            int actionSize = this.actions.size();
            int inFlightActionSize = this.inFlightActions.size();
            int size = Math.min(this.batchActionCount, actionSize + inFlightActionSize);
            ArrayList<TryTrackingIndexAction<T>> batchActions = new ArrayList<TryTrackingIndexAction<T>>(size);
            HashSet<String> keysInBatch = new HashSet<String>(size * 2);
            int inFlightDocumentsAdded = SearchIndexingPublisher.fillFromQueue(batchActions, this.inFlightActions, size, keysInBatch);
            if (inFlightDocumentsAdded == size) {
                this.reinsertFailedActions(this.inFlightActions, false);
            } else {
                SearchIndexingPublisher.fillFromQueue(batchActions, this.actions, size - inFlightDocumentsAdded, keysInBatch);
            }
            ArrayList<TryTrackingIndexAction<T>> arrayList = batchActions;
            return arrayList;
        }
        finally {
            this.actionsSemaphore.release();
        }
    }

    private static <T> int fillFromQueue(List<TryTrackingIndexAction<T>> batch, Deque<TryTrackingIndexAction<T>> queue, int requested, Set<String> duplicateKeyTracker) {
        int actionsAdded = 0;
        Iterator<TryTrackingIndexAction<T>> iterator = queue.iterator();
        while (actionsAdded < requested && iterator.hasNext()) {
            TryTrackingIndexAction<T> potentialDocumentToAdd = iterator.next();
            if (duplicateKeyTracker.contains(potentialDocumentToAdd.getKey())) continue;
            duplicateKeyTracker.add(potentialDocumentToAdd.getKey());
            batch.add(potentialDocumentToAdd);
            iterator.remove();
            ++actionsAdded;
        }
        return actionsAdded;
    }

    private Mono<IndexBatchResponse> sendBatch(List<IndexAction> actions, List<TryTrackingIndexAction<T>> batchActions, Context context) {
        LOGGER.verbose("Sending a batch of size {}.", new Object[]{batchActions.size()});
        if (this.onActionSentConsumer != null) {
            batchActions.forEach(action -> this.onActionSentConsumer.accept(new OnActionSentOptions(action.getAction())));
        }
        Mono batchCall = Utility.indexDocumentsWithResponseAsync(this.restClient, actions, true, context, LOGGER);
        Duration delay = this.currentRetryDelay;
        if (!delay.isZero() && !delay.isNegative()) {
            batchCall = batchCall.delaySubscription(delay);
        }
        return batchCall.map(response -> new IndexBatchResponse(response.getStatusCode(), ((IndexDocumentsResult)response.getValue()).getResults(), actions.size(), false)).doOnCancel(() -> {
            LOGGER.warning("Request was cancelled before response, adding all in-flight documents back to queue.");
            this.inFlightActions.addAll(batchActions);
        }).onErrorResume(IndexBatchException.class, exception -> Mono.just((Object)new IndexBatchResponse(207, exception.getIndexingResults(), actions.size(), true))).onErrorResume(HttpResponseException.class, exception -> {
            int statusCode = exception.getResponse().getStatusCode();
            if (statusCode == 413) {
                int previousBatchSize = Math.min(this.batchActionCount, actions.size());
                this.batchActionCount = Math.max(1, this.scaleDownFunction.apply(previousBatchSize));
                LOGGER.verbose(BATCH_SIZE_SCALED_DOWN, new Object[]{System.lineSeparator(), previousBatchSize, this.batchActionCount});
                int actionCount = actions.size();
                if (actionCount == 1) {
                    return Mono.just((Object)new IndexBatchResponse(statusCode, null, actionCount, true));
                }
                int splitOffset = Math.min(actions.size(), this.batchActionCount);
                List<TryTrackingIndexAction<T>> batchActionsToRemove = batchActions.subList(splitOffset, batchActions.size());
                this.reinsertFailedActions(batchActionsToRemove);
                batchActionsToRemove.clear();
                return this.sendBatch(actions.subList(0, splitOffset), batchActions, context);
            }
            return Mono.just((Object)new IndexBatchResponse(statusCode, null, actions.size(), true));
        }).onErrorResume(Exception.class, ignored -> Mono.just((Object)new IndexBatchResponse(0, null, actions.size(), true)));
    }

    private void handleResponse(List<TryTrackingIndexAction<T>> actions, IndexBatchResponse batchResponse) {
        boolean has503;
        if (batchResponse.getStatusCode() == 413 && batchResponse.getCount() == 1) {
            com.azure.search.documents.models.IndexAction<T> action = actions.get(0).getAction();
            if (this.onActionErrorConsumer != null) {
                this.onActionErrorConsumer.accept(new OnActionErrorOptions<T>(action).setThrowable(SearchIndexingPublisher.createDocumentTooLargeException()));
            }
            return;
        }
        LinkedList<TryTrackingIndexAction<T>> actionsToRetry = new LinkedList<TryTrackingIndexAction<T>>();
        boolean bl = has503 = batchResponse.getStatusCode() == 503;
        if (batchResponse.getResults() == null) {
            actionsToRetry.addAll(actions);
        } else {
            for (IndexingResult result : batchResponse.getResults()) {
                String key = result.getKey();
                TryTrackingIndexAction action = actions.stream().filter(a -> key.equals(a.getKey())).findFirst().orElse(null);
                if (action == null) {
                    LOGGER.warning("Unable to correlate result key {} to initial document.", new Object[]{key});
                    continue;
                }
                if (SearchIndexingPublisher.isSuccess(result.getStatusCode())) {
                    if (this.onActionSucceededConsumer == null) continue;
                    this.onActionSucceededConsumer.accept(new OnActionSucceededOptions(action.getAction()));
                    continue;
                }
                if (SearchIndexingPublisher.isRetryable(result.getStatusCode())) {
                    has503 |= result.getStatusCode() == 503;
                    if (action.getTryCount() < this.maxRetries) {
                        action.incrementTryCount();
                        actionsToRetry.add(action);
                        continue;
                    }
                    if (this.onActionErrorConsumer == null) continue;
                    this.onActionErrorConsumer.accept(new OnActionErrorOptions(action.getAction()).setThrowable(SearchIndexingPublisher.createDocumentHitRetryLimitException()).setIndexingResult(result));
                    continue;
                }
                if (this.onActionErrorConsumer == null) continue;
                this.onActionErrorConsumer.accept(new OnActionErrorOptions(action.getAction()).setIndexingResult(result));
            }
        }
        if (has503) {
            this.currentRetryDelay = this.calculateRetryDelay(this.backoffCount.getAndIncrement());
        } else {
            this.backoffCount.set(0);
            this.currentRetryDelay = Duration.ZERO;
        }
        if (!CoreUtils.isNullOrEmpty(actionsToRetry)) {
            this.reinsertFailedActions(actionsToRetry, true);
        }
    }

    private void reinsertFailedActions(Deque<TryTrackingIndexAction<T>> actionsToRetry, boolean acquireSemaphore) {
        if (acquireSemaphore) {
            this.acquireActionsSemaphore();
            try {
                actionsToRetry.descendingIterator().forEachRemaining(this.actions::add);
            }
            finally {
                this.actionsSemaphore.release();
            }
        } else {
            actionsToRetry.descendingIterator().forEachRemaining(this.actions::add);
        }
    }

    private void reinsertFailedActions(List<TryTrackingIndexAction<T>> actionsToRetry) {
        this.acquireActionsSemaphore();
        try {
            for (int i = actionsToRetry.size() - 1; i >= 0; --i) {
                this.actions.push(actionsToRetry.get(i));
            }
        }
        finally {
            this.actionsSemaphore.release();
        }
    }

    private void acquireActionsSemaphore() {
        try {
            this.actionsSemaphore.acquire();
        }
        catch (InterruptedException ex) {
            throw LOGGER.logExceptionAsError(new RuntimeException(ex));
        }
    }

    private boolean batchAvailableForProcessing() {
        return this.actions.size() + this.inFlightActions.size() >= this.batchActionCount;
    }

    private static boolean isSuccess(int statusCode) {
        return statusCode == 200 || statusCode == 201;
    }

    private static boolean isRetryable(int statusCode) {
        return statusCode == 409 || statusCode == 422 || statusCode == 503;
    }

    private Duration calculateRetryDelay(int backoffCount) {
        long delayWithJitterInNanos = ThreadLocalRandom.current().nextLong((long)((double)this.throttlingDelay.toNanos() * 0.95), (long)((double)this.throttlingDelay.toNanos() * 1.05));
        return Duration.ofNanos(Math.min((1L << backoffCount) * delayWithJitterInNanos, this.maxThrottlingDelay.toNanos()));
    }

    private static RuntimeException createDocumentTooLargeException() {
        return new RuntimeException("Document is too large to be indexed and won't be tried again.");
    }

    private static RuntimeException createDocumentHitRetryLimitException() {
        return new RuntimeException("Document has reached retry limit and won't be tried again.");
    }
}

