/*
 * Decompiled with CFR 0.152.
 */
package com.azure.cosmos.implementation.throughputControl;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

public class ThroughputRequestThrottler {
    private static final Logger logger = LoggerFactory.getLogger(ThroughputRequestThrottler.class);
    private final AtomicReference<Double> availableThroughput;
    private final AtomicReference<Double> scheduledThroughput;
    private final ReentrantReadWriteLock.WriteLock throughputWriteLock;
    private final ReentrantReadWriteLock.ReadLock throughputReadLock;

    public ThroughputRequestThrottler(double scheduledThroughput) {
        this.availableThroughput = new AtomicReference<Double>(scheduledThroughput);
        this.scheduledThroughput = new AtomicReference<Double>(scheduledThroughput);
        ReentrantReadWriteLock throughputReadWriteLock = new ReentrantReadWriteLock();
        this.throughputWriteLock = throughputReadWriteLock.writeLock();
        this.throughputReadLock = throughputReadWriteLock.readLock();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public double renewThroughputUsageCycle(double scheduledThroughput) {
        try {
            this.throughputWriteLock.lock();
            double throughputUsagePercentage = (this.scheduledThroughput.get() - this.availableThroughput.get()) / this.scheduledThroughput.get();
            this.scheduledThroughput.set(scheduledThroughput);
            this.updateAvailableThroughput();
            double d = throughputUsagePercentage;
            return d;
        }
        finally {
            this.throughputWriteLock.unlock();
        }
    }

    private void updateAvailableThroughput() {
        this.availableThroughput.getAndAccumulate(this.scheduledThroughput.get(), (available, refill) -> Math.min(available, 0.0) + refill);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T> Mono<T> processRequest(RxDocumentServiceRequest request, Mono<T> originalRequestMono) {
        try {
            this.throughputReadLock.lock();
            if (this.availableThroughput.get() > 0.0) {
                Mono mono = originalRequestMono.doOnSuccess(response -> this.trackRequestCharge(response)).doOnError(throwable -> this.trackRequestCharge(throwable));
                return mono;
            }
            RequestRateTooLargeException requestRateTooLargeException = new RequestRateTooLargeException();
            int backoffTimeInMilliSeconds = (int)Math.floor(Math.abs(this.availableThroughput.get() * 1000.0 / this.scheduledThroughput.get()));
            requestRateTooLargeException.getResponseHeaders().put("x-ms-retry-after-ms", String.valueOf(backoffTimeInMilliSeconds));
            requestRateTooLargeException.getResponseHeaders().put("x-ms-substatus", String.valueOf(10003));
            if (request.requestContext != null) {
                BridgeInternal.setResourceAddress(requestRateTooLargeException, request.requestContext.resourcePhysicalAddress);
            }
            Mono mono = Mono.error((Throwable)((Object)requestRateTooLargeException));
            return mono;
        }
        finally {
            this.throughputReadLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> void trackRequestCharge(T response) {
        try {
            CosmosException cosmosException;
            this.throughputReadLock.lock();
            double requestCharge = 0.0;
            if (response instanceof StoreResponse) {
                requestCharge = ((StoreResponse)response).getRequestCharge();
            } else if (response instanceof RxDocumentServiceResponse) {
                requestCharge = ((RxDocumentServiceResponse)response).getRequestCharge();
            } else if (response instanceof Throwable && (cosmosException = Utils.as(Exceptions.unwrap((Throwable)((Throwable)response)), CosmosException.class)) != null) {
                requestCharge = cosmosException.getRequestCharge();
            }
            this.availableThroughput.getAndAccumulate(requestCharge, (available, consumed) -> available - consumed);
        }
        finally {
            this.throughputReadLock.unlock();
        }
    }

    public double getAvailableThroughput() {
        return this.availableThroughput.get();
    }

    public double getScheduledThroughput() {
        return this.scheduledThroughput.get();
    }
}

