/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.batch;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.BulkProcessingOptions;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.CosmosBulkItemResponse;
import com.azure.cosmos.CosmosBulkOperationResponse;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.CosmosItemOperation;
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.TransactionalBatchOperationResult;
import com.azure.cosmos.TransactionalBatchResponse;
import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.CosmosDaemonThreadFactory;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RequestOptions;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.batch.BulkExecutorUtil;
import com.azure.cosmos.implementation.batch.FlushBuffersItemOperation;
import com.azure.cosmos.implementation.batch.ItemBulkOperation;
import com.azure.cosmos.implementation.batch.PartitionKeyRangeServerBatchRequest;
import com.azure.cosmos.implementation.batch.PartitionScopeThresholds;
import com.azure.cosmos.implementation.batch.ServerOperationBatchRequest;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

public final class BulkExecutor<TContext> {
    private static final Logger logger = LoggerFactory.getLogger(BulkExecutor.class);
    private static final AtomicLong instanceCount = new AtomicLong(0L);
    private final CosmosAsyncContainer container;
    private final AsyncDocumentClient docClientWrapper;
    private final String operationContextText;
    private final OperationContextAndListenerTuple operationListener;
    private final ThrottlingRetryOptions throttlingRetryOptions;
    private final Flux<CosmosItemOperation> inputOperations;
    private final Long maxMicroBatchIntervalInMs;
    private final TContext batchContext;
    private final ConcurrentMap<String, PartitionScopeThresholds<TContext>> partitionScopeThresholds;
    private final BulkProcessingOptions<TContext> bulkOptions;
    private final AtomicBoolean mainSourceCompleted;
    private final AtomicInteger totalCount;
    private final FluxProcessor<CosmosItemOperation, CosmosItemOperation> mainFluxProcessor;
    private final FluxSink<CosmosItemOperation> mainSink;
    private final List<FluxSink<CosmosItemOperation>> groupSinks;
    private final ScheduledExecutorService executorService;

    public BulkExecutor(CosmosAsyncContainer container, Flux<CosmosItemOperation> inputOperations, BulkProcessingOptions<TContext> bulkOptions) {
        Preconditions.checkNotNull(container, "expected non-null container");
        Preconditions.checkNotNull(inputOperations, "expected non-null inputOperations");
        Preconditions.checkNotNull(bulkOptions, "expected non-null bulkOptions");
        this.bulkOptions = bulkOptions;
        this.container = container;
        this.inputOperations = inputOperations;
        this.docClientWrapper = CosmosBridgeInternal.getAsyncDocumentClient(container.getDatabase());
        this.throttlingRetryOptions = this.docClientWrapper.getConnectionPolicy().getThrottlingRetryOptions();
        this.maxMicroBatchIntervalInMs = bulkOptions.getMaxMicroBatchInterval().toMillis();
        this.batchContext = bulkOptions.getBatchContext();
        this.partitionScopeThresholds = ImplementationBridgeHelpers.BulkProcessingThresholdsHelper.getBulkProcessingThresholdsAccessor().getPartitionScopeThresholds(bulkOptions.getThresholds());
        this.operationListener = ImplementationBridgeHelpers.CosmosBulkProcessingOptionsHelper.getCosmosBulkProcessingOptionAccessor().getOperationContext(bulkOptions);
        this.operationContextText = this.operationListener != null && this.operationListener.getOperationContext() != null ? this.operationListener.getOperationContext().toString() : "n/a";
        this.mainSourceCompleted = new AtomicBoolean(false);
        this.totalCount = new AtomicInteger(0);
        this.mainFluxProcessor = UnicastProcessor.create().serialize();
        this.mainSink = this.mainFluxProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        this.groupSinks = new ArrayList<FluxSink<CosmosItemOperation>>();
        this.executorService = Executors.newSingleThreadScheduledExecutor(new CosmosDaemonThreadFactory("BulkExecutor-" + instanceCount.incrementAndGet()));
        this.executorService.scheduleWithFixedDelay(this::onFlush, this.maxMicroBatchIntervalInMs, this.maxMicroBatchIntervalInMs, TimeUnit.MILLISECONDS);
    }

