/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.documentdb.bulkexecutor.internal;

import com.microsoft.azure.documentdb.bulkexecutor.internal.BatchOperator;
import com.microsoft.azure.documentdb.bulkexecutor.internal.ExceptionUtils;
import com.microsoft.azure.documentdb.bulkexecutor.internal.OperationMetrics;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.AsyncFunction;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.FutureCallback;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.Futures;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.ListenableFuture;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.ListeningExecutorService;
import com.microsoft.azure.documentdb.repackaged.com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives the mini-batch tasks of a single partition key range through an executor while
 * adapting the number of concurrently running tasks with an additive-increase /
 * divisive-decrease scheme.
 *
 * <p>Concurrency is bounded by a {@link Semaphore}: every submitted mini-batch holds one permit
 * until it finishes. A background task wakes up once per sample period, inspects the metrics
 * accumulated during that period and either withholds permits (when throttling was observed) or
 * releases one additional permit (when the consumed request units stayed below
 * {@link #THROUGHPUT_THRESHOLD} of the partition throughput).
 *
 * <p>Threading: {@link #executeAll()} runs on the calling thread, the congestion-control task and
 * the mini-batches run on the supplied executor, and completion callbacks run on whichever thread
 * completes the corresponding future (direct executor).
 */
public class CongestionController {
    private static final Logger logger = LoggerFactory.getLogger(CongestionController.class);

    /** Degree of concurrency used when the caller does not supply one. */
    private static final int STARTING_DEGREE_OF_CONCURRENCY = 1;
    /** Upper bound on the degree of concurrency used when the caller does not supply one. */
    private static final int MAX_DEGREE_OF_CONCURRENCY = 20;
    /** Number of permits added per sample period when there is head-room. */
    private static final int ADDITIVE_INCREASE_FACTOR = 1;
    /** The degree of concurrency is reduced by {@code current / DIVISIVE_DECREASE_FACTOR} on throttling. */
    private static final int DIVISIVE_DECREASE_FACTOR = 2;
    /** Fraction of the partition throughput below which concurrency may still be increased. */
    private static final double THROUGHPUT_THRESHOLD = 0.9;

    private final String partitionKeyRangeId;
    private final Duration samplePeriod = Duration.ofSeconds(1L);
    private final BatchOperator batchOperator;
    private final Semaphore throttleSemaphore;
    private final Object aggregateLock = new Object();
    private final int maxDegreeOfConcurrency;
    private final ListeningExecutorService executor;
    private final int partitionThroughput;
    private final List<Exception> failures = Collections.synchronizedList(new ArrayList<Exception>());

    /** Metrics accumulated since the last sample; guarded by {@link #aggregateLock}. */
    private OperationMetrics aggregatedInsertMetrics;
    /** Only touched by the congestion-control task. */
    private long documentsOperatedSoFar;
    private volatile State state = State.Running;
    /** Written only by the congestion-control task; volatile so the public getter sees updates. */
    private volatile int degreeOfConcurrency;

    /**
     * Creates a controller with the default starting and maximum degree of concurrency.
     *
     * @param executor executor on which the controller task and the mini-batches run
     * @param partitionThroughput throughput of the partition, compared against the request units
     *     consumed per sample period
     * @param partitionKeyRangeId identifier used in log messages
     * @param batchOperator source of the mini-batch tasks
     */
    public CongestionController(ListeningExecutorService executor, int partitionThroughput, String partitionKeyRangeId, BatchOperator batchOperator) {
        this(executor, partitionThroughput, partitionKeyRangeId, batchOperator, null, null);
    }

    /**
     * Creates a controller.
     *
     * @param executor executor on which the controller task and the mini-batches run
     * @param partitionThroughput throughput of the partition, compared against the request units
     *     consumed per sample period
     * @param partitionKeyRangeId identifier used in log messages
     * @param batchOperator source of the mini-batch tasks
     * @param startingDegreeOfConcurrency initial number of permits, or {@code null} for
     *     {@link #STARTING_DEGREE_OF_CONCURRENCY}
     * @param maxDegreeOfConcurrency upper bound for the additive increase, or {@code null} for
     *     {@link #MAX_DEGREE_OF_CONCURRENCY}
     */
    public CongestionController(ListeningExecutorService executor, int partitionThroughput, String partitionKeyRangeId, BatchOperator batchOperator, Integer startingDegreeOfConcurrency, Integer maxDegreeOfConcurrency) {
        this.partitionKeyRangeId = partitionKeyRangeId;
        this.batchOperator = batchOperator;
        this.degreeOfConcurrency = startingDegreeOfConcurrency != null ? startingDegreeOfConcurrency : STARTING_DEGREE_OF_CONCURRENCY;
        this.maxDegreeOfConcurrency = maxDegreeOfConcurrency != null ? maxDegreeOfConcurrency : MAX_DEGREE_OF_CONCURRENCY;
        this.throttleSemaphore = new Semaphore(this.degreeOfConcurrency);
        this.aggregatedInsertMetrics = new OperationMetrics();
        this.executor = executor;
        this.partitionThroughput = partitionThroughput;
    }

    /** Records a failure so that it can later be retrieved through {@link #getFailures()}. */
    private void addFailure(Exception e) {
        this.failures.add(e);
    }

    /**
     * Returns the failures recorded so far.
     *
     * @return an unmodifiable live view of the recorded failures; callers that iterate while
     *     tasks may still be failing should copy it first
     */
    public List<Exception> getFailures() {
        return Collections.unmodifiableList(this.failures);
    }

    /**
     * Swaps the aggregated metrics for {@code metrics} under {@link #aggregateLock}.
     *
     * @param metrics the fresh accumulator to install
     * @return the metrics that were accumulated before the swap
     */
    private OperationMetrics atomicGetAndReplace(OperationMetrics metrics) {
        synchronized (this.aggregateLock) {
            OperationMetrics previous = this.aggregatedInsertMetrics;
            this.aggregatedInsertMetrics = metrics;
            return previous;
        }
    }

    /**
     * Builds the background task that periodically samples the metrics and adjusts the degree of
     * concurrency. The task loops while the controller is in the running state and stops early
     * when its thread is interrupted.
     */
    private Callable<Void> congestionControlTask() {
        return new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                while (isRunning()) {
                    try {
                        runSamplingCycle();
                    }
                    catch (InterruptedException e) {
                        logger.warn("Interrupted", e);
                        // Preserve the interrupt status for whoever owns this thread.
                        Thread.currentThread().interrupt();
                        break;
                    }
                    catch (Exception e) {
                        logger.error("pki {} unexpected failure", partitionKeyRangeId, e);
                        throw e;
                    }
                }
                return null;
            }
        };
    }

    /**
     * Sleeps for one sample period, takes the metrics accumulated meanwhile and adjusts the
     * degree of concurrency accordingly.
     *
     * @throws InterruptedException if interrupted while sleeping or while withholding permits
     */
    private void runSamplingCycle() throws InterruptedException {
        logger.debug("pki {} goes to sleep for {} seconds. available semaphore permits {}, current degree of parallelism {}", partitionKeyRangeId, samplePeriod.getSeconds(), throttleSemaphore.availablePermits(), degreeOfConcurrency);
        Thread.sleep(samplePeriod.toMillis());
        logger.debug("pki {} wakes up", partitionKeyRangeId);

        OperationMetrics sample = atomicGetAndReplace(new OperationMetrics());
        if (sample.numberOfThrottles > 0L) {
            decreaseConcurrency(sample);
        }
        if (sample.numberOfDocumentsInserted == 0L) {
            // Nothing was written during this period, so there is nothing to base an increase on.
            return;
        }

        logger.debug("pki {} aggregating inserts metrics", partitionKeyRangeId);
        boolean hasHeadRoom = sample.numberOfThrottles == 0L
                && sample.requestUnitsConsumed < THROUGHPUT_THRESHOLD * (double) partitionThroughput
                && degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR <= maxDegreeOfConcurrency;
        if (hasHeadRoom) {
            increaseConcurrency();
        }

        double ruPerSecond = sample.requestUnitsConsumed / (double) samplePeriod.getSeconds();
        documentsOperatedSoFar += sample.numberOfDocumentsInserted;
        logger.debug("pki {} : Operated on {} docs in {} milli seconds at {} RU/s with {} tasks. Faced {} throttles. Total documents operated on so far {}.", partitionKeyRangeId, sample.numberOfDocumentsInserted, samplePeriod.toMillis(), ruPerSecond, degreeOfConcurrency, sample.numberOfThrottles, documentsOperatedSoFar);
    }

    /**
     * Withholds {@code current / DIVISIVE_DECREASE_FACTOR} permits from the semaphore, blocking
     * until running tasks have returned them, and lowers the degree of concurrency to match.
     */
    private void decreaseConcurrency(OperationMetrics sample) throws InterruptedException {
        int current = degreeOfConcurrency;
        int decrease = current / DIVISIVE_DECREASE_FACTOR;
        logger.debug("pki {} importing encountered {} throttling. current degree of parallelism {}, decreasing amount: {}", partitionKeyRangeId, sample.numberOfThrottles, current, decrease);
        for (int i = 0; i < decrease; ++i) {
            throttleSemaphore.acquire();
        }
        degreeOfConcurrency = current - decrease;
        logger.debug("pki {} degree of parallelism reduced to {}, sem available permits {}", partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
    }

    /** Hands out {@link #ADDITIVE_INCREASE_FACTOR} extra permit(s) and raises the degree of concurrency. */
    private void increaseConcurrency() {
        logger.debug("pki {} increasing degree of prallelism and releasing semaphore", partitionKeyRangeId);
        throttleSemaphore.release(ADDITIVE_INCREASE_FACTOR);
        degreeOfConcurrency = degreeOfConcurrency + ADDITIVE_INCREASE_FACTOR;
        logger.debug("pki {} degree of parallelism increased to {}. available semaphore permits {}", partitionKeyRangeId, degreeOfConcurrency, throttleSemaphore.availablePermits());
    }

    /**
     * Runs {@link #executeAll()} on the executor instead of the calling thread.
     *
     * @return a future that completes when the future returned by {@link #executeAll()} completes
     */
    public ListenableFuture<Void> executeAllAsync() {
        Callable<ListenableFuture<Void>> submitAll = new Callable<ListenableFuture<Void>>() {

            @Override
            public ListenableFuture<Void> call() throws Exception {
                return executeAll();
            }
        };
        ListenableFuture<ListenableFuture<Void>> nested = this.executor.submit(submitAll);
        // Flatten Future<Future<Void>> into Future<Void>.
        AsyncFunction<ListenableFuture<Void>, Void> flatten = new AsyncFunction<ListenableFuture<Void>, Void>() {

            @Override
            public ListenableFuture<Void> apply(ListenableFuture<Void> input) throws Exception {
                return input;
            }
        };
        return Futures.transformAsync(nested, flatten, this.executor);
    }

    /**
     * Starts the congestion-control task and submits every mini-batch produced by the batch
     * operator, blocking the calling thread on the semaphore whenever the current degree of
     * concurrency is exhausted. Submission stops as soon as the controller leaves the running
     * state.
     *
     * @return the future of the congestion-control task, which finishes once the controller is
     *     no longer running
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *     interrupted while waiting for a permit; the controller is marked as failed first so
     *     that the congestion-control task terminates
     */
    public ListenableFuture<Void> executeAll() {
        logger.debug("pki{} Executing batching", this.partitionKeyRangeId);
        ListenableFuture<Void> completionFuture = this.executor.submit(this.congestionControlTask());
        Iterator<Callable<OperationMetrics>> batchExecutionIterator = this.batchOperator.miniBatchExecutionCallableIterator();
        List<ListenableFuture<OperationMetrics>> futureList = new ArrayList<ListenableFuture<OperationMetrics>>();
        while (batchExecutionIterator.hasNext() && this.isRunning()) {
            Callable<OperationMetrics> task = batchExecutionIterator.next();
            try {
                logger.debug("pki {} trying to acquire semaphore to execute a task. available permits {}", this.partitionKeyRangeId, this.throttleSemaphore.availablePermits());
                this.throttleSemaphore.acquire();
                logger.debug("pki {} acquiring semaphore for executing a task succeeded. available permits {}", this.partitionKeyRangeId, this.throttleSemaphore.availablePermits());
            }
            catch (InterruptedException e) {
                // No permit was obtained, so none must be released here: doing so would
                // permanently inflate the number of permits in circulation.
                logger.error("pki {} Interrupted while waiting for semaphore", this.partitionKeyRangeId, e);
                Thread.currentThread().interrupt();
                // Leave the running state so the congestion-control task does not loop forever.
                this.setState(State.Failure);
                this.addFailure(e);
                throw new RuntimeException(e);
            }
            if (this.failed()) {
                logger.error("pki {} already failed due to earlier failures. not submitting new tasks", this.partitionKeyRangeId);
                this.throttleSemaphore.release();
                break;
            }
            ListenableFuture<OperationMetrics> insertMetricsFuture = this.executor.submit(task);
            Futures.addCallback(insertMetricsFuture, this.aggregateMetricsReleaseSemaphoreCallback(), MoreExecutors.directExecutor());
            futureList.add(insertMetricsFuture);
        }
        ListenableFuture<List<OperationMetrics>> allFutureResults = Futures.allAsList(futureList);
        Futures.addCallback(allFutureResults, this.completionCallback(), MoreExecutors.directExecutor());
        return completionFuture;
    }

    /**
     * Builds the per-task callback: on success the task's metrics are folded into the current
     * aggregate, on failure the controller is marked as failed and the cause recorded. The
     * task's permit is returned in both cases.
     */
    private FutureCallback<OperationMetrics> aggregateMetricsReleaseSemaphoreCallback() {
        return new FutureCallback<OperationMetrics>() {

            @Override
            public void onSuccess(OperationMetrics result) {
                logger.debug("pki {} accquiring a synchronized lock to update metrics", partitionKeyRangeId);
                synchronized (aggregateLock) {
                    aggregatedInsertMetrics = OperationMetrics.sum(aggregatedInsertMetrics, result);
                }
                logger.debug("pki {} releasing semaphore on completion of task", partitionKeyRangeId);
                throttleSemaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                logger.error("pki {} encountered failure {} releasing semaphore", partitionKeyRangeId, t);
                setState(State.Failure);
                addFailure(ExceptionUtils.toException(t));
                throttleSemaphore.release();
            }
        };
    }

    /** Builds the callback that moves the controller to its terminal state once all tasks are done. */
    private FutureCallback<List<OperationMetrics>> completionCallback() {
        return new FutureCallback<List<OperationMetrics>>() {

            @Override
            public void onSuccess(List<OperationMetrics> result) {
                logger.debug("pki {} importing completed", partitionKeyRangeId);
                setState(State.Completed);
            }

            @Override
            public void onFailure(Throwable t) {
                logger.error("pki {} importing failed", partitionKeyRangeId, t);
                setState(State.Failure);
            }
        };
    }

    /**
     * Sets the controller state.
     *
     * @param state the new state
     */
    public void setState(State state) {
        logger.debug("pki {} state set to {}", this.partitionKeyRangeId, state);
        this.state = state;
    }

    /** @return {@code true} while the controller is in the running state */
    public boolean isRunning() {
        logger.trace("pki {} in isRunning", this.partitionKeyRangeId);
        return this.state == State.Running;
    }

    /** @return {@code true} once the controller has been moved to the completed state */
    public boolean completed() {
        return this.state == State.Completed;
    }

    /** @return {@code true} once the controller has been moved to the failure state */
    public boolean failed() {
        return this.state == State.Failure;
    }

    /** @return the degree of concurrency most recently set by the congestion-control task */
    public int getDegreeOfConcurrency() {
        return this.degreeOfConcurrency;
    }

    /** Lifecycle states of the controller. */
    static enum State {
        Running,
        Completed,
        Failure;

    }
}

