/*
 * Decompiled with CFR 0.152.
 */
package com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.stub;

import com.google.bigtable.repackaged.com.google.api.gax.rpc.ApiCallContext;
import com.google.bigtable.repackaged.com.google.api.gax.rpc.DeadlineExceededException;
import com.google.bigtable.repackaged.com.google.api.gax.rpc.ResourceExhaustedException;
import com.google.bigtable.repackaged.com.google.api.gax.rpc.ResponseObserver;
import com.google.bigtable.repackaged.com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.bigtable.repackaged.com.google.api.gax.rpc.StreamController;
import com.google.bigtable.repackaged.com.google.api.gax.rpc.UnavailableException;
import com.google.bigtable.repackaged.com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.repackaged.com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.repackaged.com.google.bigtable.v2.RateLimitInfo;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver;
import com.google.bigtable.repackaged.com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.bigtable.repackaged.com.google.common.base.Preconditions;
import com.google.bigtable.repackaged.com.google.common.base.Stopwatch;
import com.google.bigtable.repackaged.com.google.common.util.concurrent.RateLimiter;
import com.google.bigtable.repackaged.com.google.protobuf.util.Durations;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import javax.annotation.Nonnull;

/**
 * A {@link ServerStreamingCallable} for MutateRows that waits on a client-side rate limiter
 * before forwarding each request to the wrapped callable.
 *
 * <p>The limiter starts disabled. It is enabled when a response carries a valid
 * {@link RateLimitInfo}, and the target rate is then scaled by the factor carried in that info.
 * The rate is also scaled down by {@link #MIN_FACTOR} when a call fails with
 * {@link DeadlineExceededException}, {@link UnavailableException} or
 * {@link ResourceExhaustedException}.
 */
