/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.batch;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.BulkProcessingOptions;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosBulkItemResponse;
import com.azure.cosmos.CosmosBulkOperationResponse;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.CosmosItemOperation;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.TransactionalBatchOperationResult;
import com.azure.cosmos.TransactionalBatchResponse;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.batch.BulkExecutorUtil;
import com.azure.cosmos.implementation.batch.ItemBulkOperation;
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.ServerOperationBatchRequest;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Schedulers;

/**
 * Executes a stream of {@link CosmosItemOperation}s in bulk against a single container.
 *
 * <p>Incoming operations are tagged with a partition key range id, grouped by that id, buffered into
 * micro-batches (bounded by size and by time) and sent as batch requests. Operations that have to be
 * retried are pushed back into the pipeline through sinks: a per-group sink for retries within the same
 * partition key range, and a main sink for operations whose partition key range must be resolved again.
 *
 * @param <TContext> type of the caller-supplied context attached to every emitted response.
 */
public final class BulkExecutor<TContext> {
    private static final Logger logger = LoggerFactory.getLogger(BulkExecutor.class);

    private final CosmosAsyncContainer container;
    private final AsyncDocumentClient docClientWrapper;
    private final ThrottlingRetryOptions throttlingRetryOptions;
    private final Flux<CosmosItemOperation> inputOperations;

    // Micro-batching options taken from BulkProcessingOptions.
    private final int maxMicroBatchSize;
    private final int maxMicroBatchConcurrency;
    private final Duration maxMicroBatchInterval;
    private final TContext batchContext;

    // Completion bookkeeping: the sinks may only be completed once the input has completed AND
    // every counted operation has produced a response.
    private final AtomicBoolean mainSourceCompleted;
    private final AtomicInteger totalCount;

    // Re-entry point for operations that must go through partition key range resolution again.
    private final FluxProcessor<CosmosItemOperation, CosmosItemOperation> mainFluxProcessor;
    private final FluxSink<CosmosItemOperation> mainSink;

    // One retry sink per partition key range group. Sinks are registered while groups are being created
    // and iterated from response callbacks that run after a publishOn(boundedElastic) hop, so the list
    // must tolerate concurrent add/iterate.
    private final List<FluxSink<CosmosItemOperation>> groupSinks;

    /**
     * Creates an executor for the given operations.
     *
     * @param container       container the operations are executed against; must not be null.
     * @param inputOperations operations to execute; must not be null.
     * @param bulkOptions     micro-batch size/concurrency/interval and the batch context; must not be null.
     * @throws NullPointerException if any argument is null.
     */
    public BulkExecutor(CosmosAsyncContainer container, Flux<CosmosItemOperation> inputOperations, BulkProcessingOptions<TContext> bulkOptions) {
        Preconditions.checkNotNull(container, "expected non-null container");
        Preconditions.checkNotNull(inputOperations, "expected non-null inputOperations");
        Preconditions.checkNotNull(bulkOptions, "expected non-null bulkOptions");

        this.container = container;
        this.inputOperations = inputOperations;
        this.docClientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(container.getDatabase());
        this.throttlingRetryOptions = this.docClientWrapper.getConnectionPolicy().getThrottlingRetryOptions();

        this.maxMicroBatchSize = bulkOptions.getMaxMicroBatchSize();
        this.maxMicroBatchConcurrency = bulkOptions.getMaxMicroBatchConcurrency();
        this.maxMicroBatchInterval = bulkOptions.getMaxMicroBatchInterval();
        this.batchContext = bulkOptions.getBatchContext();

        this.mainSourceCompleted = new AtomicBoolean(false);
        this.totalCount = new AtomicInteger(0);
        // The explicit type witness is required: without it create() is inferred as UnicastProcessor<Object>.
        this.mainFluxProcessor = UnicastProcessor.<CosmosItemOperation>create().serialize();
        this.mainSink = this.mainFluxProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        this.groupSinks = new CopyOnWriteArrayList<>();
    }

