/*
 * Decompiled with CFR 0.152.
 */
package com.tencent.polaris.ratelimit.client.flow;

import com.tencent.polaris.api.plugin.ratelimiter.RemoteQuotaInfo;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.ratelimit.client.flow.DurationBaseCallback;
import com.tencent.polaris.ratelimit.client.flow.HostIdentifier;
import com.tencent.polaris.ratelimit.client.flow.InitializeRecord;
import com.tencent.polaris.ratelimit.client.flow.RateLimitWindow;
import com.tencent.polaris.ratelimit.client.flow.ServiceIdentifier;
import com.tencent.polaris.ratelimit.client.pb.RateLimitGRPCV2Grpc;
import com.tencent.polaris.ratelimit.client.pb.RatelimitV2;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

public class StreamResource
implements StreamObserver<RatelimitV2.RateLimitResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamResource.class);
    private final AtomicBoolean endStream = new AtomicBoolean(false);
    private final AtomicLong lastConnectFailTimeMilli = new AtomicLong(0L);
    private final HostIdentifier hostIdentifier;
    private final ManagedChannel channel;
    private final StreamObserver<RatelimitV2.RateLimitRequest> streamClient;
    private final RateLimitGRPCV2Grpc.RateLimitGRPCV2BlockingStub client;
    private final Map<ServiceIdentifier, InitializeRecord> initRecord = new ConcurrentHashMap<ServiceIdentifier, InitializeRecord>();
    private final Map<Integer, DurationBaseCallback> counters = new ConcurrentHashMap<Integer, DurationBaseCallback>();
    private final AtomicLong lastRecvTime = new AtomicLong(0L);
    private int clientKey;
    private final AtomicLong timeDiffMilli = new AtomicLong();
    private final AtomicLong lastSyncTimeMilli = new AtomicLong();
    private final AtomicLong syncInterval = new AtomicLong(30000L);

    public StreamResource(HostIdentifier identifier) {
        this.channel = this.createConnection(identifier);
        this.hostIdentifier = identifier;
        RateLimitGRPCV2Grpc.RateLimitGRPCV2Stub rateLimitGRPCV2Stub = RateLimitGRPCV2Grpc.newStub((Channel)this.channel);
        this.streamClient = rateLimitGRPCV2Stub.service((StreamObserver)this);
        this.client = RateLimitGRPCV2Grpc.newBlockingStub((Channel)this.channel);
    }

    private ManagedChannel createConnection(HostIdentifier identifier) {
        ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress((String)identifier.getHost(), (int)identifier.getPort()).usePlaintext();
        return builder.build();
    }

    public HostIdentifier getHostIdentifier() {
        return this.hostIdentifier;
    }

    public void closeStream(boolean closeSend) {
        if (this.endStream.compareAndSet(false, true)) {
            if (closeSend && null != this.streamClient) {
                LOG.info("[ServerConnector]connection {} start to closeSend", (Object)this.hostIdentifier);
                this.streamClient.onCompleted();
            }
            if (null != this.channel) {
                this.channel.shutdown();
            }
        }
    }

    public void onNext(RatelimitV2.RateLimitResponse rateLimitResponse) {
        LOG.debug("ratelimit response receive is {}", (Object)rateLimitResponse);
        this.lastRecvTime.set(System.currentTimeMillis());
        if (RatelimitV2.RateLimitCmd.INIT.equals((Object)rateLimitResponse.getCmd())) {
            this.handleRateLimitInitResponse(rateLimitResponse.getRateLimitInitResponse());
        } else if (RatelimitV2.RateLimitCmd.ACQUIRE.equals((Object)rateLimitResponse.getCmd())) {
            this.handleRateLimitReportResponse(rateLimitResponse.getRateLimitReportResponse());
        }
    }

    public void onError(Throwable throwable) {
        LOG.error("received error from server {}", (Object)this.hostIdentifier, (Object)throwable);
        this.lastConnectFailTimeMilli.set(System.currentTimeMillis());
        this.closeStream(false);
    }

    public void onCompleted() {
        LOG.error("received EOF from server {}", (Object)this.hostIdentifier);
        this.closeStream(true);
    }

    public InitializeRecord addInitRecord(ServiceIdentifier serviceIdentifier, RateLimitWindow rateLimitWindow) {
        if (!this.initRecord.containsKey(serviceIdentifier)) {
            LOG.info("[RateLimit] add init record for {}, stream is {}", (Object)serviceIdentifier, (Object)this.hostIdentifier);
            this.initRecord.putIfAbsent(serviceIdentifier, new InitializeRecord(rateLimitWindow));
        }
        return this.initRecord.get(serviceIdentifier);
    }

    public void deleteInitRecord(ServiceIdentifier serviceIdentifier) {
        LOG.info("[RateLimit] delete init record for {}, stream is {}", (Object)serviceIdentifier, (Object)this.hostIdentifier);
        this.initRecord.remove(serviceIdentifier);
    }

    private void handleRateLimitInitResponse(RatelimitV2.RateLimitInitResponse rateLimitInitResponse) {
        LOG.debug("[handleRateLimitInitResponse] response:{}", (Object)rateLimitInitResponse);
        if (rateLimitInitResponse.getCode() != 200000) {
            LOG.error("[handleRateLimitInitResponse] failed. code is {}", (Object)rateLimitInitResponse.getCode());
            return;
        }
        RatelimitV2.LimitTarget target = rateLimitInitResponse.getTarget();
        ServiceIdentifier serviceIdentifier = new ServiceIdentifier(target.getService(), target.getNamespace(), target.getLabels());
        InitializeRecord initializeRecord = this.initRecord.get(serviceIdentifier);
        if (initializeRecord == null) {
            LOG.error("[handleRateLimitInitResponse] can not find init record:{}", (Object)serviceIdentifier);
            return;
        }
        this.setClientKey(rateLimitInitResponse.getClientKey());
        List countersList = rateLimitInitResponse.getCountersList();
        if (CollectionUtils.isEmpty((Collection)countersList)) {
            LOG.error("[handleRateLimitInitResponse] countersList is empty.");
            return;
        }
        initializeRecord.getDurationRecord().clear();
        long remoteQuotaTimeMilli = rateLimitInitResponse.getTimestamp();
        long localQuotaTimeMilli = this.getLocalTimeMilli(remoteQuotaTimeMilli);
        RateLimitWindow rateLimitWindow = initializeRecord.getRateLimitWindow();
        countersList.forEach(counter -> {
            initializeRecord.getDurationRecord().putIfAbsent(counter.getDuration(), counter.getCounterKey());
            this.counters.putIfAbsent(counter.getCounterKey(), new DurationBaseCallback(counter.getDuration(), rateLimitWindow));
            RemoteQuotaInfo remoteQuotaInfo = new RemoteQuotaInfo(counter.getLeft(), counter.getClientCount(), localQuotaTimeMilli, (long)(counter.getDuration() * 1000));
            rateLimitWindow.getAllocatingBucket().onRemoteUpdate(remoteQuotaInfo);
        });
        LOG.info("[RateLimit] window {} has turn to initialized", (Object)rateLimitWindow.getUniqueKey());
        rateLimitWindow.setStatus(RateLimitWindow.WindowStatus.INITIALIZED.ordinal());
    }

    void handleRateLimitReportResponse(RatelimitV2.RateLimitReportResponse rateLimitReportResponse) {
        LOG.debug("[handleRateLimitReportRequest] response:{}", (Object)rateLimitReportResponse);
        if (rateLimitReportResponse.getCode() != 200000) {
            LOG.error("[handleRateLimitReportRequest] failed. code is {}", (Object)rateLimitReportResponse.getCode());
            return;
        }
        List quotaLeftsList = rateLimitReportResponse.getQuotaLeftsList();
        if (CollectionUtils.isEmpty((Collection)quotaLeftsList)) {
            LOG.error("[handleRateLimitReportRequest] quotaLefts is empty.");
            return;
        }
        long remoteQuotaTimeMilli = rateLimitReportResponse.getTimestamp();
        long localQuotaTimeMilli = this.getLocalTimeMilli(remoteQuotaTimeMilli);
        quotaLeftsList.forEach(quotaLeft -> {
            DurationBaseCallback callback = this.counters.get(quotaLeft.getCounterKey());
            RemoteQuotaInfo remoteQuotaInfo = new RemoteQuotaInfo(quotaLeft.getLeft(), quotaLeft.getClientCount(), localQuotaTimeMilli, (long)(callback.getDuration() * 1000));
            callback.getRateLimitWindow().getAllocatingBucket().onRemoteUpdate(remoteQuotaInfo);
        });
    }

    public int getClientKey() {
        return this.clientKey;
    }

    public void setClientKey(int clientKey) {
        this.clientKey = clientKey;
    }

    private long getLocalTimeMilli(long remoteTimeMilli) {
        return remoteTimeMilli - this.timeDiffMilli.get();
    }

    public long getRemoteTimeMilli(long localTimeMilli) {
        return localTimeMilli + this.timeDiffMilli.get();
    }

    public void adjustTime() {
        RatelimitV2.TimeAdjustResponse timeAdjustResponse;
        long lastSyncTime = this.lastSyncTimeMilli.get();
        long currentTimeMillis = System.currentTimeMillis();
        if (lastSyncTime > 0L && currentTimeMillis - lastSyncTime < this.syncInterval.get()) {
            LOG.debug("[RateLimit] adjustTime need wait.lastSyncTimeMilli:{},sendTimeMilli:{}", (Object)this.lastSyncTimeMilli, (Object)currentTimeMillis);
            return;
        }
        long localSendTimeMilli = System.currentTimeMillis();
        RatelimitV2.TimeAdjustRequest timeAdjustRequest = RatelimitV2.TimeAdjustRequest.newBuilder().build();
        try {
            timeAdjustResponse = this.client.timeAdjust(timeAdjustRequest);
        }
        catch (Throwable e) {
            LOG.error("[RateLimit] fail to adjust time, err {}", (Object)e.getMessage());
            this.onError(e);
            return;
        }
        long localReceiveTimeMilli = System.currentTimeMillis();
        this.lastSyncTimeMilli.set(localReceiveTimeMilli);
        long remoteSendTimeMilli = timeAdjustResponse.getServerTimestamp();
        long latency = localReceiveTimeMilli - localSendTimeMilli;
        long remoteReceiveTimeMilli = remoteSendTimeMilli + latency / 3L;
        long timeDiff = remoteReceiveTimeMilli - localReceiveTimeMilli;
        if (this.timeDiffMilli.get() == timeDiff && this.syncInterval.get() < 180000L) {
            this.syncInterval.set(this.syncInterval.get() + 30000L);
        }
        this.timeDiffMilli.set(timeDiff);
        LOG.info("[RateLimit] adjust time to server time is {}, latency is {},diff is {}", new Object[]{remoteSendTimeMilli, latency, timeDiff});
    }

    public boolean isEndStream() {
        return this.endStream.get();
    }

    public boolean sendRateLimitRequest(RatelimitV2.RateLimitRequest rateLimitRequest) {
        LOG.debug("ratelimit request to send is {}", (Object)rateLimitRequest);
        try {
            this.streamClient.onNext((Object)rateLimitRequest);
            return true;
        }
        catch (Throwable e) {
            LOG.error("[RateLimit] fail to send request, err {}", (Object)e.getMessage());
            this.onError(e);
            return false;
        }
    }

    public boolean hasInit(ServiceIdentifier serviceIdentifier) {
        return this.initRecord.containsKey(serviceIdentifier);
    }

    public InitializeRecord getInitRecord(ServiceIdentifier serviceIdentifier) {
        return this.initRecord.get(serviceIdentifier);
    }

    public Integer getCounterKey(ServiceIdentifier serviceIdentifier, Integer duration) {
        InitializeRecord initializeRecord = this.initRecord.get(serviceIdentifier);
        if (null == initializeRecord) {
            return null;
        }
        return initializeRecord.getDurationRecord().get(duration);
    }
}

