/*
 * Decompiled with CFR 0.152.
 */
package com.azure.search.documents.implementation.batching;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JsonSerializer;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.search.documents.implementation.SearchIndexClientImpl;
import com.azure.search.documents.implementation.batching.IndexBatchResponse;
import com.azure.search.documents.implementation.batching.IndexingDocumentManager;
import com.azure.search.documents.implementation.batching.SearchBatchingUtils;
import com.azure.search.documents.implementation.batching.TryTrackingIndexAction;
import com.azure.search.documents.implementation.converters.IndexActionConverter;
import com.azure.search.documents.implementation.models.IndexAction;
import com.azure.search.documents.implementation.util.Utility;
import com.azure.search.documents.models.IndexBatchException;
import com.azure.search.documents.models.IndexDocumentsResult;
import com.azure.search.documents.models.IndexingResult;
import com.azure.search.documents.options.OnActionAddedOptions;
import com.azure.search.documents.options.OnActionErrorOptions;
import com.azure.search.documents.options.OnActionSentOptions;
import com.azure.search.documents.options.OnActionSucceededOptions;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public final class SearchIndexingAsyncPublisher<T> {
    private static final ClientLogger LOGGER = new ClientLogger(SearchIndexingAsyncPublisher.class);
    private final SearchIndexClientImpl restClient;
    private final JsonSerializer serializer;
    private final boolean autoFlush;
    private int batchSize;
    private final int maxRetries;
    private final long throttlingDelayNanos;
    private final long maxThrottlingDelayNanos;
    private final Consumer<OnActionAddedOptions<T>> onActionAdded;
    private final Consumer<OnActionSentOptions<T>> onActionSent;
    private final Consumer<OnActionSucceededOptions<T>> onActionSucceeded;
    private final Consumer<OnActionErrorOptions<T>> onActionError;
    private final Function<T, String> documentKeyRetriever;
    private final Function<Integer, Integer> scaleDownFunction = size -> size / 2;
    private final IndexingDocumentManager<T> documentManager;
    private final Semaphore processingSemaphore = new Semaphore(1, true);
    volatile AtomicInteger backoffCount = new AtomicInteger();
    volatile Duration currentRetryDelay = Duration.ZERO;

    public SearchIndexingAsyncPublisher(SearchIndexClientImpl restClient, JsonSerializer serializer, Function<T, String> documentKeyRetriever, boolean autoFlush, int initialBatchActionCount, int maxRetriesPerAction, Duration throttlingDelay, Duration maxThrottlingDelay, Consumer<OnActionAddedOptions<T>> onActionAdded, Consumer<OnActionSucceededOptions<T>> onActionSucceeded, Consumer<OnActionErrorOptions<T>> onActionError, Consumer<OnActionSentOptions<T>> onActionSent) {
        this.documentKeyRetriever = Objects.requireNonNull(documentKeyRetriever, "'documentKeyRetriever' cannot be null");
        this.restClient = restClient;
        this.serializer = serializer;
        this.documentManager = new IndexingDocumentManager();
        this.autoFlush = autoFlush;
        this.batchSize = initialBatchActionCount;
        this.maxRetries = maxRetriesPerAction;
        this.throttlingDelayNanos = throttlingDelay.toNanos();
        this.maxThrottlingDelayNanos = maxThrottlingDelay.compareTo(throttlingDelay) < 0 ? this.throttlingDelayNanos : maxThrottlingDelay.toNanos();
        this.onActionAdded = onActionAdded;
        this.onActionSent = onActionSent;
        this.onActionSucceeded = onActionSucceeded;
        this.onActionError = onActionError;
    }

    public Collection<com.azure.search.documents.models.IndexAction<T>> getActions() {
        return this.documentManager.getActions();
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public Duration getCurrentRetryDelay() {
        return this.currentRetryDelay;
    }

    public Mono<Void> addActions(Collection<com.azure.search.documents.models.IndexAction<T>> actions, Context context, Runnable rescheduleFlush) {
        Tuple2<Integer, Boolean> batchSizeAndAvailable = this.documentManager.addAndCheckForBatch(actions, this.documentKeyRetriever, this.onActionAdded, this.batchSize);
        LOGGER.verbose("Actions added, new pending queue size: {}.", new Object[]{batchSizeAndAvailable.getT1()});
        if (this.autoFlush && ((Boolean)batchSizeAndAvailable.getT2()).booleanValue()) {
            rescheduleFlush.run();
            LOGGER.verbose("Adding documents triggered batch size limit, sending documents for indexing.");
            return this.flush(false, false, context);
        }
        return Mono.empty();
    }

    public Mono<Void> flush(boolean awaitLock, boolean isClose, Context context) {
        if (awaitLock) {
            try {
                this.processingSemaphore.acquire();
            }
            catch (InterruptedException e) {
                throw LOGGER.logExceptionAsError(new RuntimeException(e));
            }
            return Mono.using(() -> this.processingSemaphore, ignored -> this.flushLoop(isClose, context), Semaphore::release);
        }
        if (this.processingSemaphore.tryAcquire()) {
            return Mono.using(() -> this.processingSemaphore, ignored -> this.flushLoop(isClose, context), Semaphore::release);
        }
        LOGGER.verbose("Batch already in-flight and not waiting for completion. Performing no-op.");
        return Mono.empty();
    }

    private Mono<Void> flushLoop(boolean isClosed, Context context) {
        return this.createAndProcessBatch(context, true).expand(ignored -> Flux.defer(() -> this.createAndProcessBatch(context, isClosed))).then();
    }

    private Mono<IndexBatchResponse> createAndProcessBatch(Context context, boolean ignoreBatchSize) {
        List<TryTrackingIndexAction<T>> batchActions = this.documentManager.tryCreateBatch(this.batchSize, ignoreBatchSize);
        if (CoreUtils.isNullOrEmpty(batchActions)) {
            return Mono.empty();
        }
        List<IndexAction> convertedActions = batchActions.stream().map(action -> IndexActionConverter.map(action.getAction(), (ObjectSerializer)this.serializer)).collect(Collectors.toList());
        return this.sendBatch(convertedActions, batchActions, context).map(response -> {
            this.handleResponse(batchActions, (IndexBatchResponse)response);
            return response;
        });
    }

    private Mono<IndexBatchResponse> sendBatch(List<IndexAction> actions, List<TryTrackingIndexAction<T>> batchActions, Context context) {
        LOGGER.verbose("Sending a batch of size {}.", new Object[]{batchActions.size()});
        if (this.onActionSent != null) {
            batchActions.forEach(action -> this.onActionSent.accept(new OnActionSentOptions(action.getAction())));
        }
        Mono batchCall = Utility.indexDocumentsWithResponseAsync(this.restClient, actions, true, context, LOGGER);
        if (!this.currentRetryDelay.isZero() && !this.currentRetryDelay.isNegative()) {
            batchCall = batchCall.delaySubscription(this.currentRetryDelay);
        }
        return batchCall.map(response -> new IndexBatchResponse(response.getStatusCode(), ((IndexDocumentsResult)response.getValue()).getResults(), actions.size(), false)).doOnCancel(() -> {
            LOGGER.warning("Request was cancelled before response, adding all in-flight documents back to queue.");
            this.documentManager.reinsertCancelledActions(batchActions);
        }).onErrorResume(IndexBatchException.class, exception -> Mono.just((Object)new IndexBatchResponse(207, exception.getIndexingResults(), actions.size(), true))).onErrorResume(HttpResponseException.class, exception -> {
            int statusCode = exception.getResponse().getStatusCode();
            if (statusCode == 413) {
                int previousBatchSize = Math.min(this.batchSize, actions.size());
                this.batchSize = Math.max(1, this.scaleDownFunction.apply(previousBatchSize));
                LOGGER.verbose("Scaling down batch size due to 413 (Payload too large) response.{}Scaled down from {} to {}", new Object[]{System.lineSeparator(), previousBatchSize, this.batchSize});
                int actionCount = actions.size();
                if (actionCount == 1) {
                    return Mono.just((Object)new IndexBatchResponse(statusCode, null, actionCount, true));
                }
                int splitOffset = Math.min(actions.size(), this.batchSize);
                List batchActionsToRemove = batchActions.subList(splitOffset, batchActions.size());
                this.documentManager.reinsertFailedActions(batchActionsToRemove);
                batchActionsToRemove.clear();
                return this.sendBatch(actions.subList(0, splitOffset), batchActions, context);
            }
            return Mono.just((Object)new IndexBatchResponse(statusCode, null, actions.size(), true));
        }).onErrorResume(Exception.class, ignored -> Mono.just((Object)new IndexBatchResponse(0, null, actions.size(), true)));
    }

    private void handleResponse(List<TryTrackingIndexAction<T>> actions, IndexBatchResponse batchResponse) {
        boolean has503;
        if (batchResponse.getStatusCode() == 413 && batchResponse.getCount() == 1) {
            com.azure.search.documents.models.IndexAction<T> action = actions.get(0).getAction();
            if (this.onActionError != null) {
                this.onActionError.accept(new OnActionErrorOptions<T>(action).setThrowable(SearchBatchingUtils.createDocumentTooLargeException()));
            }
            return;
        }
        ArrayList actionsToRetry = new ArrayList();
        boolean bl = has503 = batchResponse.getStatusCode() == 503;
        if (batchResponse.getResults() == null) {
            actionsToRetry.addAll(actions);
        } else {
            for (IndexingResult result : batchResponse.getResults()) {
                String key = result.getKey();
                TryTrackingIndexAction action = actions.stream().filter(a -> key.equals(a.getKey())).findFirst().orElse(null);
                if (action == null) {
                    LOGGER.warning("Unable to correlate result key {} to initial document.", new Object[]{key});
                    continue;
                }
                if (SearchBatchingUtils.isSuccess(result.getStatusCode())) {
                    if (this.onActionSucceeded == null) continue;
                    this.onActionSucceeded.accept(new OnActionSucceededOptions(action.getAction()));
                    continue;
                }
                if (SearchBatchingUtils.isRetryable(result.getStatusCode())) {
                    has503 |= result.getStatusCode() == 503;
                    if (action.getTryCount() < this.maxRetries) {
                        action.incrementTryCount();
                        actionsToRetry.add(action);
                        continue;
                    }
                    if (this.onActionError == null) continue;
                    this.onActionError.accept(new OnActionErrorOptions(action.getAction()).setThrowable(SearchBatchingUtils.createDocumentHitRetryLimitException()).setIndexingResult(result));
                    continue;
                }
                if (this.onActionError == null) continue;
                this.onActionError.accept(new OnActionErrorOptions(action.getAction()).setIndexingResult(result));
            }
        }
        if (has503) {
            this.currentRetryDelay = SearchBatchingUtils.calculateRetryDelay(this.backoffCount.getAndIncrement(), this.throttlingDelayNanos, this.maxThrottlingDelayNanos);
        } else {
            this.backoffCount.set(0);
            this.currentRetryDelay = Duration.ZERO;
        }
        if (!CoreUtils.isNullOrEmpty(actionsToRetry)) {
            this.documentManager.reinsertFailedActions(actionsToRetry);
        }
    }
}

