/*
 * Decompiled with CFR 0.152.
 */
package com.tencent.polaris.plugins.connector.grpc;

import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.StringValue;
import com.google.protobuf.TextFormat;
import com.tencent.polaris.api.exception.ErrorCode;
import com.tencent.polaris.api.exception.ServerCodes;
import com.tencent.polaris.api.exception.ServerErrorResponseException;
import com.tencent.polaris.api.plugin.server.ServerEvent;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.pb.PolarisGRPCGrpc;
import com.tencent.polaris.client.pb.RequestProto;
import com.tencent.polaris.client.pb.ResponseProto;
import com.tencent.polaris.client.pb.ServiceProto;
import com.tencent.polaris.plugins.connector.grpc.Connection;
import com.tencent.polaris.plugins.connector.grpc.GrpcUtil;
import com.tencent.polaris.plugins.connector.grpc.ServiceUpdateTask;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpecStreamClient
implements StreamObserver<ResponseProto.DiscoverResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(SpecStreamClient.class);
    private final Object clientLock = new Object();
    private final Map<ServiceEventKey, ServiceUpdateTask> pendingTask = new HashMap<ServiceEventKey, ServiceUpdateTask>();
    private final AtomicBoolean endStream = new AtomicBoolean(false);
    private final StreamObserver<RequestProto.DiscoverRequest> discoverClient;
    private final Connection connection;
    private final String reqId;
    private final AtomicLong lastRecvTimeMs = new AtomicLong(0L);
    private final long createTimeMs;
    private final long connectionIdleTimeoutMs;

    public SpecStreamClient(Connection connection, long connectionIdleTimeoutMs, ServiceUpdateTask serviceUpdateTask) {
        this.connection = connection;
        this.connectionIdleTimeoutMs = connectionIdleTimeoutMs;
        this.createTimeMs = System.currentTimeMillis();
        this.reqId = GrpcUtil.nextGetInstanceReqId();
        PolarisGRPCGrpc.PolarisGRPCStub namingStub = PolarisGRPCGrpc.newStub((Channel)connection.getChannel());
        GrpcUtil.attachRequestHeader(namingStub, GrpcUtil.nextGetInstanceReqId());
        this.discoverClient = namingStub.discover((StreamObserver)this);
        this.pendingTask.put(serviceUpdateTask.getServiceEventKey(), serviceUpdateTask);
    }

    public void closeStream(boolean closeSend) {
        boolean endStreamOK = this.endStream.compareAndSet(false, true);
        if (!endStreamOK) {
            return;
        }
        if (closeSend) {
            LOG.info("[ServerConnector]connection {} start to closeSend", (Object)this.connection.getConnID());
            this.discoverClient.onCompleted();
        }
        this.connection.release("Discover");
    }

    private boolean isEndStream() {
        return this.endStream.get();
    }

    public String getReqId() {
        return this.reqId;
    }

    public void sendRequest(ServiceUpdateTask serviceUpdateTask) {
        ServiceEventKey serviceEventKey = serviceUpdateTask.getServiceEventKey();
        ServiceProto.Service.Builder builder = ServiceProto.Service.newBuilder();
        builder.setName(StringValue.newBuilder().setValue(serviceEventKey.getServiceKey().getService()).build());
        builder.setNamespace(StringValue.newBuilder().setValue(serviceEventKey.getServiceKey().getNamespace()).build());
        RequestProto.DiscoverRequest.Builder req = RequestProto.DiscoverRequest.newBuilder();
        req.setType(GrpcUtil.buildDiscoverRequestType(serviceEventKey.getEventType()));
        req.setService(builder);
        if (serviceUpdateTask.getTaskType() == ServiceUpdateTask.Type.FIRST) {
            LOG.info("[ServerConnector]send request(id={}) to {} for service {}", new Object[]{this.reqId, this.connection.getConnID(), serviceEventKey});
        } else {
            LOG.debug("[ServerConnector]send request(id={}) to {} for service {}", new Object[]{this.reqId, this.connection.getConnID(), serviceEventKey});
        }
        this.discoverClient.onNext((Object)req.build());
    }

    private ValidResult validMessage(ResponseProto.DiscoverResponse response) {
        ServiceProto.Service service;
        ErrorCode errorCode = ErrorCode.Success;
        if (response.hasCode()) {
            errorCode = ServerCodes.convertServerErrorToRpcError((int)response.getCode().getValue());
        }
        if (null == (service = CollectionUtils.isNotEmpty((Collection)response.getServicesList()) ? (ServiceProto.Service)response.getServicesList().get(0) : response.getService()) || StringUtils.isEmpty((String)service.getNamespace().getValue()) || StringUtils.isEmpty((String)service.getName().getValue())) {
            return new ValidResult(null, ErrorCode.INVALID_SERVER_RESPONSE, "service is empty, response text is " + response.toString());
        }
        ServiceEventKey.EventType eventType = GrpcUtil.buildEventType(response.getType());
        if (eventType == ServiceEventKey.EventType.UNKNOWN) {
            return new ValidResult(null, ErrorCode.INVALID_SERVER_RESPONSE, "invalid event type " + response.getType());
        }
        ServiceEventKey serviceEventKey = new ServiceEventKey(new ServiceKey(service.getNamespace().getValue(), service.getName().getValue()), eventType);
        if (errorCode == ErrorCode.SERVER_ERROR) {
            return new ValidResult(serviceEventKey, errorCode, "invalid event type " + response.getType());
        }
        return new ValidResult(serviceEventKey, ErrorCode.Success, "");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void exceptionCallback(ValidResult validResult) {
        this.closeStream(false);
        LOG.error("[ServerConnector]exceptionCallback: errCode {}, info {}, serviceEventKey {}", new Object[]{validResult.getErrorCode(), validResult.getMessage(), validResult.getServiceEventKey()});
        this.connection.reportFail();
        ArrayList<ServiceUpdateTask> notifyTasks = new ArrayList<ServiceUpdateTask>();
        Iterator iterator = this.clientLock;
        synchronized (iterator) {
            ServiceEventKey serviceEventKey = validResult.getServiceEventKey();
            if (null == serviceEventKey) {
                if (CollectionUtils.isNotEmpty(this.pendingTask.values())) {
                    notifyTasks.addAll(this.pendingTask.values());
                    this.pendingTask.clear();
                }
            } else {
                ServiceUpdateTask task = this.pendingTask.remove(serviceEventKey);
                if (null != task) {
                    notifyTasks.add(task);
                }
            }
        }
        for (ServiceUpdateTask notifyTask : notifyTasks) {
            notifyTask.retry();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(ResponseProto.DiscoverResponse response) {
        ServerErrorResponseException error;
        ServiceUpdateTask updateTask;
        this.lastRecvTimeMs.set(System.currentTimeMillis());
        ValidResult validResult = this.validMessage(response);
        if (validResult.errorCode != ErrorCode.Success) {
            this.exceptionCallback(validResult);
            return;
        }
        ServiceProto.Service service = response.getService();
        ServiceKey serviceKey = new ServiceKey(service.getNamespace().getValue(), service.getName().getValue());
        ServiceEventKey.EventType eventType = GrpcUtil.buildEventType(response.getType());
        ServiceEventKey serviceEventKey = new ServiceEventKey(serviceKey, eventType);
        Object object = this.clientLock;
        synchronized (object) {
            updateTask = this.pendingTask.remove(serviceEventKey);
        }
        if (null == updateTask) {
            LOG.error("[ServerConnector]callback not found for:{}", (Object)TextFormat.shortDebugString((MessageOrBuilder)service));
            return;
        }
        if (updateTask.getTaskType() == ServiceUpdateTask.Type.FIRST) {
            LOG.info("[ServerConnector]receive response for {}", (Object)serviceEventKey);
        } else {
            LOG.debug("[ServerConnector]receive response for {}", (Object)serviceEventKey);
        }
        if (!response.hasCode() || response.getCode().getValue() == 200000) {
            error = null;
        } else {
            int respCode = response.getCode().getValue();
            String info = response.getInfo().getValue();
            error = ServerErrorResponseException.build((int)respCode, (String)String.format("[ServerConnector]code %d, fail to query service %s from server(%s): %s", respCode, serviceKey, this.connection.getConnID(), info));
        }
        boolean svcDeleted = updateTask.notifyServerEvent(new ServerEvent(serviceEventKey, (Object)response, error));
        if (!svcDeleted) {
            updateTask.addUpdateTaskSet();
        }
    }

    public void onError(Throwable throwable) {
        this.exceptionCallback(new ValidResult(null, ErrorCode.NETWORK_ERROR, String.format("stream %s received error from server(%s), error is %s", this.getReqId(), this.connection.getConnID().toString(), throwable.getMessage())));
    }

    public void onCompleted() {
        this.exceptionCallback(new ValidResult(null, ErrorCode.NETWORK_ERROR, String.format("stream %s EOF by server(%s)", this.getReqId(), this.connection.getConnID().toString())));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void syncCloseExpireStream() {
        Object object = this.clientLock;
        synchronized (object) {
            this.closeExpireStream();
        }
    }

    private boolean closeExpireStream() {
        long lastRecvMs = this.lastRecvTimeMs.get();
        long nowMs = System.currentTimeMillis();
        long connIdleTime = lastRecvMs == 0L ? nowMs - this.createTimeMs : nowMs - lastRecvMs;
        if (connIdleTime >= this.connectionIdleTimeoutMs) {
            this.closeStream(true);
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean checkAvailable(ServiceUpdateTask serviceUpdateTask) {
        if (this.isEndStream()) {
            return false;
        }
        Object object = this.clientLock;
        synchronized (object) {
            if (this.isEndStream()) {
                return false;
            }
            if (this.closeExpireStream()) {
                return false;
            }
            ServiceEventKey serviceEventKey = serviceUpdateTask.getServiceEventKey();
            ServiceUpdateTask lastUpdateTask = this.pendingTask.get(serviceEventKey);
            if (null != lastUpdateTask) {
                LOG.warn("[ServerConnector]pending task {} has been overwritten", (Object)lastUpdateTask);
            }
            this.pendingTask.put(serviceEventKey, serviceUpdateTask);
        }
        return true;
    }

    private static class ValidResult {
        final ServiceEventKey serviceEventKey;
        final ErrorCode errorCode;
        final String message;

        public ValidResult(ServiceEventKey serviceEventKey, ErrorCode errorCode, String message) {
            this.serviceEventKey = serviceEventKey;
            this.errorCode = errorCode;
            this.message = message;
        }

        public ServiceEventKey getServiceEventKey() {
            return this.serviceEventKey;
        }

        public ErrorCode getErrorCode() {
            return this.errorCode;
        }

        public String getMessage() {
            return this.message;
        }
    }
}

