/*
 * Decompiled with CFR 0.152.
 */
package com.tencent.polaris.ratelimit.client.flow;

import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.plugin.ratelimiter.RemoteQuotaInfo;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.ratelimit.client.flow.AsyncRateLimitConnector;
import com.tencent.polaris.ratelimit.client.flow.DurationBaseCallback;
import com.tencent.polaris.ratelimit.client.flow.HostIdentifier;
import com.tencent.polaris.ratelimit.client.flow.InitializeRecord;
import com.tencent.polaris.ratelimit.client.flow.RateLimitWindow;
import com.tencent.polaris.ratelimit.client.flow.ServiceIdentifier;
import com.tencent.polaris.ratelimit.client.pb.RateLimitGRPCV2Grpc;
import com.tencent.polaris.ratelimit.client.pb.RatelimitV2;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamCounterSet {
    private static final Logger LOG = LoggerFactory.getLogger(StreamCounterSet.class);
    private final AtomicInteger reference = new AtomicInteger();
    private final AsyncRateLimitConnector asyncConnector;
    private final HostIdentifier identifier;
    private final AtomicReference<StreamResource> currentStreamResource = new AtomicReference();
    private final long reconnectInterval;
    private int clientKey;
    private final Map<ServiceIdentifier, InitializeRecord> initRecord = new HashMap<ServiceIdentifier, InitializeRecord>();
    private final Map<Integer, DurationBaseCallback> counters = new HashMap<Integer, DurationBaseCallback>();
    private final AtomicLong lastRecvTime = new AtomicLong(0L);

    public StreamCounterSet(AsyncRateLimitConnector asyncConnector, HostIdentifier identifier, Configuration configuration) {
        this.asyncConnector = asyncConnector;
        this.identifier = identifier;
        this.reconnectInterval = configuration.getGlobal().getServerConnector().getReconnectInterval();
    }

    public HostIdentifier getIdentifier() {
        return this.identifier;
    }

    public StreamObserver<RatelimitV2.RateLimitRequest> preCheckAsync(ServiceIdentifier serviceIdentifier, RateLimitWindow rateLimitWindow) {
        StreamResource streamResource = this.checkAndCreateResource(serviceIdentifier, rateLimitWindow);
        if (null != streamResource) {
            return streamResource.streamClient;
        }
        return null;
    }

    public RateLimitGRPCV2Grpc.RateLimitGRPCV2BlockingStub preCheckSync(ServiceIdentifier serviceIdentifier, RateLimitWindow rateLimitWindow) {
        StreamResource streamResource = this.checkAndCreateResource(serviceIdentifier, rateLimitWindow);
        if (null != streamResource) {
            return streamResource.client;
        }
        return null;
    }

    private StreamResource checkAndCreateResource(ServiceIdentifier serviceIdentifier, RateLimitWindow rateLimitWindow) {
        ManagedChannel channel;
        StreamResource streamResource = this.currentStreamResource.get();
        if (null != streamResource && !streamResource.endStream.get()) {
            return streamResource;
        }
        long lastConnectFailTimeMilli = 0L;
        if (null != streamResource) {
            lastConnectFailTimeMilli = streamResource.lastConnectFailTimeMilli.get();
        }
        if (null == (channel = this.createConnection(lastConnectFailTimeMilli))) {
            return null;
        }
        RateLimitGRPCV2Grpc.RateLimitGRPCV2Stub rateLimitGRPCV2Stub = RateLimitGRPCV2Grpc.newStub((Channel)channel);
        StreamObserver streamClient = rateLimitGRPCV2Stub.service((StreamObserver)streamResource);
        RateLimitGRPCV2Grpc.RateLimitGRPCV2BlockingStub rateLimitGRPCV2BlockingStub = RateLimitGRPCV2Grpc.newBlockingStub((Channel)channel);
        streamResource = new StreamResource(channel, (StreamObserver<RatelimitV2.RateLimitRequest>)streamClient, rateLimitGRPCV2BlockingStub);
        this.currentStreamResource.set(streamResource);
        if (this.initRecord.get(serviceIdentifier) == null) {
            this.initRecord.putIfAbsent(serviceIdentifier, new InitializeRecord(rateLimitWindow));
        }
        return streamResource;
    }

    private ManagedChannel createConnection(long lastConnectFailTimeMilli) {
        long curTimeMilli = System.currentTimeMillis();
        long timePassed = curTimeMilli - lastConnectFailTimeMilli;
        if (lastConnectFailTimeMilli > 0L && timePassed > 0L && timePassed < this.reconnectInterval) {
            LOG.debug("reconnect interval should exceed {}", (Object)this.reconnectInterval);
            return null;
        }
        ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress((String)this.identifier.getHost(), (int)this.identifier.getPort()).usePlaintext();
        return builder.build();
    }

    public boolean hasInit(ServiceIdentifier serviceIdentifier) {
        if (!this.initRecord.containsKey(serviceIdentifier)) {
            return false;
        }
        return this.initRecord.get(serviceIdentifier).getDurationRecord().size() > 0;
    }

    void handleRateLimitInitResponse(RatelimitV2.RateLimitInitResponse rateLimitInitResponse) {
        LOG.debug("[handleRateLimitInitResponse] response:{}", (Object)rateLimitInitResponse);
        if (rateLimitInitResponse.getCode() != 200) {
            LOG.error("[handleRateLimitInitResponse] failed. code is {}", (Object)rateLimitInitResponse.getCode());
            return;
        }
        RatelimitV2.LimitTarget target = rateLimitInitResponse.getTarget();
        ServiceIdentifier serviceIdentifier = new ServiceIdentifier(target.getService(), target.getNamespace(), target.getLabels());
        InitializeRecord initializeRecord = this.initRecord.get(serviceIdentifier);
        if (initializeRecord == null) {
            LOG.error("[handleRateLimitInitResponse] can not find init record:{}", (Object)serviceIdentifier);
            return;
        }
        this.setClientKey(rateLimitInitResponse.getClientKey());
        List countersList = rateLimitInitResponse.getCountersList();
        if (CollectionUtils.isEmpty((Collection)countersList)) {
            LOG.error("[handleRateLimitInitResponse] countersList is empty.");
            return;
        }
        initializeRecord.getDurationRecord().clear();
        long serverTimeMilli = rateLimitInitResponse.getTimestamp() + this.asyncConnector.getTimeDiff().get();
        countersList.forEach(counter -> {
            initializeRecord.getDurationRecord().putIfAbsent(counter.getDuration(), counter.getCounterKey());
            this.getCounters().putIfAbsent(counter.getCounterKey(), new DurationBaseCallback(counter.getDuration(), initializeRecord.getRateLimitWindow()));
            RemoteQuotaInfo remoteQuotaInfo = new RemoteQuotaInfo(counter.getLeft(), counter.getClientCount(), serverTimeMilli, (long)(counter.getDuration() * 1000));
            initializeRecord.getRateLimitWindow().getAllocatingBucket().onRemoteUpdate(remoteQuotaInfo);
        });
        initializeRecord.getRateLimitWindow().setStatus(RateLimitWindow.WindowStatus.INITIALIZED.ordinal());
    }

    void handleRateLimitReportResponse(RatelimitV2.RateLimitReportResponse rateLimitReportResponse) {
        LOG.debug("[handleRateLimitReportRequest] response:{}", (Object)rateLimitReportResponse);
        if (rateLimitReportResponse.getCode() != 200) {
            LOG.error("[handleRateLimitReportRequest] failed. code is {}", (Object)rateLimitReportResponse.getCode());
            return;
        }
        long serverTimeMilli = rateLimitReportResponse.getTimestamp();
        List quotaLeftsList = rateLimitReportResponse.getQuotaLeftsList();
        if (CollectionUtils.isEmpty((Collection)quotaLeftsList)) {
            LOG.error("[handleRateLimitReportRequest] quotaLefts is empty.");
            return;
        }
        quotaLeftsList.forEach(quotaLeft -> {
            DurationBaseCallback callback = this.getCounters().get(quotaLeft.getCounterKey());
            RemoteQuotaInfo remoteQuotaInfo = new RemoteQuotaInfo(quotaLeft.getLeft(), quotaLeft.getClientCount(), serverTimeMilli, (long)(callback.getDuration() * 1000));
            callback.getRateLimitWindow().getAllocatingBucket().onRemoteUpdate(remoteQuotaInfo);
        });
    }

    public int getClientKey() {
        return this.clientKey;
    }

    public void setClientKey(int clientKey) {
        this.clientKey = clientKey;
    }

    public Map<Integer, DurationBaseCallback> getCounters() {
        return this.counters;
    }

    public Map<ServiceIdentifier, InitializeRecord> getInitRecord() {
        return this.initRecord;
    }

    public void addReference() {
        this.reference.incrementAndGet();
    }

    public boolean decreaseReference() {
        int value = this.reference.decrementAndGet();
        if (value == 0) {
            StreamResource streamResource = this.currentStreamResource.get();
            if (null != streamResource) {
                streamResource.closeStream(true);
            }
            return true;
        }
        return false;
    }

    private class StreamResource
    implements StreamObserver<RatelimitV2.RateLimitResponse> {
        private final AtomicBoolean endStream = new AtomicBoolean(false);
        private final AtomicLong lastConnectFailTimeMilli = new AtomicLong(0L);
        final ManagedChannel channel;
        final StreamObserver<RatelimitV2.RateLimitRequest> streamClient;
        final RateLimitGRPCV2Grpc.RateLimitGRPCV2BlockingStub client;

        public StreamResource(ManagedChannel channel, StreamObserver<RatelimitV2.RateLimitRequest> streamClient, RateLimitGRPCV2Grpc.RateLimitGRPCV2BlockingStub client) {
            this.channel = channel;
            this.streamClient = streamClient;
            this.client = client;
        }

        public void closeStream(boolean closeSend) {
            if (this.endStream.compareAndSet(false, true)) {
                if (closeSend && null != this.streamClient) {
                    LOG.info("[ServerConnector]connection {} start to closeSend", (Object)StreamCounterSet.this.identifier);
                    this.streamClient.onCompleted();
                }
                if (null != this.channel) {
                    this.channel.shutdown();
                }
            }
        }

        public void onNext(RatelimitV2.RateLimitResponse rateLimitResponse) {
            StreamCounterSet.this.lastRecvTime.set(System.currentTimeMillis());
            if (RatelimitV2.RateLimitCmd.INIT.equals((Object)rateLimitResponse.getCmd())) {
                StreamCounterSet.this.handleRateLimitInitResponse(rateLimitResponse.getRateLimitInitResponse());
            } else if (RatelimitV2.RateLimitCmd.ACQUIRE.equals((Object)rateLimitResponse.getCmd())) {
                StreamCounterSet.this.handleRateLimitReportResponse(rateLimitResponse.getRateLimitReportResponse());
            }
        }

        public void onError(Throwable throwable) {
            LOG.error("received error from server {}", (Object)StreamCounterSet.this.identifier, (Object)throwable);
            this.lastConnectFailTimeMilli.set(System.currentTimeMillis());
            this.closeStream(false);
        }

        public void onCompleted() {
            LOG.error("received EOF from server {}", (Object)StreamCounterSet.this.identifier);
            this.closeStream(true);
        }
    }
}

