/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.documentdb.bulkexecutor.internal;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.Error;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.StoredProcedureResponse;
import com.microsoft.azure.documentdb.bulkexecutor.BulkUpdateFailure;
import com.microsoft.azure.documentdb.bulkexecutor.UpdateItem;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BatchOperator;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BulkUpdateStoredProcedureResponse;
import com.microsoft.azure.documentdb.bulkexecutor.internal.ExceptionUtils;
import com.microsoft.azure.documentdb.bulkexecutor.internal.OperationMetrics;
import com.microsoft.azure.documentdb.repackaged.com.google.common.base.Stopwatch;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.AtomicDouble;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the bulk-update stored procedure against a single partition key range.
 *
 * <p>The work is supplied as a list of mini-batches. Each mini-batch is turned into a
 * {@link Callable} that repeatedly invokes the stored procedure with the not-yet-processed tail
 * of the mini-batch until every item has been consumed or the operator has been cancelled.
 * Items that could not be updated are collected either in
 * {@link #getDocumentsFailedToUpdateDueToSplits()} (Gone / service-unavailable /
 * {@link IllegalStateException} outcomes) or in {@link #getBulkUpdateFailures()} (all other
 * failures).
 */
public class BatchUpdater
extends BatchOperator {
    /** Running total of the {@code count} values reported by the stored procedure. */
    public AtomicInteger numberOfDocumentsUpdated;
    /** Running total of the request charge reported for each stored procedure call. */
    public AtomicDouble totalRequestUnitsConsumed;
    private final List<List<UpdateItem>> batchesToUpdate;
    private List<UpdateItem> documentsFailedToUpdateDueToSplits;
    private List<BulkUpdateFailure> bulkUpdateFailures;
    private final String bulkUpdateSprocLink;
    private final String partitionKeyProperty;
    /** Number of timed-out calls tolerated per mini-batch before the batch is reported as failed. */
    private final int maxRetryCountOnTimeouts = 5;
    private final Logger logger = LoggerFactory.getLogger(BatchUpdater.class);

    /**
     * Creates an updater bound to one partition key range.
     *
     * @param partitionKeyRangeId id of the partition key range the request options are pinned to
     * @param batchesToUpdate mini-batches of update items; one callable is produced per entry
     * @param client client used to execute the stored procedure
     * @param bulkUpdateSprocLink link of the bulk-update stored procedure
     * @param partitionKeyProperty value passed as the second stored procedure argument
     */
    public BatchUpdater(String partitionKeyRangeId, List<List<UpdateItem>> batchesToUpdate, DocumentClient client, String bulkUpdateSprocLink, String partitionKeyProperty) {
        this.partitionKeyRangeId = partitionKeyRangeId;
        this.batchesToUpdate = batchesToUpdate;
        this.client = client;
        this.bulkUpdateSprocLink = bulkUpdateSprocLink;
        this.partitionKeyProperty = partitionKeyProperty;
        this.numberOfDocumentsUpdated = new AtomicInteger();
        this.totalRequestUnitsConsumed = new AtomicDouble();
        // Synchronized wrappers: the callables produced by this class all append to these lists.
        this.documentsFailedToUpdateDueToSplits = Collections.synchronizedList(new ArrayList<UpdateItem>());
        this.bulkUpdateFailures = Collections.synchronizedList(new ArrayList<BulkUpdateFailure>());
        // Local subclass so the partition key range id setter can be invoked from here.
        class RequestOptionsInternal
        extends RequestOptions {
            RequestOptionsInternal(String partitionKeyRangeId) {
                this.setPartitionKeyRengeId(partitionKeyRangeId);
            }
        }
        this.requestOptions = new RequestOptionsInternal(partitionKeyRangeId);
    }

    /**
     * @return the number of documents the stored procedure has reported as updated so far
     */
    public int getNumberOfDocumentsUpdated() {
        return this.numberOfDocumentsUpdated.get();
    }

    /**
     * @return the request charge accumulated over all stored procedure calls so far
     */
    public double getTotalRequestUnitsConsumed() {
        return this.totalRequestUnitsConsumed.get();
    }

    /**
     * @return the live, synchronized list of items set aside after a Gone, service-unavailable
     *         or {@link IllegalStateException} outcome
     */
    public List<UpdateItem> getDocumentsFailedToUpdateDueToSplits() {
        return this.documentsFailedToUpdateDueToSplits;
    }

    /**
     * @return the live, synchronized list of recorded update failures
     */
    public List<BulkUpdateFailure> getBulkUpdateFailures() {
        return this.bulkUpdateFailures;
    }

    /**
     * Lazily maps every mini-batch to a {@link Callable} that updates it.
     *
     * @return an iterator over one callable per mini-batch
     */
    @Override
    public Iterator<Callable<OperationMetrics>> miniBatchExecutionCallableIterator() {
        Stream<Callable<OperationMetrics>> stream = this.batchesToUpdate.stream()
                .<Callable<OperationMetrics>>map(miniBatch -> () -> this.updateMiniBatch(miniBatch));
        return stream.iterator();
    }

    /**
     * Updates one mini-batch, re-invoking the stored procedure with the remaining items until
     * the mini-batch is exhausted or {@code cancel} is set.
     *
     * @param miniBatch the items to update
     * @return metrics built from the final item index, the elapsed time, the request units
     *         consumed by this mini-batch and the number of throttled calls
     */
    private OperationMetrics updateMiniBatch(List<UpdateItem> miniBatch) {
        int currentUpdateItemIndex = 0;
        Stopwatch stopwatch = Stopwatch.createStarted();
        double requestUnitsConsumed = 0.0;
        int numberOfThrottles = 0;
        int numberOfTimeouts = 0;
        try {
            this.logger.debug("pki {} updating mini batch started", this.partitionKeyRangeId);
            while (currentUpdateItemIndex < miniBatch.size() && !this.cancel) {
                this.logger.debug("pki {} inside for loop, currentUpdateItemIndex {}", this.partitionKeyRangeId, currentUpdateItemIndex);
                // Only the unprocessed tail of the mini-batch is sent on each attempt.
                List<UpdateItem> updateItemBatch = miniBatch.subList(currentUpdateItemIndex, miniBatch.size());
                boolean isThrottled = false;
                Duration retryAfter = Duration.ZERO;
                try {
                    this.logger.debug("pki {}, Trying to update minibatch of {} update items", this.partitionKeyRangeId, updateItemBatch.size());
                    StoredProcedureResponse response = this.client.executeStoredProcedure(this.bulkUpdateSprocLink, this.requestOptions, new Object[]{updateItemBatch, this.partitionKeyProperty, null});
                    BulkUpdateStoredProcedureResponse bulkUpdateResponse = this.parseFrom(response);
                    if (bulkUpdateResponse != null) {
                        if (bulkUpdateResponse.errorCode != 0) {
                            this.logger.warn("pki {} Received response error code {}", this.partitionKeyRangeId, bulkUpdateResponse.errorCode);
                            if (bulkUpdateResponse.count == 0) {
                                this.recordFailure(updateItemBatch, new RuntimeException(String.format("Stored proc returned failure %s", bulkUpdateResponse.errorCode)));
                                // Jump the cursor to the end so the loop exits without cancelling
                                // the other mini-batches.
                                // NOTE(review): this also makes the returned metrics report the
                                // whole mini-batch size as the processed count - confirm intended.
                                currentUpdateItemIndex = miniBatch.size();
                            } else if (bulkUpdateResponse.errorCode == 404) {
                                HashMap<String, String> responseHeaders = new HashMap<String, String>();
                                responseHeaders.put("x-ms-substatus", String.valueOf(1002));
                                this.recordFailure(updateItemBatch, new DocumentClientException(404, new Error("{ 'message': 'Batch contains non-existent documents' }"), responseHeaders));
                                this.cancel = true;
                            }
                        }
                        double requestCharge = response.getRequestCharge();
                        currentUpdateItemIndex += bulkUpdateResponse.count;
                        this.numberOfDocumentsUpdated.addAndGet(bulkUpdateResponse.count);
                        requestUnitsConsumed += requestCharge;
                        this.totalRequestUnitsConsumed.addAndGet(requestCharge);
                    } else {
                        // NOTE(review): an empty response neither advances the cursor nor
                        // cancels, so the same call is retried without bound - confirm intended.
                        this.logger.warn("pki {} Failed to receive response", this.partitionKeyRangeId);
                    }
                }
                catch (DocumentClientException e) {
                    this.logger.debug("pki {} Updating minibatch failed", this.partitionKeyRangeId, e);
                    if (ExceptionUtils.isThrottled(e)) {
                        this.logger.debug("pki {} Throttled on partition range id", this.partitionKeyRangeId);
                        ++numberOfThrottles;
                        isThrottled = true;
                        retryAfter = Duration.ofMillis(e.getRetryAfterInMilliseconds());
                    } else if (ExceptionUtils.isTimedOut(e)) {
                        this.logger.debug("pki {} Request timed out", this.partitionKeyRangeId);
                        if (numberOfTimeouts < this.maxRetryCountOnTimeouts) {
                            // Retry immediately; the loop re-sends the same tail.
                            ++numberOfTimeouts;
                        } else {
                            this.recordFailure(updateItemBatch, e);
                            this.cancel = true;
                        }
                    } else if (ExceptionUtils.isUnavailable(e)) {
                        this.logger.debug("pki {} Service unavailable", this.partitionKeyRangeId);
                        this.documentsFailedToUpdateDueToSplits.addAll(updateItemBatch);
                        this.logger.warn("Received Service unavailable exception when updating a mini-batch for partition key range: " + this.partitionKeyRangeId + ". This mini-batch will be retried on the next invocation.");
                        this.cancel = true;
                    } else if (ExceptionUtils.isGone(e)) {
                        // Both Gone flavours store the items; only the log message differs.
                        this.documentsFailedToUpdateDueToSplits.addAll(updateItemBatch);
                        if (ExceptionUtils.isSplit(e)) {
                            this.logger.warn("Received a GoneException on Partition range id " + this.partitionKeyRangeId + " as the partition was completing a split | Storing the mini batch and retrying");
                        } else {
                            this.logger.warn("Received a GoneException on Partition range id " + this.partitionKeyRangeId + " | Storing the mini batch and retrying");
                        }
                        this.cancel = true;
                    } else {
                        String errorMessage = String.format("pki %s failed to update mini-batch. Exception was %s. Status code was %s", this.partitionKeyRangeId, e.getMessage(), e.getStatusCode());
                        this.logger.error(errorMessage, e);
                        this.recordFailure(updateItemBatch, new RuntimeException(e));
                        this.cancel = true;
                    }
                }
                catch (IllegalStateException e) {
                    this.documentsFailedToUpdateDueToSplits.addAll(updateItemBatch);
                    this.logger.warn("Received IllegalStateException since partition key range: " + this.partitionKeyRangeId + " was split or Gone. | Storing the mini batch and retrying");
                    this.cancel = true;
                }
                catch (Exception e) {
                    String errorMessage = String.format("pki %s Failed to update mini-batch. Exception was %s", this.partitionKeyRangeId, e.getMessage());
                    this.logger.error(errorMessage, e);
                    this.recordFailure(updateItemBatch, new RuntimeException(errorMessage, e));
                    this.cancel = true;
                }
                if (!isThrottled) {
                    continue;
                }
                try {
                    this.logger.debug("pki {} throttled going to sleep for {} millis ", this.partitionKeyRangeId, retryAfter.toMillis());
                    Thread.sleep(retryAfter.toMillis());
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag so the owner of this thread can observe it.
                    Thread.currentThread().interrupt();
                    this.recordFailure(updateItemBatch, new RuntimeException(e));
                    this.cancel = true;
                }
            }
        }
        catch (Exception e) {
            // Safety net for anything thrown outside the per-attempt handlers above.
            this.cancel = true;
            String errorMessage = String.format("pki %s Failed to update mini-batch. Exception was %s", this.partitionKeyRangeId, e.getMessage());
            this.logger.error(errorMessage, e);
            this.recordFailure(miniBatch, new RuntimeException(e));
        }
        this.logger.debug("pki {} completed", this.partitionKeyRangeId);
        stopwatch.stop();
        return new OperationMetrics(currentUpdateItemIndex, stopwatch.elapsed(), requestUnitsConsumed, numberOfThrottles);
    }

    /**
     * Appends a {@link BulkUpdateFailure} holding a copy of the given items and the given cause
     * to the list of bulk update failures.
     *
     * @param failedItems the items that were not updated
     * @param cause the exception to attach to the failure
     */
    private void recordFailure(List<UpdateItem> failedItems, Exception cause) {
        BulkUpdateFailure failure = new BulkUpdateFailure();
        failure.getFailedUpdateItems().addAll(failedItems);
        failure.setBulkUpdateFailureException(cause);
        this.bulkUpdateFailures.add(failure);
    }

    /**
     * Deserializes the stored procedure's string payload.
     *
     * @param storedProcResponse the raw stored procedure response
     * @return the parsed response, or {@code null} when the payload is null or empty
     * @throws JsonParseException if the payload is not valid JSON
     * @throws JsonMappingException if the payload cannot be mapped onto the response type
     * @throws IOException on any other read failure
     */
    private BulkUpdateStoredProcedureResponse parseFrom(StoredProcedureResponse storedProcResponse) throws JsonParseException, JsonMappingException, IOException {
        String res = storedProcResponse.getResponseAsString();
        this.logger.debug("MiniBatch Update for Partition Key Range Id {}: Stored Proc Response as String {}", this.partitionKeyRangeId, res);
        if (StringUtils.isEmpty(res)) {
            return null;
        }
        return objectMapper.readValue(res, BulkUpdateStoredProcedureResponse.class);
    }
}