    public Flux<CosmosBulkOperationResponse<TContext>> execute() {
        return this.inputOperations.onErrorContinue((throwable, o) -> logger.error("Skipping an error operation while processing {}. Cause: {}, Context: {}", new Object[]{o, throwable.getMessage(), this.operationContextText})).doOnNext(cosmosItemOperation -> {
            BulkExecutorUtil.setRetryPolicyForBulk(this.docClientWrapper, this.container, cosmosItemOperation, this.throttlingRetryOptions);
            if (cosmosItemOperation != FlushBuffersItemOperation.singleton()) {
                this.totalCount.incrementAndGet();
            }
        }).doOnComplete(() -> {
            this.mainSourceCompleted.set(true);
            long totalCountSnapshot = this.totalCount.get();
            logger.debug("Main source completed - # left items {}, Context: {}", (Object)totalCountSnapshot, (Object)this.operationContextText);
            if (totalCountSnapshot == 0L) {
                this.completeAllSinks();
            } else {
                this.onFlush();
            }
        }).mergeWith(this.mainFluxProcessor).flatMap(operation -> BulkExecutorUtil.resolvePartitionKeyRangeId(this.docClientWrapper, this.container, operation).map(pkRangeId -> {
            PartitionScopeThresholds partitionScopeThresholds = this.partitionScopeThresholds.computeIfAbsent((String)pkRangeId, newPkRangeId -> new PartitionScopeThresholds<TContext>((String)newPkRangeId, this.bulkOptions));
            return Pair.of(partitionScopeThresholds, operation);
        })).groupBy(Pair::getKey, Pair::getValue).flatMap(this::executePartitionedGroup).doOnNext(requestAndResponse -> {
            int totalCountAfterDecrement = this.totalCount.decrementAndGet();
            boolean mainSourceCompletedSnapshot = this.mainSourceCompleted.get();
            if (totalCountAfterDecrement == 0 && mainSourceCompletedSnapshot) {
                logger.debug("All work completed, Context: {}", (Object)this.operationContextText);
                this.completeAllSinks();
            } else {
                logger.debug("Work left - TotalCount after decrement: {}, main sink completed {}, Context: {}", new Object[]{totalCountAfterDecrement, mainSourceCompletedSnapshot, this.operationContextText});
            }
        }).doOnComplete(() -> {
            int totalCountSnapshot = this.totalCount.get();
            boolean mainSourceCompletedSnapshot = this.mainSourceCompleted.get();
            if (totalCountSnapshot == 0 && mainSourceCompletedSnapshot) {
                logger.debug("DoOnComplete: All work completed, Context: {}", (Object)this.operationContextText);
                this.completeAllSinks();
            } else {
                logger.debug("DoOnComplete: Work left - TotalCount after decrement: {}, main sink completed {}, Context: {}", new Object[]{totalCountSnapshot, mainSourceCompletedSnapshot, this.operationContextText});
            }
        });
    }

    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(GroupedFlux<PartitionScopeThresholds<TContext>, CosmosItemOperation> partitionedGroupFluxOfInputOperations) {
        PartitionScopeThresholds thresholds = (PartitionScopeThresholds)partitionedGroupFluxOfInputOperations.key();
        FluxProcessor groupFluxProcessor = UnicastProcessor.create().serialize();
        FluxSink groupSink = groupFluxProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        this.groupSinks.add((FluxSink<CosmosItemOperation>)groupSink);
        AtomicLong firstRecordTimeStamp = new AtomicLong(-1L);
        AtomicLong currentMicroBatchSize = new AtomicLong(0L);
        return partitionedGroupFluxOfInputOperations.mergeWith((Publisher)groupFluxProcessor).onBackpressureBuffer().timestamp().bufferUntil(timeStampItemOperationTuple -> {
            long timestamp = (Long)timeStampItemOperationTuple.getT1();
            CosmosItemOperation itemOperation = (CosmosItemOperation)timeStampItemOperationTuple.getT2();
            if (itemOperation == FlushBuffersItemOperation.singleton()) {
                if (currentMicroBatchSize.get() > 0L) {
                    logger.debug("Flushing PKRange {} due to FlushItemOperation, Context: {}", (Object)thresholds.getPartitionKeyRangeId(), (Object)this.operationContextText);
                    return true;
                }
                return false;
            }
            firstRecordTimeStamp.compareAndSet(-1L, timestamp);
            long age = timestamp - firstRecordTimeStamp.get();
            long batchSize = currentMicroBatchSize.incrementAndGet();
            if (batchSize >= (long)thresholds.getTargetMicroBatchSizeSnapshot() || age >= this.maxMicroBatchIntervalInMs) {
                logger.debug("Flushing PKRange {} due to BatchSize ({}) or age ({}), Context: {}", new Object[]{thresholds.getPartitionKeyRangeId(), batchSize, age, this.operationContextText});
                firstRecordTimeStamp.set(-1L);
                currentMicroBatchSize.set(0L);
                return true;
            }
            return false;
        }).flatMap(timeStampAndItemOperationTuples -> {
            ArrayList<CosmosItemOperation> operations = new ArrayList<CosmosItemOperation>(timeStampAndItemOperationTuples.size());
            for (Tuple2 timeStampAndItemOperationTuple : timeStampAndItemOperationTuples) {
                CosmosItemOperation itemOperation = (CosmosItemOperation)timeStampAndItemOperationTuple.getT2();
                if (itemOperation == FlushBuffersItemOperation.singleton()) continue;
                operations.add(itemOperation);
            }
            return this.executeOperations(operations, thresholds, (FluxSink<CosmosItemOperation>)groupSink);
        }, this.bulkOptions.getMaxMicroBatchConcurrency());
    }

