/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.documentdb.bulkexecutor;

import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.Error;
import com.microsoft.azure.documentdb.FeedOptions;
import com.microsoft.azure.documentdb.PartitionKeyDefinition;
import com.microsoft.azure.documentdb.PartitionKeyRange;
import com.microsoft.azure.documentdb.RequestOptions;
import com.microsoft.azure.documentdb.RetryOptions;
import com.microsoft.azure.documentdb.bulkexecutor.BulkDeleteResponse;
import com.microsoft.azure.documentdb.bulkexecutor.BulkImportFailure;
import com.microsoft.azure.documentdb.bulkexecutor.BulkImportResponse;
import com.microsoft.azure.documentdb.bulkexecutor.BulkUpdateFailure;
import com.microsoft.azure.documentdb.bulkexecutor.BulkUpdateResponse;
import com.microsoft.azure.documentdb.bulkexecutor.SetUpdateOperation;
import com.microsoft.azure.documentdb.bulkexecutor.UpdateItem;
import com.microsoft.azure.documentdb.bulkexecutor.UpdateOperationBase;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BatchDeleter;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BatchInserter;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BatchUpdater;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BulkDeleteQuerySpec;
import com.microsoft.azure.documentdb.bulkexecutor.internal.BulkImportStoredProcedureOptions;
import com.microsoft.azure.documentdb.bulkexecutor.internal.CongestionController;
import com.microsoft.azure.documentdb.bulkexecutor.internal.DocumentAnalyzer;
import com.microsoft.azure.documentdb.bulkexecutor.internal.ExceptionUtils;
import com.microsoft.azure.documentdb.internal.routing.CollectionRoutingMap;
import com.microsoft.azure.documentdb.internal.routing.InMemoryCollectionRoutingMap;
import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal;
import com.microsoft.azure.documentdb.internal.routing.Range;
import com.microsoft.azure.documentdb.repackaged.com.google.common.base.Preconditions;
import com.microsoft.azure.documentdb.repackaged.com.google.common.base.Stopwatch;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.AsyncCallable;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.Futures;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.ListenableFuture;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.ListeningExecutorService;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.MoreExecutors;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DocumentBulkExecutor
implements AutoCloseable {
    private static final String BULK_IMPORT_STORED_PROCECURE_NAME = "__.sys.commonBulkInsert";
    private static final String BULK_UPDATE_STORED_PROCECURE_NAME = "__.sys.bulkPatch";
    private static final String BULK_DELETE_STORED_PROCECURE_NAME = "__.sys.commonDelete";
    private static final int MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE = 1101005;
    private static final double FRACTION_OF_MAX_BULK_IMPORT_SCRIPT_INPUT_SIZE_ALLOWED = 0.2;
    private static final int INITIALIZATION_SLEEP_TIME_ON_THROTTLING = 500;
    private static final int DEFAULT_BULK_DELETE_BATCH_SIZE = 1000;
    private static final int SLEEP_TIME_FOR_RETRY_POST_SPLIT_IN_MILLIS = 65000;
    private static final int MAX_RETRIES_ON_SPLIT_FAILURES = 10;
    private final Logger logger = LoggerFactory.getLogger(DocumentBulkExecutor.class);
    private final Map<String, Integer> partitionKeyRangeIdToInferredDegreeOfParallelism = new ConcurrentHashMap<String, Integer>();
    private static final String SQL_QUERY_REGEX_PATTERN = "(?i)select\\s+\\*\\s+(?i)from\\s+(?<root>c)\\s+(?i)where(?:\\s+(?<filter>.+))?";
    private static final Pattern BULK_DELETE_QUERY_SPEC_PATTERN = Pattern.compile("(?i)select\\s+\\*\\s+(?i)from\\s+(?<root>c)\\s+(?i)where(?:\\s+(?<filter>.+))?");
    private final ListeningExecutorService listeningExecutorService;
    private final DocumentClient client;
    private final String collectionLink;
    private final PartitionKeyDefinition partitionKeyDefinition;
    private List<String> partitionKeyRangeIds;
    private CollectionRoutingMap collectionRoutingMap;
    private String bulkImportStoredProcLink;
    private String bulkUpdateStoredProcLink;
    private String bulkDeleteStoredProcLink;
    private int collectionThroughput;
    private int maxMiniBatchSize;
    private int maxUpdateMiniBatchCount;
    private RetryOptions retryOptions;

    public static Builder builder() {
        return new Builder();
    }

    private void setMaxMiniBatchSize(int size) {
        this.maxMiniBatchSize = size;
    }

    private void setMaxUpdateMiniBatchCount(int count) {
        this.maxUpdateMiniBatchCount = count;
    }

    private void setInitializationRetryOptions(RetryOptions options) {
        this.retryOptions = options;
    }

    private DocumentBulkExecutor(DocumentClient client, String collectionLink, PartitionKeyDefinition partitionKeyDefinition, int collectionOfferThroughput) {
        Preconditions.checkNotNull(client, "client cannot be null");
        Preconditions.checkNotNull(partitionKeyDefinition, "partitionKeyDefinition cannot be null");
        Preconditions.checkNotNull(collectionLink, "collectionLink cannot be null");
        Preconditions.checkArgument(collectionOfferThroughput > 0, "collection throughput is less than 10,000");
        this.client = client;
        this.collectionLink = collectionLink;
        this.collectionThroughput = collectionOfferThroughput;
        this.partitionKeyDefinition = partitionKeyDefinition;
        this.listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    }

    private void safeInit() throws Exception {
        int count = 0;
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                this.initialize();
            }
            catch (Exception e) {
                DocumentClientException dce = ExceptionUtils.getThrottelingException(e);
                long now = System.currentTimeMillis();
                if (++count < this.retryOptions.getMaxRetryAttemptsOnThrottledRequests() && now - startTime < (long)(this.retryOptions.getMaxRetryWaitTimeInSeconds() * 1000) && dce != null && dce.getStatusCode() == 429) {
                    Thread.sleep((long)count * dce.getRetryAfterInMilliseconds() + 500L);
                    continue;
                }
                throw e;
            }
            break;
        }
    }

    @Override
    public void close() {
        this.listeningExecutorService.shutdown();
        try {
            if (!this.listeningExecutorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.listeningExecutorService.shutdownNow();
                if (!this.listeningExecutorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                    this.logger.error("some tasks did not terminate");
                }
            }
        }
        catch (InterruptedException e) {
            this.listeningExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private void initialize() throws DocumentClientException {
        this.logger.debug("Initializing ...");
        this.bulkImportStoredProcLink = String.format("%s/sprocs/%s", this.collectionLink, BULK_IMPORT_STORED_PROCECURE_NAME);
        this.bulkUpdateStoredProcLink = String.format("%s/sprocs/%s", this.collectionLink, BULK_UPDATE_STORED_PROCECURE_NAME);
        this.bulkDeleteStoredProcLink = String.format("%s/sprocs/%s", this.collectionLink, BULK_DELETE_STORED_PROCECURE_NAME);
        this.logger.debug("Fetching partition map of collection");
        Range fullRange = new Range((Comparable)((Object)PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey), (Comparable)((Object)PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey), true, false);
        this.collectionRoutingMap = DocumentBulkExecutor.getCollectionRoutingMap(this.client, this.collectionLink);
        Collection partitionKeyRanges = this.collectionRoutingMap.getOverlappingRanges(fullRange);
        this.partitionKeyRangeIds = partitionKeyRanges.stream().map(partitionKeyRange -> partitionKeyRange.getId()).collect(Collectors.toList());
        this.logger.debug("Initialization completed");
    }

    public BulkImportResponse importAll(Collection<String> documents, boolean isUpsert, boolean disableAutomaticIdGeneration, Integer maxConcurrencyPerPartitionRange) throws DocumentClientException {
        return this.executeBulkImportInternal(documents, isUpsert, disableAutomaticIdGeneration, maxConcurrencyPerPartitionRange);
    }

    public BulkUpdateResponse updateAll(Collection<UpdateItem> updateItems, Integer maxConcurrencyPerPartitionRange) throws DocumentClientException {
        return this.executeBulkUpdateInternal(updateItems, maxConcurrencyPerPartitionRange);
    }

    public BulkUpdateResponse mergeAll(Collection<Document> patchDocuments, Integer maxConcurrencyPerPartitionRange) throws DocumentClientException {
        return this.executeBulkUpdateWithPatchInternal(patchDocuments, maxConcurrencyPerPartitionRange);
    }

    public BulkDeleteResponse deleteAll(List<Pair<String, String>> pkIdPairsToDelete) throws DocumentClientException {
        return this.executeBulkDeleteInternalPkRowKeys(pkIdPairsToDelete);
    }

    private BulkUpdateResponse updateDocument(String partitionKey, String id, List<UpdateOperationBase> updateOperations) throws DocumentClientException {
        return this.executeUpdateDocumentInternal(partitionKey, id, updateOperations);
    }

    private BulkImportResponse executeBulkImportInternal(Collection<String> input, boolean isUpsert, boolean disableAutomaticIdGeneration, Integer maxConcurrencyPerPartitionRange) throws DocumentClientException {
        Preconditions.checkNotNull(input, "document collection cannot be null");
        try {
            ArrayList<String> documentsFailedToImportDueToSplits;
            ArrayList<String> documentsToInsertOrRetry = new ArrayList<String>(input);
            ArrayList<BulkImportFailure> failedImports = new ArrayList<BulkImportFailure>();
            int numRetriesDueToSplits = 0;
            ArrayList<Exception> failures = new ArrayList<Exception>();
            ArrayList<Object> badInputDocuments = new ArrayList<Object>();
            int numberOfDocumentsImported = 0;
            double totalRequestUnitsConsumed = 0.0;
            Duration timeTakenForInserts = Duration.ofSeconds(0L);
            do {
                documentsFailedToImportDueToSplits = new ArrayList<String>();
                BulkImportResponse eachInsertOrRetryResponse = (BulkImportResponse)this.executeBulkImportAsyncImpl(documentsToInsertOrRetry, documentsFailedToImportDueToSplits, failedImports, isUpsert, disableAutomaticIdGeneration, maxConcurrencyPerPartitionRange).get();
                failures.addAll(eachInsertOrRetryResponse.getErrors());
                badInputDocuments.addAll(eachInsertOrRetryResponse.getBadInputDocuments());
                numberOfDocumentsImported += eachInsertOrRetryResponse.getNumberOfDocumentsImported();
                totalRequestUnitsConsumed += eachInsertOrRetryResponse.getTotalRequestUnitsConsumed();
                timeTakenForInserts = timeTakenForInserts.plus(eachInsertOrRetryResponse.getTotalTimeTaken());
                if (documentsFailedToImportDueToSplits.size() <= 0) continue;
                ++numRetriesDueToSplits;
                this.initialize();
                documentsToInsertOrRetry = new ArrayList<String>(documentsFailedToImportDueToSplits);
            } while (documentsFailedToImportDueToSplits.size() > 0 && numRetriesDueToSplits <= 10);
            if (numRetriesDueToSplits > 10) {
                HashMap<String, String> responseHeaders = new HashMap<String, String>();
                responseHeaders.put("x-ms-substatus", String.valueOf(1002));
                BulkImportFailure bulkImportFailure = new BulkImportFailure();
                bulkImportFailure.getDocumentsFailedToImport().addAll(documentsFailedToImportDueToSplits);
                bulkImportFailure.setBulkImportFailureException((Exception)new DocumentClientException(503, new Error("{ 'message': 'Max retries for BulkExecutor exhausted. Please re-initialize BulkExecutor and retry latest batch import.' }"), responseHeaders));
                failedImports.add(bulkImportFailure);
            }
            BulkImportResponse bulkImportResponse = new BulkImportResponse(numberOfDocumentsImported, totalRequestUnitsConsumed, timeTakenForInserts, failures, badInputDocuments, failedImports);
            return bulkImportResponse;
        }
        catch (ExecutionException e) {
            this.logger.error("Failed to import documents", (Throwable)e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw this.toDocumentClientException((Exception)cause);
            }
            throw this.toDocumentClientException(e);
        }
        catch (Exception e) {
            this.logger.error("Failed to import documents", (Throwable)e);
            throw this.toDocumentClientException(e);
        }
    }

    private BulkUpdateResponse executeBulkUpdateInternal(Collection<UpdateItem> updateItems, Integer maxConcurrencyPerPartitionRange) throws DocumentClientException {
        Preconditions.checkNotNull(updateItems, "update items cannot be null");
        try {
            ArrayList<UpdateItem> updatesToAttemptOrRetry = new ArrayList<UpdateItem>(updateItems);
            ArrayList<UpdateItem> documentsFailedToUpdateDueToSplits = new ArrayList<UpdateItem>();
            ArrayList<BulkUpdateFailure> bulkUpdateFailures = new ArrayList<BulkUpdateFailure>();
            int numRetriesDueToSplits = 0;
            int numberOfDocumentsUpdated = 0;
            double totalRequestUnitsConsumed = 0.0;
            Duration totalTimeTaken = Duration.ofMillis(0L);
            ArrayList<Exception> errors = new ArrayList<Exception>();
            do {
                documentsFailedToUpdateDueToSplits = new ArrayList();
                BulkUpdateResponse eachUpdateOrRetryResponse = (BulkUpdateResponse)this.executeBulkUpdateAsyncImpl(updatesToAttemptOrRetry, documentsFailedToUpdateDueToSplits, maxConcurrencyPerPartitionRange).get();
                numberOfDocumentsUpdated += eachUpdateOrRetryResponse.getNumberOfDocumentsUpdated();
                totalRequestUnitsConsumed += eachUpdateOrRetryResponse.getTotalRequestUnitsConsumed();
                totalTimeTaken = totalTimeTaken.plus(eachUpdateOrRetryResponse.getTotalTimeTaken());
                errors.addAll(eachUpdateOrRetryResponse.getErrors());
                bulkUpdateFailures.addAll(eachUpdateOrRetryResponse.getFailedUpdates());
                if (documentsFailedToUpdateDueToSplits.size() <= 0) continue;
                ++numRetriesDueToSplits;
                this.initialize();
                updatesToAttemptOrRetry = new ArrayList<UpdateItem>(documentsFailedToUpdateDueToSplits);
            } while (documentsFailedToUpdateDueToSplits.size() > 0 && numRetriesDueToSplits <= 10);
            if (numRetriesDueToSplits > 10) {
                HashMap<String, String> responseHeaders = new HashMap<String, String>();
                responseHeaders.put("x-ms-substatus", String.valueOf(1002));
                BulkUpdateFailure bulkUpdateFailure = new BulkUpdateFailure();
                bulkUpdateFailure.getFailedUpdateItems().addAll(documentsFailedToUpdateDueToSplits);
                bulkUpdateFailure.setBulkUpdateFailureException((Exception)new DocumentClientException(503, new Error("{ 'message': 'Max retries for BulkExecutor exhausted. Please re-initialize BulkExecutor and retry latest batch update.' }"), responseHeaders));
                bulkUpdateFailures.add(bulkUpdateFailure);
            }
            BulkUpdateResponse bulkUpdateResponse = new BulkUpdateResponse(numberOfDocumentsUpdated, totalRequestUnitsConsumed, totalTimeTaken, errors, bulkUpdateFailures);
            return bulkUpdateResponse;
        }
        catch (ExecutionException e) {
            this.logger.error("Failed to update documents", (Throwable)e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw this.toDocumentClientException((Exception)cause);
            }
            throw this.toDocumentClientException(e);
        }
        catch (Exception e) {
            this.logger.error("Failed to update documents", (Throwable)e);
            throw this.toDocumentClientException(e);
        }
    }

    private BulkUpdateResponse executeBulkUpdateWithPatchInternal(Collection<Document> patchDocuments, Integer maxConcurrencyPerPartitionRange) throws DocumentClientException {
        Preconditions.checkNotNull(patchDocuments, "patch documents cannot be null");
        try {
            ArrayList<UpdateItem> documentsFailedToUpdateDueToSplits = new ArrayList<UpdateItem>();
            int numRetriesDueToSplits = 0;
            int numberOfDocumentsUpdated = 0;
            double totalRequestUnitsConsumed = 0.0;
            Duration totalTimeTaken = Duration.ofMillis(0L);
            ArrayList<Exception> errors = new ArrayList<Exception>();
            ArrayList<BulkUpdateFailure> bulkUpdateFailures = new ArrayList<BulkUpdateFailure>();
            BulkUpdateResponse eachUpdateOrRetryResponse = (BulkUpdateResponse)this.executeBulkUpdateWithPatchAsyncImpl(patchDocuments, documentsFailedToUpdateDueToSplits, maxConcurrencyPerPartitionRange).get();
            numberOfDocumentsUpdated += eachUpdateOrRetryResponse.getNumberOfDocumentsUpdated();
            totalRequestUnitsConsumed += eachUpdateOrRetryResponse.getTotalRequestUnitsConsumed();
            totalTimeTaken = totalTimeTaken.plus(eachUpdateOrRetryResponse.getTotalTimeTaken());
            errors.addAll(eachUpdateOrRetryResponse.getErrors());
            bulkUpdateFailures.addAll(eachUpdateOrRetryResponse.getFailedUpdates());
            while (documentsFailedToUpdateDueToSplits.size() > 0 && numRetriesDueToSplits <= 10) {
                ++numRetriesDueToSplits;
                this.initialize();
                ArrayList<UpdateItem> updatesToAttemptOrRetry = new ArrayList<UpdateItem>(documentsFailedToUpdateDueToSplits);
                documentsFailedToUpdateDueToSplits = new ArrayList();
                eachUpdateOrRetryResponse = (BulkUpdateResponse)this.executeBulkUpdateAsyncImpl(updatesToAttemptOrRetry, documentsFailedToUpdateDueToSplits, maxConcurrencyPerPartitionRange).get();
                numberOfDocumentsUpdated += eachUpdateOrRetryResponse.getNumberOfDocumentsUpdated();
                totalRequestUnitsConsumed += eachUpdateOrRetryResponse.getTotalRequestUnitsConsumed();
                totalTimeTaken = totalTimeTaken.plus(eachUpdateOrRetryResponse.getTotalTimeTaken());
                errors.addAll(eachUpdateOrRetryResponse.getErrors());
                bulkUpdateFailures.addAll(eachUpdateOrRetryResponse.getFailedUpdates());
            }
            if (numRetriesDueToSplits > 10) {
                HashMap<String, String> responseHeaders = new HashMap<String, String>();
                responseHeaders.put("x-ms-substatus", String.valueOf(1002));
                BulkUpdateFailure bulkUpdateFailure = new BulkUpdateFailure();
                bulkUpdateFailure.getFailedUpdateItems().addAll(documentsFailedToUpdateDueToSplits);
                bulkUpdateFailure.setBulkUpdateFailureException((Exception)new DocumentClientException(503, new Error("{ 'message': 'Max retries for BulkExecutor exhausted. Please re-initialize BulkExecutor and retry latest batch update.' }"), responseHeaders));
                bulkUpdateFailures.add(bulkUpdateFailure);
            }
            BulkUpdateResponse bulkUpdateResponse = new BulkUpdateResponse(numberOfDocumentsUpdated, totalRequestUnitsConsumed, totalTimeTaken, errors, bulkUpdateFailures);
            return bulkUpdateResponse;
        }
        catch (ExecutionException e) {
            this.logger.error("Failed to update documents", (Throwable)e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw this.toDocumentClientException((Exception)cause);
            }
            throw this.toDocumentClientException(e);
        }
        catch (Exception e) {
            this.logger.error("Failed to update documents", (Throwable)e);
            throw this.toDocumentClientException(e);
        }
    }

    private BulkUpdateResponse executeUpdateDocumentInternal(String partitionKey, String id, List<UpdateOperationBase> updateOperations) throws DocumentClientException {
        Preconditions.checkNotNull(partitionKey, "partitionKey cannot be null");
        Preconditions.checkNotNull(id, "id cannot be null");
        Preconditions.checkNotNull(updateOperations, "update operations cannot be null");
        try {
            return (BulkUpdateResponse)this.executeUpdateDocumentAsyncImpl(partitionKey, id, updateOperations).get();
        }
        catch (ExecutionException e) {
            this.logger.error("Failed to update document", (Throwable)e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw this.toDocumentClientException((Exception)cause);
            }
            throw this.toDocumentClientException(e);
        }
        catch (Exception e) {
            this.logger.error("Failed to update document", (Throwable)e);
            throw this.toDocumentClientException(e);
        }
    }

    private BulkDeleteResponse executeBulkDeleteInternal(String query, RequestOptions requestOptions) throws DocumentClientException {
        Preconditions.checkNotNull(query, "query to fetch documents to delete cannot be null");
        try {
            return (BulkDeleteResponse)this.executeBulkDeleteAsyncImpl(query, requestOptions).get();
        }
        catch (ExecutionException e) {
            this.logger.error("Failed to delete document", (Throwable)e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw this.toDocumentClientException((Exception)cause);
            }
            throw this.toDocumentClientException(e);
        }
        catch (Exception e) {
            this.logger.error("Failed to delete document", (Throwable)e);
            throw this.toDocumentClientException(e);
        }
    }

    private BulkDeleteResponse executeBulkDeleteInternalPkRowKeys(List<Pair<String, String>> pkIdPairsToDelete) throws DocumentClientException {
        Preconditions.checkNotNull(pkIdPairsToDelete, "list of pairs of partition key and ids to delete cannot be null");
        ArrayList<Integer> nullPairs = new ArrayList<Integer>();
        for (int eachPkIdPairIndex = 0; eachPkIdPairIndex < pkIdPairsToDelete.size(); ++eachPkIdPairIndex) {
            if (pkIdPairsToDelete.get(eachPkIdPairIndex) != null) continue;
            nullPairs.add(eachPkIdPairIndex);
        }
        if (nullPairs.size() > 0) {
            throw new NullPointerException("Input list of pairs of partition keys and ids to delete contains null entries. Indices of null entires are: " + ((Object)nullPairs).toString());
        }
        try {
            return (BulkDeleteResponse)this.executeBulkDeletePkRowKeyPairsAsyncImpl(pkIdPairsToDelete).get();
        }
        catch (ExecutionException e) {
            this.logger.debug("Failed to delete document", (Throwable)e);
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw this.toDocumentClientException((Exception)cause);
            }
            throw this.toDocumentClientException(e);
        }
        catch (Exception e) {
            this.logger.error("Failed to delete documents", (Throwable)e);
            throw this.toDocumentClientException(e);
        }
    }

    private ListenableFuture<BulkImportResponse> executeBulkImportAsyncImpl(Collection<String> documents, final Collection<String> documentsFailedToImportDueToSplits, final List<BulkImportFailure> failedImports, boolean isUpsert, boolean disableAutomaticIdGeneration, Integer maxConcurrencyPerPartitionRange) throws Exception {
        final Stopwatch watch = Stopwatch.createStarted();
        BulkImportStoredProcedureOptions options = new BulkImportStoredProcedureOptions(disableAutomaticIdGeneration, false, null, false, isUpsert, true);
        this.logger.debug("Bucketing documents ...");
        ConcurrentHashMap documentsToImportByPartition = new ConcurrentHashMap();
        ConcurrentHashMap miniBatchesToImportByPartition = new ConcurrentHashMap();
        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {
            documentsToImportByPartition.put(partitionKeyRangeId, ConcurrentHashMap.newKeySet(documents.size() / this.partitionKeyRangeIds.size()));
            miniBatchesToImportByPartition.put(partitionKeyRangeId, new ArrayList(1000));
        }
        documents.parallelStream().forEach(documentAsString -> {
            PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.extractPartitionKeyValue(documentAsString, this.partitionKeyDefinition);
            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(this.partitionKeyDefinition, true);
            String partitionRangeId = this.collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
            ((Set)documentsToImportByPartition.get(partitionRangeId)).add(documentAsString);
        });
        this.logger.debug("Creating mini batches within each partition bucket");
        documentsToImportByPartition.entrySet().parallelStream().forEach(entry -> {
            String partitionRangeId = (String)entry.getKey();
            Set documentsToImportInPartition = (Set)entry.getValue();
            Iterator it = documentsToImportInPartition.iterator();
            ArrayList<String> currentMiniBatch = new ArrayList<String>(500);
            int currentMiniBatchSize = 0;
            while (it.hasNext()) {
                String currentDocument = (String)it.next();
                int currentDocumentSize = this.getDocumentSize(currentDocument);
                if (currentMiniBatchSize + currentDocumentSize <= this.maxMiniBatchSize) {
                    currentMiniBatch.add(currentDocument);
                    currentMiniBatchSize += currentDocumentSize;
                    continue;
                }
                ((List)miniBatchesToImportByPartition.get(partitionRangeId)).add(currentMiniBatch);
                currentMiniBatch = new ArrayList(500);
                currentMiniBatch.add(currentDocument);
                currentMiniBatchSize = currentDocumentSize;
            }
            if (currentMiniBatch.size() > 0) {
                ((List)miniBatchesToImportByPartition.get(partitionRangeId)).add(currentMiniBatch);
            }
        });
        this.logger.debug("Beginning bulk import within each partition bucket");
        final HashMap<String, BatchInserter> batchInserters = new HashMap<String, BatchInserter>();
        final HashMap<String, CongestionController> congestionControllers = new HashMap<String, CongestionController>();
        this.logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
        ArrayList<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {
            BatchInserter batchInserter = new BatchInserter(partitionKeyRangeId, (List)miniBatchesToImportByPartition.get(partitionKeyRangeId), this.client, this.bulkImportStoredProcLink, options);
            batchInserters.put(partitionKeyRangeId, batchInserter);
            CongestionController cc = new CongestionController(this.listeningExecutorService, this.collectionThroughput / this.partitionKeyRangeIds.size(), partitionKeyRangeId, batchInserter, this.partitionKeyRangeIdToInferredDegreeOfParallelism.get(partitionKeyRangeId), maxConcurrencyPerPartitionRange);
            congestionControllers.put(partitionKeyRangeId, cc);
            futures.add(cc.executeAllAsync());
        }
        Futures.FutureCombiner futureContainer = Futures.whenAllComplete(futures);
        AsyncCallable<BulkImportResponse> completeAsyncCallback = new AsyncCallable<BulkImportResponse>(){

            @Override
            public ListenableFuture<BulkImportResponse> call() throws Exception {
                ArrayList<Exception> failures = new ArrayList<Exception>();
                ArrayList<Object> badInputDocuments = new ArrayList<Object>();
                for (String partitionKeyRangeId : DocumentBulkExecutor.this.partitionKeyRangeIds) {
                    CongestionController cc = (CongestionController)congestionControllers.get(partitionKeyRangeId);
                    failures.addAll(cc.getFailures());
                    BatchInserter batchInserter = (BatchInserter)batchInserters.get(partitionKeyRangeId);
                    badInputDocuments.addAll(batchInserter.getBadInputDocuments());
                    documentsFailedToImportDueToSplits.addAll(batchInserter.getDocumentsFailedToImportDueToSplits());
                    failedImports.addAll(((BatchInserter)batchInserters.get(partitionKeyRangeId)).getDocumentsFailedToImport());
                    DocumentBulkExecutor.this.partitionKeyRangeIdToInferredDegreeOfParallelism.put(partitionKeyRangeId, cc.getDegreeOfConcurrency());
                }
                int numberOfDocumentsImported = batchInserters.values().stream().mapToInt(b -> b.getNumberOfDocumentsImported()).sum();
                double totalRequestUnitsConsumed = batchInserters.values().stream().mapToDouble(b -> b.getTotalRequestUnitsConsumed()).sum();
                watch.stop();
                BulkImportResponse bulkImportResponse = new BulkImportResponse(numberOfDocumentsImported, totalRequestUnitsConsumed, watch.elapsed(), failures, badInputDocuments, failedImports);
                return Futures.immediateFuture(bulkImportResponse);
            }
        };
        return futureContainer.callAsync(completeAsyncCallback, this.listeningExecutorService);
    }

    private ListenableFuture<BulkUpdateResponse> executeBulkUpdateAsyncImpl(Collection<UpdateItem> updateItems, final Collection<UpdateItem> documentsFailedToUpdateDueToSplits, Integer maxConcurrencyPerPartitionRange) {
        final Stopwatch watch = Stopwatch.createStarted();
        this.logger.debug("Bucketing update items ...");
        ConcurrentHashMap updateItemsByPartition = new ConcurrentHashMap();
        ConcurrentHashMap miniBatchesToUpdateByPartition = new ConcurrentHashMap();
        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {
            updateItemsByPartition.put(partitionKeyRangeId, ConcurrentHashMap.newKeySet(updateItems.size() / this.partitionKeyRangeIds.size()));
            miniBatchesToUpdateByPartition.put(partitionKeyRangeId, new ArrayList(1000));
        }
        updateItems.parallelStream().forEach(updateItem -> {
            PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.fromPartitionKeyvalue(updateItem.getPartitionKeyValue());
            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(this.partitionKeyDefinition, true);
            String partitionRangeId = this.collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
            ((Set)updateItemsByPartition.get(partitionRangeId)).add(updateItem);
        });
        this.logger.debug("Creating mini batches within each partition bucket");
        updateItemsByPartition.entrySet().parallelStream().forEach(entry -> {
            String partitionRangeId = (String)entry.getKey();
            Set updateItemsInPartition = (Set)entry.getValue();
            Iterator it = updateItemsInPartition.iterator();
            ArrayList<UpdateItem> currentMiniBatch = new ArrayList<UpdateItem>(500);
            int currentMiniBatchIndex = 0;
            while (it.hasNext()) {
                UpdateItem currentUpdateItem = (UpdateItem)it.next();
                if (currentMiniBatchIndex + 1 <= this.maxUpdateMiniBatchCount) {
                    currentMiniBatch.add(currentUpdateItem);
                    ++currentMiniBatchIndex;
                    continue;
                }
                ((List)miniBatchesToUpdateByPartition.get(partitionRangeId)).add(currentMiniBatch);
                currentMiniBatch = new ArrayList(500);
                currentMiniBatch.add(currentUpdateItem);
                currentMiniBatchIndex = 1;
            }
            if (currentMiniBatch.size() > 0) {
                ((List)miniBatchesToUpdateByPartition.get(partitionRangeId)).add(currentMiniBatch);
            }
        });
        this.logger.debug("Beginning bulk update within each partition bucket");
        final ArrayList failedUpdates = new ArrayList();
        final HashMap<String, BatchUpdater> batchUpdaters = new HashMap<String, BatchUpdater>();
        final HashMap<String, CongestionController> congestionControllers = new HashMap<String, CongestionController>();
        this.logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
        ArrayList<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
        Collection partitionKeyPath = this.partitionKeyDefinition.getPaths();
        String partitionKeyProperty = ((String)partitionKeyPath.iterator().next()).replaceFirst("^/", "");
        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {
            BatchUpdater batchUpdater = new BatchUpdater(partitionKeyRangeId, (List)miniBatchesToUpdateByPartition.get(partitionKeyRangeId), this.client, this.bulkUpdateStoredProcLink, partitionKeyProperty);
            batchUpdaters.put(partitionKeyRangeId, batchUpdater);
            CongestionController cc = new CongestionController(this.listeningExecutorService, this.collectionThroughput / this.partitionKeyRangeIds.size(), partitionKeyRangeId, batchUpdater, this.partitionKeyRangeIdToInferredDegreeOfParallelism.get(partitionKeyRangeId), maxConcurrencyPerPartitionRange);
            congestionControllers.put(partitionKeyRangeId, cc);
            futures.add(cc.executeAllAsync());
        }
        Futures.FutureCombiner futureContainer = Futures.whenAllComplete(futures);
        AsyncCallable<BulkUpdateResponse> completeAsyncCallback = new AsyncCallable<BulkUpdateResponse>(){

            @Override
            public ListenableFuture<BulkUpdateResponse> call() throws Exception {
                ArrayList<Exception> failures = new ArrayList<Exception>();
                for (String partitionKeyRangeId : DocumentBulkExecutor.this.partitionKeyRangeIds) {
                    CongestionController cc = (CongestionController)congestionControllers.get(partitionKeyRangeId);
                    failures.addAll(cc.getFailures());
                    BatchUpdater batchUpdater = (BatchUpdater)batchUpdaters.get(partitionKeyRangeId);
                    documentsFailedToUpdateDueToSplits.addAll(batchUpdater.getDocumentsFailedToUpdateDueToSplits());
                    failedUpdates.addAll(batchUpdater.getBulkUpdateFailures());
                    DocumentBulkExecutor.this.partitionKeyRangeIdToInferredDegreeOfParallelism.put(partitionKeyRangeId, cc.getDegreeOfConcurrency());
                }
                int numberOfDocumentsUpdated = batchUpdaters.values().stream().mapToInt(b -> b.getNumberOfDocumentsUpdated()).sum();
                double totalRequestUnitsConsumed = batchUpdaters.values().stream().mapToDouble(b -> b.getTotalRequestUnitsConsumed()).sum();
                watch.stop();
                BulkUpdateResponse bulkUpdateResponse = new BulkUpdateResponse(numberOfDocumentsUpdated, totalRequestUnitsConsumed, watch.elapsed(), failures, failedUpdates);
                return Futures.immediateFuture(bulkUpdateResponse);
            }
        };
        return futureContainer.callAsync(completeAsyncCallback, this.listeningExecutorService);
    }

    private ListenableFuture<BulkUpdateResponse> executeBulkUpdateWithPatchAsyncImpl(Collection<Document> patchDocuments, final Collection<UpdateItem> documentsFailedToUpdateDueToSplits, Integer maxConcurrencyPerPartitionRange) {
        final Stopwatch watch = Stopwatch.createStarted();
        this.logger.debug("Bucketing patch documents ...");
        ConcurrentHashMap updateItemsByPartition = new ConcurrentHashMap();
        ConcurrentHashMap miniBatchesToUpdateByPartition = new ConcurrentHashMap();
        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {
            updateItemsByPartition.put(partitionKeyRangeId, ConcurrentHashMap.newKeySet(patchDocuments.size() / this.partitionKeyRangeIds.size()));
            miniBatchesToUpdateByPartition.put(partitionKeyRangeId, new ArrayList(1000));
        }
        Collection partitionKeyPath = this.partitionKeyDefinition.getPaths();
        String partitionKeyProperty = ((String)partitionKeyPath.iterator().next()).replaceFirst("^/", "");
        patchDocuments.parallelStream().forEach(patchDocument -> {
            UpdateItem updateItem = this.getUpdateItemFromPatchDocument((Document)patchDocument, partitionKeyProperty);
            PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.fromPartitionKeyvalue(updateItem.getPartitionKeyValue());
            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(this.partitionKeyDefinition, true);
            String partitionRangeId = this.collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
            ((Set)updateItemsByPartition.get(partitionRangeId)).add(updateItem);
        });
        this.logger.debug("Creating mini batches within each partition bucket");
        updateItemsByPartition.entrySet().parallelStream().forEach(entry -> {
            String partitionRangeId = (String)entry.getKey();
            Set updateItemsInPartition = (Set)entry.getValue();
            Iterator it = updateItemsInPartition.iterator();
            ArrayList<UpdateItem> currentMiniBatch = new ArrayList<UpdateItem>(500);
            int currentMiniBatchIndex = 0;
            while (it.hasNext()) {
                UpdateItem currentUpdateItem = (UpdateItem)it.next();
                if (currentMiniBatchIndex + 1 <= this.maxUpdateMiniBatchCount) {
                    currentMiniBatch.add(currentUpdateItem);
                    ++currentMiniBatchIndex;
                    continue;
                }
                ((List)miniBatchesToUpdateByPartition.get(partitionRangeId)).add(currentMiniBatch);
                currentMiniBatch = new ArrayList(500);
                currentMiniBatch.add(currentUpdateItem);
                currentMiniBatchIndex = 1;
            }
            if (currentMiniBatch.size() > 0) {
                ((List)miniBatchesToUpdateByPartition.get(partitionRangeId)).add(currentMiniBatch);
            }
        });
        this.logger.debug("Beginning bulk update within each partition bucket");
        final ArrayList failedUpdates = new ArrayList();
        final HashMap<String, BatchUpdater> batchUpdaters = new HashMap<String, BatchUpdater>();
        final HashMap<String, CongestionController> congestionControllers = new HashMap<String, CongestionController>();
        this.logger.debug("Preprocessing took: " + watch.elapsed().toMillis() + " millis");
        ArrayList<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {
            BatchUpdater batchUpdater = new BatchUpdater(partitionKeyRangeId, (List)miniBatchesToUpdateByPartition.get(partitionKeyRangeId), this.client, this.bulkUpdateStoredProcLink, partitionKeyProperty);
            batchUpdaters.put(partitionKeyRangeId, batchUpdater);
            CongestionController cc = new CongestionController(this.listeningExecutorService, this.collectionThroughput / this.partitionKeyRangeIds.size(), partitionKeyRangeId, batchUpdater, this.partitionKeyRangeIdToInferredDegreeOfParallelism.get(partitionKeyRangeId), maxConcurrencyPerPartitionRange);
            congestionControllers.put(partitionKeyRangeId, cc);
            futures.add(cc.executeAllAsync());
        }
        Futures.FutureCombiner futureContainer = Futures.whenAllComplete(futures);
        AsyncCallable<BulkUpdateResponse> completeAsyncCallback = new AsyncCallable<BulkUpdateResponse>(){

            @Override
            public ListenableFuture<BulkUpdateResponse> call() throws Exception {
                ArrayList<Exception> failures = new ArrayList<Exception>();
                for (String partitionKeyRangeId : DocumentBulkExecutor.this.partitionKeyRangeIds) {
                    CongestionController cc = (CongestionController)congestionControllers.get(partitionKeyRangeId);
                    BatchUpdater batchUpdater = (BatchUpdater)batchUpdaters.get(partitionKeyRangeId);
                    documentsFailedToUpdateDueToSplits.addAll(batchUpdater.getDocumentsFailedToUpdateDueToSplits());
                    failedUpdates.addAll(batchUpdater.getBulkUpdateFailures());
                    failures.addAll(cc.getFailures());
                    DocumentBulkExecutor.this.partitionKeyRangeIdToInferredDegreeOfParallelism.put(partitionKeyRangeId, cc.getDegreeOfConcurrency());
                }
                int numberOfDocumentsUpdated = batchUpdaters.values().stream().mapToInt(b -> b.getNumberOfDocumentsUpdated()).sum();
                double totalRequestUnitsConsumed = batchUpdaters.values().stream().mapToDouble(b -> b.getTotalRequestUnitsConsumed()).sum();
                watch.stop();
                BulkUpdateResponse bulkUpdateResponse = new BulkUpdateResponse(numberOfDocumentsUpdated, totalRequestUnitsConsumed, watch.elapsed(), failures, failedUpdates);
                return Futures.immediateFuture(bulkUpdateResponse);
            }
        };
        return futureContainer.callAsync(completeAsyncCallback, this.listeningExecutorService);
    }

    private UpdateItem getUpdateItemFromPatchDocument(Document patchDocument, String partitionKeyProperty) {
        String idValue = null;
        String pkValue = null;
        ArrayList<UpdateOperationBase> updateOperations = new ArrayList<UpdateOperationBase>();
        HashMap patchDocumentMap = patchDocument.getHashMap();
        for (Map.Entry entry : patchDocumentMap.entrySet()) {
            if (((String)entry.getKey()).matches("id")) {
                idValue = (String)entry.getValue();
                continue;
            }
            if (((String)entry.getKey()).matches(partitionKeyProperty)) {
                pkValue = (String)entry.getValue();
                continue;
            }
            updateOperations.addAll(this.getUpdateOperations("", (String)entry.getKey(), entry.getValue()));
        }
        return new UpdateItem(idValue, pkValue, updateOperations);
    }

    private List<UpdateOperationBase> getUpdateOperations(String propertyKeyPrefix, String propertyKey, Object propertyValue) {
        String propertyKeyToSet;
        ArrayList<UpdateOperationBase> updateOperations = new ArrayList<UpdateOperationBase>();
        String string = propertyKeyToSet = propertyKeyPrefix.matches("") ? propertyKey : propertyKeyPrefix + "." + propertyKey;
        if (propertyValue instanceof String) {
            updateOperations.add(new SetUpdateOperation<String>(propertyKeyToSet, (String)propertyValue));
        } else if (propertyValue instanceof Integer) {
            updateOperations.add(new SetUpdateOperation<Integer>(propertyKeyToSet, (Integer)propertyValue));
        } else if (propertyValue instanceof Double) {
            updateOperations.add(new SetUpdateOperation<Double>(propertyKeyToSet, (Double)propertyValue));
        } else if (propertyValue instanceof Boolean) {
            updateOperations.add(new SetUpdateOperation<Boolean>(propertyKeyToSet, (Boolean)propertyValue));
        } else if (propertyValue instanceof List) {
            updateOperations.add(new SetUpdateOperation<List>(propertyKeyToSet, (List)propertyValue));
        } else if (propertyValue instanceof Map) {
            HashMap propertyHashMap = (HashMap)propertyValue;
            for (Map.Entry entry : propertyHashMap.entrySet()) {
                updateOperations.addAll(this.getUpdateOperations(propertyKeyToSet, (String)entry.getKey(), entry.getValue()));
            }
        }
        return updateOperations;
    }

    private ListenableFuture<BulkUpdateResponse> executeUpdateDocumentAsyncImpl(String partitionKey, String id, List<UpdateOperationBase> updateOperations) {
        final Stopwatch watch = Stopwatch.createStarted();
        PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.fromPartitionKeyvalue(partitionKey);
        String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(this.partitionKeyDefinition, true);
        String partitionRangeId = this.collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
        ArrayList<List<UpdateItem>> miniBatchesToUpdate = new ArrayList<List<UpdateItem>>(1);
        ArrayList<UpdateItem> currentMiniBatch = new ArrayList<UpdateItem>(1);
        UpdateItem currentItem = new UpdateItem(id, partitionKey, updateOperations);
        currentMiniBatch.add(currentItem);
        miniBatchesToUpdate.add(currentMiniBatch);
        Collection partitionKeyPath = this.partitionKeyDefinition.getPaths();
        String partitionKeyProperty = ((String)partitionKeyPath.iterator().next()).replaceFirst("^/", "");
        final BatchUpdater batchUpdater = new BatchUpdater(partitionRangeId, miniBatchesToUpdate, this.client, this.bulkUpdateStoredProcLink, partitionKeyProperty);
        final CongestionController cc = new CongestionController(this.listeningExecutorService, this.collectionThroughput / this.partitionKeyRangeIds.size(), partitionRangeId, batchUpdater, null, null);
        ArrayList<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>();
        futures.add(cc.executeAllAsync());
        Futures.FutureCombiner futureContainer = Futures.whenAllComplete(futures);
        AsyncCallable<BulkUpdateResponse> completeAsyncCallback = new AsyncCallable<BulkUpdateResponse>(){

            @Override
            public ListenableFuture<BulkUpdateResponse> call() throws Exception {
                ArrayList<Exception> failures = new ArrayList<Exception>();
                failures.addAll(cc.getFailures());
                int numberOfDocumentsUpdated = batchUpdater.getNumberOfDocumentsUpdated();
                double totalRequestUnitsConsumed = batchUpdater.getTotalRequestUnitsConsumed();
                watch.stop();
                BulkUpdateResponse bulkUpdateResponse = new BulkUpdateResponse(numberOfDocumentsUpdated, totalRequestUnitsConsumed, watch.elapsed(), failures, null);
                return Futures.immediateFuture(bulkUpdateResponse);
            }
        };
        return futureContainer.callAsync(completeAsyncCallback, this.listeningExecutorService);
    }

    private ListenableFuture<BulkDeleteResponse> executeBulkDeleteAsyncImpl(String query, RequestOptions requestOptions) throws Exception {
        List<String> partitionKeyRangeIds = this.partitionKeyRangeIds;
        if (requestOptions != null && requestOptions.getPartitionKey() != null) {
            partitionKeyRangeIds = new ArrayList<String>();
            PartitionKeyInternal partitionKeyValue = requestOptions.getPartitionKey().getInternalPartitionKey();
            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(this.partitionKeyDefinition, true);
            partitionKeyRangeIds.add(this.collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId());
        }
        ArrayList<Future> futures = new ArrayList<Future>();
        final Stopwatch watch = Stopwatch.createStarted();
        Matcher bulkDeleteQuerySpecMatcher = BULK_DELETE_QUERY_SPEC_PATTERN.matcher(query);
        if (!bulkDeleteQuerySpecMatcher.find()) {
            throw new IllegalArgumentException("Input SQL query is invalid: " + query + " Query must be of the form: select * from c where <filter conditions>");
        }
        String root = bulkDeleteQuerySpecMatcher.group("root").toString();
        if (bulkDeleteQuerySpecMatcher.group("filter") == null || bulkDeleteQuerySpecMatcher.group("filter").isEmpty() || bulkDeleteQuerySpecMatcher.group("filter").trim().isEmpty()) {
            throw new IllegalArgumentException("Input SQL query is invalid: " + query + " Query must have filters in its where clause");
        }
        String filterExpression = bulkDeleteQuerySpecMatcher.group("filter").toString();
        BulkDeleteQuerySpec querySpec = new BulkDeleteQuerySpec(root, filterExpression, null, 1000);
        this.logger.debug("Beginning bulk delete within each partition range");
        final HashMap<String, BatchDeleter> batchDeleters = new HashMap<String, BatchDeleter>();
        for (String partitionKeyRangeId : partitionKeyRangeIds) {
            BatchDeleter batchDeleter = new BatchDeleter(partitionKeyRangeId, this.client, this.bulkDeleteStoredProcLink, querySpec);
            batchDeleters.put(partitionKeyRangeId, batchDeleter);
            Future batchDeleterFuture = this.listeningExecutorService.submit(batchDeleter.executeDelete());
            futures.add(batchDeleterFuture);
        }
        Futures.FutureCombiner futureContainer = Futures.whenAllComplete(futures);
        AsyncCallable<BulkDeleteResponse> completeAsyncCallback = new AsyncCallable<BulkDeleteResponse>(){

            @Override
            public ListenableFuture<BulkDeleteResponse> call() throws Exception {
                ArrayList<Exception> failures = new ArrayList<Exception>();
                int numberOfDocumentsDeleted = batchDeleters.values().stream().mapToInt(b -> b.getNumberOfDocumentsDeleted()).sum();
                double totalRequestUnitsConsumed = batchDeleters.values().stream().mapToDouble(b -> b.getTotalRequestUnitsConsumed()).sum();
                watch.stop();
                BulkDeleteResponse bulkDeleteResponse = new BulkDeleteResponse(numberOfDocumentsDeleted, totalRequestUnitsConsumed, watch.elapsed(), failures);
                return Futures.immediateFuture(bulkDeleteResponse);
            }
        };
        return futureContainer.callAsync(completeAsyncCallback, this.listeningExecutorService);
    }

    private ListenableFuture<BulkDeleteResponse> executeBulkDeletePkRowKeyPairsAsyncImpl(List<Pair<String, String>> pkIdPairsToDelete) throws Exception {
        ArrayList<Future> futures = new ArrayList<Future>();
        final Stopwatch watch = Stopwatch.createStarted();
        this.logger.debug("Bucketing documents ...");
        ConcurrentHashMap documentsToDeleteByPartition = new ConcurrentHashMap();
        ConcurrentHashMap miniBatchesToDeleteByPartition = new ConcurrentHashMap();
        for (String string : this.partitionKeyRangeIds) {
            documentsToDeleteByPartition.put(string, ConcurrentHashMap.newKeySet(pkIdPairsToDelete.size() / this.partitionKeyRangeIds.size()));
            miniBatchesToDeleteByPartition.put(string, new ArrayList(1000));
        }
        pkIdPairsToDelete.parallelStream().forEach(documentAsString -> {
            PartitionKeyInternal partitionKeyValue = DocumentAnalyzer.fromPartitionKeyvalue(documentAsString.getKey());
            String effectivePartitionKey = partitionKeyValue.getEffectivePartitionKeyString(this.partitionKeyDefinition, true);
            String partitionRangeId = this.collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
            ((Set)documentsToDeleteByPartition.get(partitionRangeId)).add(documentAsString);
        });
        this.logger.trace("Creating mini batches within each partition bucket");
        documentsToDeleteByPartition.entrySet().parallelStream().forEach(entry -> {
            String partitionRangeId = (String)entry.getKey();
            Set documentsToDeleteInPartition = (Set)entry.getValue();
            Iterator it = documentsToDeleteInPartition.iterator();
            ArrayList<Pair> currentMiniBatch = new ArrayList<Pair>(100);
            int currentMiniBatchSize = 0;
            while (it.hasNext()) {
                Pair currentDocument = (Pair)it.next();
                if (currentMiniBatchSize <= this.maxMiniBatchSize) {
                    currentMiniBatch.add(currentDocument);
                    ++currentMiniBatchSize;
                    continue;
                }
                ((List)miniBatchesToDeleteByPartition.get(partitionRangeId)).add(currentMiniBatch);
                currentMiniBatch = new ArrayList(100);
                currentMiniBatch.add(currentDocument);
                currentMiniBatchSize = 1;
            }
            if (currentMiniBatch.size() > 0) {
                ((List)miniBatchesToDeleteByPartition.get(partitionRangeId)).add(currentMiniBatch);
            }
        });
        this.logger.debug("Beginning bulk delete within each partition range");
        final HashMap<String, BatchDeleter> batchDeleters = new HashMap<String, BatchDeleter>();
        for (String partitionKeyRangeId : this.partitionKeyRangeIds) {
            for (List eachMiniBatchForPkRangeId : (List)miniBatchesToDeleteByPartition.get(partitionKeyRangeId)) {
                BulkDeleteQuerySpec querySpec = this.getBulkDeleteQuerySpecForMiniBatch(eachMiniBatchForPkRangeId);
                BatchDeleter batchDeleter = new BatchDeleter(partitionKeyRangeId, this.client, this.bulkDeleteStoredProcLink, querySpec);
                batchDeleters.put(partitionKeyRangeId, batchDeleter);
                Future batchDeleterFuture = this.listeningExecutorService.submit(batchDeleter.executeDelete());
                futures.add(batchDeleterFuture);
            }
        }
        Futures.FutureCombiner futureCombiner = Futures.whenAllComplete(futures);
        AsyncCallable<BulkDeleteResponse> completeAsyncCallback = new AsyncCallable<BulkDeleteResponse>(){

            @Override
            public ListenableFuture<BulkDeleteResponse> call() throws Exception {
                ArrayList<Exception> failures = new ArrayList<Exception>();
                int numberOfDocumentsDeleted = batchDeleters.values().stream().mapToInt(b -> b.getNumberOfDocumentsDeleted()).sum();
                double totalRequestUnitsConsumed = batchDeleters.values().stream().mapToDouble(b -> b.getTotalRequestUnitsConsumed()).sum();
                watch.stop();
                BulkDeleteResponse bulkDeleteResponse = new BulkDeleteResponse(numberOfDocumentsDeleted, totalRequestUnitsConsumed, watch.elapsed(), failures);
                return Futures.immediateFuture(bulkDeleteResponse);
            }
        };
        return futureCombiner.callAsync(completeAsyncCallback, this.listeningExecutorService);
    }

    private BulkDeleteQuerySpec getBulkDeleteQuerySpecForMiniBatch(List<Pair<String, String>> pkIdPairsToDelete) {
        String partitionKeyField = ((String)this.partitionKeyDefinition.getPaths().iterator().next()).replaceFirst("/", "");
        StringBuilder bulkDeleteQuerySpecBuilder = new StringBuilder();
        bulkDeleteQuerySpecBuilder.append("select * from c where (c.");
        bulkDeleteQuerySpecBuilder.append(partitionKeyField).append(" = \"").append((String)pkIdPairsToDelete.get(0).getKey()).append("\"");
        bulkDeleteQuerySpecBuilder.append(" and c.id = \"").append((String)pkIdPairsToDelete.get(0).getValue()).append("\")");
        for (int eachPkIdPairToDeleteIndex = 1; eachPkIdPairToDeleteIndex < pkIdPairsToDelete.size(); ++eachPkIdPairToDeleteIndex) {
            Pair<String, String> eachPkIdPairToDelete = pkIdPairsToDelete.get(eachPkIdPairToDeleteIndex);
            String partitionKey = (String)eachPkIdPairToDelete.getKey();
            String id = (String)eachPkIdPairToDelete.getValue();
            bulkDeleteQuerySpecBuilder.append("or (c.");
            bulkDeleteQuerySpecBuilder.append(partitionKeyField).append(" = \"").append(partitionKey).append("\"");
            bulkDeleteQuerySpecBuilder.append(" and c.id = \"").append(id).append("\")");
        }
        Matcher bulkDeleteQuerySpecMatcher = BULK_DELETE_QUERY_SPEC_PATTERN.matcher(bulkDeleteQuerySpecBuilder.toString());
        bulkDeleteQuerySpecMatcher.find();
        String root = bulkDeleteQuerySpecMatcher.group("root").toString();
        String filterExpression = bulkDeleteQuerySpecMatcher.group("filter").toString();
        BulkDeleteQuerySpec querySpec = new BulkDeleteQuerySpec(root, filterExpression, null, 1000);
        return querySpec;
    }

    private static CollectionRoutingMap getCollectionRoutingMap(DocumentClient client, String collectionLink) {
        ArrayList<ImmutablePair> ranges = new ArrayList<ImmutablePair>();
        for (PartitionKeyRange range : client.readPartitionKeyRanges(collectionLink, (FeedOptions)null).getQueryIterable().toList()) {
            ranges.add(new ImmutablePair((Object)range, (Object)true));
        }
        InMemoryCollectionRoutingMap routingMap = InMemoryCollectionRoutingMap.tryCreateCompleteRoutingMap(ranges, (String)"");
        if (routingMap == null) {
            throw new IllegalStateException("Cannot create complete routing map");
        }
        return routingMap;
    }

    private static List<ImmutablePair<String, String>> getPartitionKeyRangeIdsFromValues(CollectionRoutingMap collectionRoutingMap, PartitionKeyDefinition partitionKeyDefinition, List<String> partitionKeyValues) {
        ArrayList<ImmutablePair<String, String>> partitionRangeIds = new ArrayList<ImmutablePair<String, String>>();
        for (String partitionKeyValue : partitionKeyValues) {
            PartitionKeyInternal pkInternal = PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), (boolean)true);
            String effectivePartitionKey = pkInternal.getEffectivePartitionKeyString(partitionKeyDefinition, true);
            String partitionRangeId = collectionRoutingMap.getRangeByEffectivePartitionKey(effectivePartitionKey).getId();
            partitionRangeIds.add((ImmutablePair<String, String>)new ImmutablePair((Object)partitionKeyValue, (Object)partitionRangeId));
        }
        return partitionRangeIds;
    }

    private DocumentClientException toDocumentClientException(Exception e) {
        if (e instanceof DocumentClientException) {
            return (DocumentClientException)e;
        }
        return new DocumentClientException(500, e);
    }

    private int getDocumentSize(String document) {
        int documentSize = document.getBytes(Charset.forName("UTF-8")).length;
        if (documentSize > this.maxMiniBatchSize) {
            this.logger.error("Document size {} larger than script payload limit. {}", (Object)documentSize, (Object)this.maxMiniBatchSize);
        }
        return documentSize;
    }

    public static class Builder {
        private DocumentClient client;
        private String collectionLink;
        private int maxMiniBatchSize = (int)Math.floor(220201.0);
        private int maxUpdateMiniBatchCount = 500;
        private static final int DEFAULT_RETRY_ATTEMPT_ON_THROTTLING_FOR_INIT = 200;
        private static final int DEFAULT_WAIT_TIME_ON_THROTTLING_FOR_INIT_IN_SECONDS = 60;
        private PartitionKeyDefinition partitionKeyDef;
        private int offerThroughput;
        private static RetryOptions DEFAULT_INIT_RETRY_OPTIONS = new RetryOptions();
        private RetryOptions retryOptions = DEFAULT_INIT_RETRY_OPTIONS;

        public Builder from(DocumentClient client, String databaseName, String collectionName, PartitionKeyDefinition partitionKeyDef, int offerThroughput) {
            this.client = client;
            this.collectionLink = String.format("/dbs/%s/colls/%s", databaseName, collectionName);
            this.partitionKeyDef = partitionKeyDef;
            this.offerThroughput = offerThroughput;
            return this;
        }

        public Builder withMaxMiniBatchSize(int size) {
            Preconditions.checkArgument(size > 0, "maxMiniBatchSize cannot be negative");
            Preconditions.checkArgument(size <= 1101005, "maxMiniBatchSize cannot be negative");
            this.maxMiniBatchSize = size;
            return this;
        }

        public Builder withMaxUpdateMiniBatchCount(int count) {
            Preconditions.checkArgument(count > 0, "maxUpdateMiniBatchCount cannot be negative");
            this.maxUpdateMiniBatchCount = count;
            return this;
        }

        public Builder withInitializationRetryOptions(RetryOptions options) {
            this.retryOptions = options;
            return this;
        }

        public DocumentBulkExecutor build() throws Exception {
            DocumentBulkExecutor executor = new DocumentBulkExecutor(this.client, this.collectionLink, this.partitionKeyDef, this.offerThroughput);
            try {
                executor.setInitializationRetryOptions(this.retryOptions);
                executor.setMaxMiniBatchSize(this.maxMiniBatchSize);
                executor.setMaxUpdateMiniBatchCount(this.maxUpdateMiniBatchCount);
                executor.safeInit();
            }
            catch (Exception e) {
                executor.close();
                throw e;
            }
            return executor;
        }

        private Builder() {
        }

        static {
            DEFAULT_INIT_RETRY_OPTIONS.setMaxRetryAttemptsOnThrottledRequests(200);
            DEFAULT_INIT_RETRY_OPTIONS.setMaxRetryWaitTimeInSeconds(60);
        }
    }
}