    /**
     * Builds the bulk execution pipeline.
     *
     * @return a flux emitting one response per operation that is not re-queued for retry.
     */
    public Flux<CosmosBulkOperationResponse<TContext>> execute() {
        return this.inputOperations
            .onErrorContinue((throwable, o) ->
                logger.error("Skipping an error operation while processing {}. Cause: {}", o, throwable.getMessage()))
            .doOnNext(cosmosItemOperation -> {
                // Attach the retry policy once, when the operation first enters the pipeline;
                // operations re-entering through the sinks are merged in further downstream.
                BulkExecutorUtil.setRetryPolicyForBulk(
                    this.docClientWrapper,
                    this.container,
                    cosmosItemOperation,
                    this.throttlingRetryOptions);
                this.totalCount.incrementAndGet();
            })
            .doOnComplete(() -> {
                this.mainSourceCompleted.set(true);
                // Covers the case where the input completes after the last response was already
                // emitted (or the input was empty): nobody else would close the sinks then.
                if (this.totalCount.get() == 0) {
                    this.completeAllSinks();
                }
            })
            .mergeWith(this.mainFluxProcessor)
            // Operations arriving via the main sink get their partition key range resolved again here.
            .flatMap(operation -> BulkExecutorUtil
                .resolvePartitionKeyRangeId(this.docClientWrapper, this.container, operation)
                .map(pkRangeId -> Pair.of(pkRangeId, operation)))
            .groupBy(Pair::getKey, Pair::getValue)
            .flatMap(this::executePartitionedGroup)
            .doOnNext(requestAndResponse -> {
                // Only close the sinks when nothing is outstanding AND no further input can arrive.
                if (this.totalCount.decrementAndGet() == 0 && this.mainSourceCompleted.get()) {
                    this.completeAllSinks();
                }
            });
    }