class RateLimitingServerStreamingCallable
extends ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> {
    private static final Logger logger = Logger.getLogger(RateLimitingServerStreamingCallable.class.getName());
    /** Rate the limiter is created with. */
    private static final long DEFAULT_QPS = 10L;
    /** Update period used when the rate is lowered because of an error (no period comes from a response). */
    private static final Duration DEFAULT_PERIOD = Duration.ofSeconds(10L);
    /** Lower bound applied to every computed rate. */
    private static final double MIN_QPS = 0.1;
    /** Upper bound applied to every computed rate. */
    private static final double MAX_QPS = 100000.0;
    /** Smallest factor a single update may apply; also the factor used on errors. */
    @VisibleForTesting
    static final double MIN_FACTOR = 0.7;
    /** Largest factor a single update may apply. */
    private static final double MAX_FACTOR = 1.3;
    private final ConditionalRateLimiter limiter = new ConditionalRateLimiter(DEFAULT_QPS);
    private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;

    /**
     * @param innerCallable the callable that actually performs the request; must not be null
     */
    RateLimitingServerStreamingCallable(@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
        this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
    }

    /**
     * Waits on the limiter (skipped while the limiter is disabled), reports the time spent
     * waiting to the tracer when it is a {@link BigtableTracer}, and then forwards the request
     * to the inner callable with an observer that watches responses and errors for rate hints.
     *
     * @param request the request to forward
     * @param responseObserver the caller's observer; receives every event unchanged
     * @param context the call context, used for its tracer and passed on to the inner callable
     */
    @Override
    public void call(MutateRowsRequest request, ResponseObserver<MutateRowsResponse> responseObserver, ApiCallContext context) {
        Stopwatch throttleTimer = Stopwatch.createStarted();
        this.limiter.acquire();
        throttleTimer.stop();
        if (context.getTracer() instanceof BigtableTracer) {
            ((BigtableTracer)context.getTracer()).batchRequestThrottled(throttleTimer.elapsed(TimeUnit.NANOSECONDS));
        }
        RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
        this.innerCallable.call(request, innerObserver, context);
    }

    /** Exposes the limiter's next-allowed-update time holder so tests can read or replace it. */
    @VisibleForTesting
    AtomicReference<Instant> getNextRateUpdateTime() {
        return this.limiter.nextRateUpdateTime;
    }

    /** Returns the rate currently configured on the limiter. */
    @VisibleForTesting
    double getCurrentRate() {
        return this.limiter.getRate();
    }

    /** Sets the limiter's rate directly, bypassing the update-period check. */
    @VisibleForTesting
    void setRate(double rate) {
        this.limiter.setRate(rate);
    }

    /** Returns whether the limiter is currently enabled. */
    @VisibleForTesting
    boolean getLimiterEnabled() {
        return this.limiter.isEnabled();
    }

    /** Forces the limiter's enabled flag to the given value. */
    @VisibleForTesting
    void setLimiterEnabled(boolean enabled) {
        this.limiter.setEnabled(enabled);
    }

    /**
     * A {@link RateLimiter} paired with an on/off flag and a timestamp before which rate
     * updates are ignored.
     */
    static class ConditionalRateLimiter {
        private final AtomicBoolean enabled = new AtomicBoolean(false);
        private final RateLimiter limiter;
        /** Earliest instant at which {@link #trySetRate} will accept a new rate. */
        private final AtomicReference<Instant> nextRateUpdateTime = new AtomicReference<>(Instant.now());

        /**
         * Creates the underlying limiter with the given rate; the limiter starts disabled.
         *
         * @param defaultQps initial rate passed to {@link RateLimiter#create}
         */
        public ConditionalRateLimiter(long defaultQps) {
            this.limiter = RateLimiter.create(defaultQps);
            logger.info("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS.");
        }

        /** Delegates to the underlying limiter's {@code acquire()} when enabled; otherwise returns immediately. */
        public void acquire() {
            if (this.enabled.get()) {
                this.limiter.acquire();
            }
        }

        /**
         * Disables the limiter, but only once the current time is past the next-update time.
         * Logs only when the flag actually flipped from enabled to disabled.
         */
        public void tryDisable() {
            Instant nextTime = this.nextRateUpdateTime.get();
            Instant now = Instant.now();
            // getAndSet returns the previous value, so the log fires only on a real transition.
            if (now.isAfter(nextTime) && this.enabled.getAndSet(false)) {
                logger.info("Rate limiter is disabled.");
            }
        }

        /** Enables the limiter; logs only when it was previously disabled. */
        public void enable() {
            if (!this.enabled.getAndSet(true)) {
                logger.info("Rate limiter is enabled.");
            }
        }

        /** Returns whether the limiter is currently enabled. */
        public boolean isEnabled() {
            return this.enabled.get();
        }

        /** Returns the rate currently configured on the underlying limiter. */
        public double getRate() {
            return this.limiter.getRate();
        }

        /**
         * Applies {@code rate} if the current time has reached the next-update time, and pushes
         * the next-update time forward by the whole seconds of {@code period}.
         *
         * <p>Returns without changing anything when called before the next-update time, or when
         * the compare-and-set on the next-update time fails because its value changed after it
         * was read here.
         *
         * @param rate the new rate to set on the underlying limiter
         * @param period how long to wait before accepting another update (whole seconds are used)
         */
        public void trySetRate(double rate, Duration period) {
            Instant nextTime = this.nextRateUpdateTime.get();
            Instant now = Instant.now();
            if (now.isBefore(nextTime)) {
                return;
            }
            Instant newNextTime = now.plusSeconds(period.getSeconds());
            // Only the caller whose compareAndSet succeeds goes on to change the rate.
            if (!this.nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) {
                return;
            }
            double oldRate = this.limiter.getRate();
            this.limiter.setRate(rate);
            logger.info("Updated max rate from " + oldRate + " to " + rate + " with period " + period.getSeconds() + " seconds.");
        }

        /** Sets the enabled flag unconditionally. */
        @VisibleForTesting
        void setEnabled(boolean enabled) {
            this.enabled.set(enabled);
        }

        /** Sets the underlying limiter's rate unconditionally. */
        @VisibleForTesting
        void setRate(double rate) {
            this.limiter.setRate(rate);
        }
    }

    /**
     * Observer placed between the inner callable and the caller's observer. It forwards every
     * event to the caller's observer and additionally adjusts the enclosing callable's limiter
     * based on responses and errors.
     */
    class RateLimitingResponseObserver
    extends SafeResponseObserver<MutateRowsResponse> {
        private final ResponseObserver<MutateRowsResponse> outerObserver;

        /**
         * @param observer the caller's observer that every event is forwarded to
         */
        RateLimitingResponseObserver(ResponseObserver<MutateRowsResponse> observer) {
            super(observer);
            this.outerObserver = observer;
        }

        @Override
        protected void onStartImpl(StreamController controller) {
            this.outerObserver.onStart(controller);
        }

        /**
         * Returns true when the response has a {@link RateLimitInfo} whose factor is positive
         * and whose period has a positive whole-seconds component.
         */
        private boolean hasValidRateLimitInfo(MutateRowsResponse response) {
            if (!response.hasRateLimitInfo()) {
                logger.finest("Response carries no RateLimitInfo");
                return false;
            }
            RateLimitInfo info = response.getRateLimitInfo();
            if (info.getFactor() <= 0.0 || info.getPeriod().getSeconds() <= 0L) {
                // Supplier overload: the message is only built when FINEST is loggable.
                logger.finest(() -> "Response carries invalid RateLimitInfo=" + info);
                return false;
            }
            logger.finest(() -> "Response carries valid RateLimitInfo=" + info);
            return true;
        }

        /**
         * Enables the limiter and rescales its rate when the response carries valid rate-limit
         * info; otherwise attempts to disable the limiter. The response is then forwarded.
         */
        @Override
        protected void onResponseImpl(MutateRowsResponse response) {
            if (this.hasValidRateLimitInfo(response)) {
                RateLimitingServerStreamingCallable.this.limiter.enable();
                RateLimitInfo info = response.getRateLimitInfo();
                this.updateQps(info.getFactor(), Duration.ofSeconds(Durations.toSeconds(info.getPeriod())));
            } else {
                RateLimitingServerStreamingCallable.this.limiter.tryDisable();
            }
            this.outerObserver.onResponse(response);
        }

        /**
         * Lowers the rate by {@link #MIN_FACTOR} for deadline-exceeded, unavailable and
         * resource-exhausted failures, then forwards the error.
         */
        @Override
        protected void onErrorImpl(Throwable t) {
            boolean shouldSlowDown = t instanceof DeadlineExceededException
                    || t instanceof UnavailableException
                    || t instanceof ResourceExhaustedException;
            if (shouldSlowDown) {
                this.updateQps(MIN_FACTOR, DEFAULT_PERIOD);
            }
            this.outerObserver.onError(t);
        }

        @Override
        protected void onCompleteImpl() {
            this.outerObserver.onComplete();
        }

        /**
         * Multiplies the limiter's current rate by {@code factor} clamped to
         * [{@link #MIN_FACTOR}, {@link #MAX_FACTOR}], clamps the result to
         * [{@link #MIN_QPS}, {@link #MAX_QPS}], and offers it to the limiter via
         * {@link ConditionalRateLimiter#trySetRate}.
         *
         * @param factor requested multiplier for the current rate
         * @param period update period handed to {@code trySetRate}
         */
        private void updateQps(double factor, Duration period) {
            double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
            double currentRate = RateLimitingServerStreamingCallable.this.limiter.getRate();
            double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS);
            RateLimitingServerStreamingCallable.this.limiter.trySetRate(cappedRate, period);
        }
    }
}

