/*
 * Decompiled with CFR 0.152.
 */
package com.tencent.polaris.ratelimit.client.sync;

import com.tencent.polaris.api.plugin.ratelimiter.AmountInfo;
import com.tencent.polaris.api.plugin.ratelimiter.LocalQuotaInfo;
import com.tencent.polaris.api.plugin.ratelimiter.QuotaBucket;
import com.tencent.polaris.api.utils.MapUtils;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.ratelimit.client.flow.AsyncRateLimitConnector;
import com.tencent.polaris.ratelimit.client.flow.RateLimitWindow;
import com.tencent.polaris.ratelimit.client.flow.ServiceIdentifier;
import com.tencent.polaris.ratelimit.client.flow.StreamCounterSet;
import com.tencent.polaris.ratelimit.client.pb.RateLimitGRPCV2Grpc;
import com.tencent.polaris.ratelimit.client.pb.RatelimitV2;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import org.slf4j.Logger;

public class RemoteSyncTask
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteSyncTask.class);
    private final RateLimitWindow window;
    private final AsyncRateLimitConnector asyncRateLimitConnector;
    private final ServiceIdentifier serviceIdentifier;

    public RemoteSyncTask(RateLimitWindow window) {
        this.window = window;
        this.asyncRateLimitConnector = window.getWindowSet().getAsyncRateLimitConnector();
        this.serviceIdentifier = new ServiceIdentifier(window.getSvcKey().getService(), window.getSvcKey().getNamespace(), window.getLabels());
    }

    public RateLimitWindow getWindow() {
        return this.window;
    }

    @Override
    public void run() {
        switch (this.window.getStatus()) {
            case CREATED: 
            case DELETED: {
                break;
            }
            case INITIALIZING: {
                this.doRemoteInit();
                break;
            }
            default: {
                this.doRemoteAcquire();
            }
        }
    }

    private void doRemoteInit() {
        StreamCounterSet streamCounterSet = this.asyncRateLimitConnector.getStreamCounterSet(this.window.getWindowSet().getRateLimitExtension().getExtensions(), this.window.getRemoteCluster(), this.window.getUniqueKey(), this.serviceIdentifier);
        if (streamCounterSet == null) {
            LOG.error("[doRemoteInit] failed, stream counter is null. remote cluster:{},", (Object)this.window.getRemoteCluster());
            return;
        }
        if (!this.adjustTime(streamCounterSet)) {
            LOG.error("[doRemoteInit] adjustTime failed.remote cluster:{},svcKey:{}", (Object)this.window.getRemoteCluster(), (Object)this.window.getSvcKey());
            return;
        }
        StreamObserver<RatelimitV2.RateLimitRequest> streamClient = streamCounterSet.preCheckAsync(this.serviceIdentifier, this.window);
        if (streamClient == null) {
            LOG.error("[doRemoteInit] failed, stream client is null. remote cluster:{},svcKey:{}", (Object)this.window.getRemoteCluster(), (Object)this.window.getSvcKey());
            return;
        }
        RatelimitV2.RateLimitInitRequest.Builder initRequest = RatelimitV2.RateLimitInitRequest.newBuilder();
        initRequest.setClientId(this.window.getWindowSet().getClientId());
        RatelimitV2.LimitTarget.Builder target = RatelimitV2.LimitTarget.newBuilder();
        target.setNamespace(this.window.getSvcKey().getNamespace());
        target.setService(this.window.getSvcKey().getService());
        target.setLabels(this.window.getLabels());
        initRequest.setTarget(target);
        RatelimitV2.QuotaMode quotaMode = RatelimitV2.QuotaMode.forNumber((int)this.window.getRule().getAmountModeValue());
        QuotaBucket allocatingBucket = this.window.getAllocatingBucket();
        Map amountInfos = allocatingBucket.getAmountInfo();
        if (MapUtils.isNotEmpty((Map)amountInfos)) {
            for (Map.Entry entry : amountInfos.entrySet()) {
                RatelimitV2.QuotaTotal.Builder total = RatelimitV2.QuotaTotal.newBuilder();
                total.setDuration(((Integer)entry.getKey()).intValue());
                total.setMode(quotaMode);
                total.setMaxAmount((int)((AmountInfo)entry.getValue()).getMaxAmount());
                initRequest.addTotals(total.build());
            }
        }
        RatelimitV2.RateLimitRequest rateLimitInitRequest = RatelimitV2.RateLimitRequest.newBuilder().setCmd(RatelimitV2.RateLimitCmd.INIT).setRateLimitInitRequest(initRequest).build();
        streamClient.onNext((Object)rateLimitInitRequest);
    }

    private void doRemoteAcquire() {
        StreamCounterSet streamCounterSet = this.asyncRateLimitConnector.getStreamCounterSet(this.window.getWindowSet().getRateLimitExtension().getExtensions(), this.window.getRemoteCluster(), this.window.getUniqueKey(), this.serviceIdentifier);
        if (streamCounterSet == null) {
            LOG.error("[doRemoteAcquire] failed, stream counter is null. remote cluster:{},", (Object)this.window.getRemoteCluster());
            return;
        }
        if (!streamCounterSet.hasInit(this.serviceIdentifier)) {
            LOG.warn("[doRemoteAcquire] has not inited. serviceKey:{}", (Object)this.window.getSvcKey());
            this.doRemoteInit();
            return;
        }
        if (!this.adjustTime(streamCounterSet)) {
            LOG.error("[doRemoteAcquire] adjustTime failed.remote cluster:{},svcKey:{}", (Object)this.window.getRemoteCluster(), (Object)this.window.getSvcKey());
            return;
        }
        StreamObserver<RatelimitV2.RateLimitRequest> streamClient = streamCounterSet.preCheckAsync(this.serviceIdentifier, this.window);
        if (streamClient == null) {
            LOG.error("[doRemoteAcquire] failed, stream client is null. remote cluster:{}", (Object)this.window.getRemoteCluster());
            return;
        }
        RatelimitV2.RateLimitReportRequest.Builder rateLimitReportRequest = RatelimitV2.RateLimitReportRequest.newBuilder();
        rateLimitReportRequest.setClientKey(streamCounterSet.getClientKey());
        long curTimeMilli = System.currentTimeMillis();
        long serverTimeMilli = curTimeMilli + this.asyncRateLimitConnector.getTimeDiff().get();
        rateLimitReportRequest.setTimestamp(serverTimeMilli);
        Map localQuotaInfos = this.window.getAllocatingBucket().fetchLocalUsage(serverTimeMilli);
        for (Map.Entry entry : localQuotaInfos.entrySet()) {
            RatelimitV2.QuotaSum.Builder quotaSum = RatelimitV2.QuotaSum.newBuilder();
            quotaSum.setUsed((int)((LocalQuotaInfo)entry.getValue()).getQuotaUsed());
            quotaSum.setLimited((int)((LocalQuotaInfo)entry.getValue()).getQuotaLimited());
            quotaSum.setCounterKey(streamCounterSet.getInitRecord().get(this.serviceIdentifier).getDurationRecord().get(entry.getKey()).intValue());
            rateLimitReportRequest.addQuotaUses(quotaSum.build());
        }
        RatelimitV2.RateLimitRequest rateLimitRequest = RatelimitV2.RateLimitRequest.newBuilder().setCmd(RatelimitV2.RateLimitCmd.ACQUIRE).setRateLimitReportRequest(rateLimitReportRequest).build();
        streamClient.onNext((Object)rateLimitRequest);
    }

    private boolean adjustTime(StreamCounterSet streamCounterSet) {
        long lastSyncTimeMilli = this.asyncRateLimitConnector.getLastSyncTimeMilli().get();
        long sendTimeMilli = System.currentTimeMillis();
        if (lastSyncTimeMilli > 0L && sendTimeMilli - lastSyncTimeMilli < 30000L) {
            LOG.info("adjustTime need wait.lastSyncTimeMilli:{},sendTimeMilli:{}", (Object)lastSyncTimeMilli, (Object)sendTimeMilli);
            return true;
        }
        RateLimitGRPCV2Grpc.RateLimitGRPCV2BlockingStub client = streamCounterSet.preCheckSync(this.serviceIdentifier, this.window);
        if (client == null) {
            LOG.error("[adjustTime] can not get connection {}", (Object)this.window.getRemoteCluster());
            return false;
        }
        RatelimitV2.TimeAdjustRequest timeAdjustRequest = RatelimitV2.TimeAdjustRequest.newBuilder().build();
        RatelimitV2.TimeAdjustResponse timeAdjustResponse = client.timeAdjust(timeAdjustRequest);
        long receiveClientTimeMilli = System.currentTimeMillis();
        this.asyncRateLimitConnector.getLastSyncTimeMilli().set(receiveClientTimeMilli);
        long serverTimestamp = timeAdjustResponse.getServerTimestamp();
        long latency = receiveClientTimeMilli - sendTimeMilli;
        long timeDiff = serverTimestamp + latency / 2L - receiveClientTimeMilli;
        this.asyncRateLimitConnector.getTimeDiff().set(timeDiff);
        LOG.info("[RateLimit]adjust time to server time is {}, latency is {},diff is {}", new Object[]{serverTimestamp, latency, timeDiff});
        return true;
    }
}