    /**
     * Processes all operations of one partition key range: merges in the group's retry sink,
     * buffers into micro-batches and executes up to {@code maxMicroBatchConcurrency} of them concurrently.
     *
     * @param partitionedGroupFluxOfInputOperations operations keyed by partition key range id.
     * @return responses produced for this group.
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(GroupedFlux<String, CosmosItemOperation> partitionedGroupFluxOfInputOperations) {
        final String pkRange = partitionedGroupFluxOfInputOperations.key();

        final FluxProcessor<CosmosItemOperation, CosmosItemOperation> groupFluxProcessor =
            UnicastProcessor.<CosmosItemOperation>create().serialize();
        final FluxSink<CosmosItemOperation> groupSink = groupFluxProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        // Registered so completeAllSinks() can terminate this group's merged stream.
        this.groupSinks.add(groupSink);

        return partitionedGroupFluxOfInputOperations
            .mergeWith(groupFluxProcessor)
            .bufferTimeout(this.maxMicroBatchSize, this.maxMicroBatchInterval)
            .onBackpressureBuffer()
            .flatMap(
                cosmosItemOperations -> this.executeOperations(cosmosItemOperations, pkRange, groupSink),
                this.maxMicroBatchConcurrency);
    }

    /**
     * Turns one buffered list of operations into a batch request and executes it.
     *
     * @param operations operations buffered for one micro-batch.
     * @param pkRange    partition key range id the operations belong to.
     * @param groupSink  retry sink of the group.
     * @return responses for the operations contained in the batch request.
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(List<CosmosItemOperation> operations, String pkRange, FluxSink<CosmosItemOperation> groupSink) {
        final ServerOperationBatchRequest serverOperationBatchRequest = BulkExecutorUtil.createBatchRequest(operations, pkRange);

        // Operations reported as pending by the batch request are re-queued on the group sink.
        serverOperationBatchRequest.getBatchPendingOperations().forEach(groupSink::next);

        return Flux.just(serverOperationBatchRequest.getBatchRequest())
            .publishOn(Schedulers.boundedElastic())
            .flatMap(serverRequest -> this.executePartitionKeyRangeServerBatchRequest(serverRequest, groupSink));
    }

    /**
     * Executes a batch request and maps either every operation result, or (when the request fails
     * with an {@link Exception}) every operation of the request, to a response or a retry.
     *
     * @param serverRequest batch request to send.
     * @param groupSink     retry sink of the group.
     * @return responses for operations that are not retried.
     */
    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionKeyRangeServerBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest, FluxSink<CosmosItemOperation> groupSink) {
        return this.executeBatchRequest(serverRequest)
            .flatMapMany(response -> Flux
                .fromIterable(response.getResults())
                .flatMap(result -> this.handleTransactionalBatchOperationResult(response, result, groupSink)))
            .onErrorResume(throwable -> {
                // Non-Exception throwables (e.g. Errors) are not converted into per-operation responses.
                if (!(throwable instanceof Exception)) {
                    throw Exceptions.propagate(throwable);
                }
                final Exception exception = (Exception) throwable;
                return Flux
                    .fromIterable(serverRequest.getOperations())
                    .flatMap(itemOperation ->
                        this.handleTransactionalBatchExecutionException(itemOperation, exception, groupSink));
            });
    }

    /**
     * Maps a single operation result to a response, or re-queues the operation on the group sink
     * when its retry policy asks for a retry.
     *
     * @param response        batch response the result belongs to.
     * @param operationResult result of one operation.
     * @param groupSink       retry sink of the group.
     * @return the response, or an empty mono when the operation was re-queued.
     * @throws UnsupportedOperationException if a failed operation is not an {@link ItemBulkOperation}.
     */
    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchOperationResult(TransactionalBatchResponse response, TransactionalBatchOperationResult operationResult, FluxSink<CosmosItemOperation> groupSink) {
        final CosmosBulkItemResponse cosmosBulkItemResponse = BridgeInternal.createCosmosBulkItemResponse(operationResult, response);
        final CosmosItemOperation itemOperation = operationResult.getOperation();

        if (operationResult.isSuccessStatusCode()) {
            return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, cosmosBulkItemResponse, this.batchContext));
        }

        if (!(itemOperation instanceof ItemBulkOperation)) {
            throw new UnsupportedOperationException("Unknown CosmosItemOperation.");
        }

        return ((ItemBulkOperation) itemOperation).getRetryPolicy().shouldRetry(operationResult).flatMap(result -> {
            if (result.shouldRetry) {
                // No response is emitted for this attempt; the operation stays counted in totalCount.
                groupSink.next(itemOperation);
                return Mono.empty();
            }
            return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, cosmosBulkItemResponse, this.batchContext));
        });
    }

    /**
     * Maps a batch-level failure to a per-operation response, or re-queues the operation.
     *
     * <p>For a {@link CosmosException} with status 410 (Gone) that the retry policy accepts, the operation
     * is sent to the main sink so that its partition key range is resolved again; other retries go to
     * the group sink.
     *
     * @param itemOperation operation that was part of the failed request.
     * @param exception     failure of the batch request.
     * @param groupSink     retry sink of the group.
     * @return the failure response, or an empty mono when the operation was re-queued.
     */
    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExecutionException(CosmosItemOperation itemOperation, Exception exception, FluxSink<CosmosItemOperation> groupSink) {
        if (!(exception instanceof CosmosException) || !(itemOperation instanceof ItemBulkOperation)) {
            return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext));
        }

        final CosmosException cosmosException = (CosmosException) exception;
        final ItemBulkOperation itemBulkOperation = (ItemBulkOperation) itemOperation;

        final boolean isGone = cosmosException.getStatusCode() == HttpResponseStatus.GONE.code();
        if (isGone && itemBulkOperation.getRetryPolicy().shouldRetryForGone(cosmosException.getStatusCode(), cosmosException.getSubStatusCode())) {
            this.mainSink.next(itemOperation);
            return Mono.empty();
        }

        // Passed with its Exception static type so the Exception overload of shouldRetry is selected.
        return itemBulkOperation.getRetryPolicy().shouldRetry(exception).flatMap(result -> {
            if (result.shouldRetry) {
                groupSink.next(itemOperation);
                return Mono.empty();
            }
            return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext));
        });
    }

    /**
     * Sends the batch request through the document client using the container's link.
     *
     * @param serverRequest batch request to send.
     * @return the batch response.
     */
    private Mono<TransactionalBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) {
        return this.docClientWrapper.executeBatchRequest(BridgeInternal.getLink(this.container), serverRequest, null, false);
    }

    /**
     * Completes the main sink and every registered group sink so the merged streams can terminate.
     * May be invoked more than once (from both the input-completion and the response callbacks).
     */
    private void completeAllSinks() {
        logger.info("Closing all sinks");
        this.mainSink.complete();
        this.groupSinks.forEach(FluxSink::complete);
    }
}