    private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(List<CosmosItemOperation> operations, PartitionScopeThresholds<TContext> thresholds, FluxSink<CosmosItemOperation> groupSink) {
        if (operations.size() == 0) {
            logger.debug("Empty operations list, Context: {}", (Object)this.operationContextText);
            return Flux.empty();
        }
        String pkRange = thresholds.getPartitionKeyRangeId();
        ServerOperationBatchRequest serverOperationBatchRequest = BulkExecutorUtil.createBatchRequest(operations, pkRange);
        if (serverOperationBatchRequest.getBatchPendingOperations().size() > 0) {
            serverOperationBatchRequest.getBatchPendingOperations().forEach(arg_0 -> groupSink.next(arg_0));
        }
        return Flux.just((Object)serverOperationBatchRequest.getBatchRequest()).publishOn(Schedulers.boundedElastic()).flatMap(serverRequest -> this.executePartitionKeyRangeServerBatchRequest((PartitionKeyRangeServerBatchRequest)serverRequest, groupSink, thresholds));
    }

    private Flux<CosmosBulkOperationResponse<TContext>> executePartitionKeyRangeServerBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest, FluxSink<CosmosItemOperation> groupSink, PartitionScopeThresholds<TContext> thresholds) {
        return this.executeBatchRequest(serverRequest).flatMapMany(response -> Flux.fromIterable(response.getResults()).flatMap(result -> this.handleTransactionalBatchOperationResult((TransactionalBatchResponse)response, (TransactionalBatchOperationResult)result, groupSink, thresholds))).onErrorResume(throwable -> {
            if (!(throwable instanceof Exception)) {
                throw Exceptions.propagate((Throwable)throwable);
            }
            Exception exception = (Exception)throwable;
            return Flux.fromIterable(serverRequest.getOperations()).flatMap(itemOperation -> this.handleTransactionalBatchExecutionException((CosmosItemOperation)itemOperation, exception, groupSink, thresholds));
        });
    }

    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchOperationResult(TransactionalBatchResponse response, TransactionalBatchOperationResult operationResult, FluxSink<CosmosItemOperation> groupSink, PartitionScopeThresholds<TContext> thresholds) {
        CosmosBulkItemResponse cosmosBulkItemResponse = BridgeInternal.createCosmosBulkItemResponse(operationResult, response);
        CosmosItemOperation itemOperation = operationResult.getOperation();
        if (!operationResult.isSuccessStatusCode()) {
            if (itemOperation instanceof ItemBulkOperation) {
                return ((ItemBulkOperation)itemOperation).getRetryPolicy().shouldRetry(operationResult).flatMap(result -> {
                    if (result.shouldRetry) {
                        return this.enqueueForRetry(result.backOffTime, groupSink, itemOperation, thresholds);
                    }
                    return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, cosmosBulkItemResponse, this.batchContext));
                });
            }
            throw new UnsupportedOperationException("Unknown CosmosItemOperation.");
        }
        thresholds.recordSuccessfulOperation();
        return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, cosmosBulkItemResponse, this.batchContext));
    }

    private Mono<CosmosBulkOperationResponse<TContext>> handleTransactionalBatchExecutionException(CosmosItemOperation itemOperation, Exception exception, FluxSink<CosmosItemOperation> groupSink, PartitionScopeThresholds<TContext> thresholds) {
        if (exception instanceof CosmosException && itemOperation instanceof ItemBulkOperation) {
            CosmosException cosmosException = (CosmosException)((Object)exception);
            ItemBulkOperation itemBulkOperation = (ItemBulkOperation)itemOperation;
            return itemBulkOperation.getRetryPolicy().shouldRetryForGone(cosmosException.getStatusCode(), cosmosException.getSubStatusCode()).flatMap(shouldRetryGone -> {
                if (shouldRetryGone.booleanValue()) {
                    this.mainSink.next((Object)itemOperation);
                    return Mono.empty();
                }
                return this.retryOtherExceptions(itemOperation, exception, groupSink, cosmosException, itemBulkOperation, thresholds);
            });
        }
        return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext));
    }

    private Mono<CosmosBulkOperationResponse<TContext>> enqueueForRetry(Duration backOffTime, FluxSink<CosmosItemOperation> groupSink, CosmosItemOperation itemOperation, PartitionScopeThresholds<TContext> thresholds) {
        thresholds.recordEnqueuedRetry();
        if (backOffTime == null || backOffTime.isZero()) {
            groupSink.next((Object)itemOperation);
            return Mono.empty();
        }
        return Mono.delay((Duration)backOffTime).flatMap(dummy -> {
            groupSink.next((Object)itemOperation);
            return Mono.empty();
        });
    }

    private Mono<CosmosBulkOperationResponse<TContext>> retryOtherExceptions(CosmosItemOperation itemOperation, Exception exception, FluxSink<CosmosItemOperation> groupSink, CosmosException cosmosException, ItemBulkOperation<?> itemBulkOperation, PartitionScopeThresholds<TContext> thresholds) {
        return itemBulkOperation.getRetryPolicy().shouldRetry((Exception)((Object)cosmosException)).flatMap(result -> {
            if (result.shouldRetry) {
                return this.enqueueForRetry(result.backOffTime, groupSink, itemBulkOperation, thresholds);
            }
            return Mono.just(BridgeInternal.createCosmosBulkOperationResponse(itemOperation, exception, this.batchContext));
        });
    }

    private Mono<TransactionalBatchResponse> executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) {
        RequestOptions options = new RequestOptions();
        options.setOperationContextAndListenerTuple(this.operationListener);
        return this.docClientWrapper.executeBatchRequest(BridgeInternal.getLink(this.container), serverRequest, options, false);
    }

    private void completeAllSinks() {
        logger.info("Closing all sinks, Context: {}", (Object)this.operationContextText);
        this.executorService.shutdown();
        logger.debug("Executor service shut down, Context: {}", (Object)this.operationContextText);
        this.mainSink.complete();
        logger.debug("Main sink completed, Context: {}", (Object)this.operationContextText);
        this.groupSinks.forEach(FluxSink::complete);
        logger.debug("All group sinks completed, Context: {}", (Object)this.operationContextText);
    }

    private void onFlush() {
        this.groupSinks.forEach(sink -> sink.next((Object)FlushBuffersItemOperation.singleton()));
    }
}

