/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.ResourceExhaustedException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.UnavailableException;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.RateLimitInfo;
import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.util.Durations;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.TemporalAmount;

class RateLimitingServerStreamingCallable
extends ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> {
    private static final Logger logger = Logger.getLogger(RateLimitingServerStreamingCallable.class.getName());
    private static final long DEFAULT_QPS = 10L;
    private static final Duration DEFAULT_PERIOD = Duration.ofSeconds((long)10L);
    private static final double MIN_QPS = 0.1;
    private static final double MAX_QPS = 100000.0;
    @VisibleForTesting
    static final double MIN_FACTOR = 0.7;
    private static final double MAX_FACTOR = 1.3;
    private final RateLimiter limiter;
    private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<Instant>(Instant.now());
    private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;

    RateLimitingServerStreamingCallable(@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
        this.limiter = RateLimiter.create((double)10.0);
        this.innerCallable = (ServerStreamingCallable)Preconditions.checkNotNull(innerCallable, (Object)"Inner callable must be set");
        logger.info("Rate limiting is enabled with initial QPS of " + this.limiter.getRate());
    }

    public void call(MutateRowsRequest request, ResponseObserver<MutateRowsResponse> responseObserver, ApiCallContext context) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        this.limiter.acquire();
        stopwatch.stop();
        if (context.getTracer() instanceof BigtableTracer) {
            ((BigtableTracer)context.getTracer()).batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
        }
        RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(this.limiter, this.lastQpsChangeTime, responseObserver);
        this.innerCallable.call((Object)request, (ResponseObserver)innerObserver, context);
    }

    @VisibleForTesting
    AtomicReference<Instant> getLastQpsChangeTime() {
        return this.lastQpsChangeTime;
    }

    @VisibleForTesting
    double getCurrentRate() {
        return this.limiter.getRate();
    }

    class RateLimitingResponseObserver
    extends SafeResponseObserver<MutateRowsResponse> {
        private final ResponseObserver<MutateRowsResponse> outerObserver;
        private final RateLimiter rateLimiter;
        private final AtomicReference<Instant> lastQpsChangeTime;

        RateLimitingResponseObserver(RateLimiter rateLimiter, AtomicReference<Instant> lastQpsChangeTime, ResponseObserver<MutateRowsResponse> observer) {
            super(observer);
            this.outerObserver = observer;
            this.rateLimiter = rateLimiter;
            this.lastQpsChangeTime = lastQpsChangeTime;
        }

        @Override
        protected void onStartImpl(StreamController controller) {
            this.outerObserver.onStart(controller);
        }

        @Override
        protected void onResponseImpl(MutateRowsResponse response) {
            RateLimitInfo info;
            if (response.hasRateLimitInfo() && (info = response.getRateLimitInfo()).getFactor() != 0.0 && info.getPeriod().getSeconds() != 0L) {
                this.updateQps(info.getFactor(), Duration.ofSeconds((long)Durations.toSeconds((com.google.protobuf.Duration)info.getPeriod())));
            }
            this.outerObserver.onResponse((Object)response);
        }

        @Override
        protected void onErrorImpl(Throwable t) {
            if (t instanceof DeadlineExceededException || t instanceof UnavailableException || t instanceof ResourceExhaustedException) {
                this.updateQps(0.7, DEFAULT_PERIOD);
            }
            this.outerObserver.onError(t);
        }

        @Override
        protected void onCompleteImpl() {
            this.outerObserver.onComplete();
        }

        private void updateQps(double factor, Duration period) {
            Instant lastTime = this.lastQpsChangeTime.get();
            Instant now = Instant.now();
            if (now.minus((TemporalAmount)period).isAfter(lastTime) && this.lastQpsChangeTime.compareAndSet(lastTime, now)) {
                double cappedFactor = Math.min(Math.max(factor, 0.7), 1.3);
                double currentRate = RateLimitingServerStreamingCallable.this.limiter.getRate();
                RateLimitingServerStreamingCallable.this.limiter.setRate(Math.min(Math.max(currentRate * cappedFactor, 0.1), 100000.0));
                logger.log(Level.FINE, "Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}", new Object[]{currentRate, RateLimitingServerStreamingCallable.this.limiter.getRate(), factor, cappedFactor});
            }
        }
    }
}

