/*
 * Decompiled with CFR 0.152.
 */
package com.staros.starlet;

import com.google.common.annotations.VisibleForTesting;
import com.staros.exception.StarException;
import com.staros.exception.WorkerNotHealthyStarException;
import com.staros.proto.AddShardRequest;
import com.staros.proto.AddShardResponse;
import com.staros.proto.RemoveShardRequest;
import com.staros.proto.RemoveShardResponse;
import com.staros.proto.StarStatus;
import com.staros.proto.StarletGrpc;
import com.staros.proto.StarletHeartbeatRequest;
import com.staros.proto.StarletHeartbeatResponse;
import com.staros.proto.StatusCode;
import com.staros.proto.WorkerState;
import com.staros.util.Config;
import com.staros.util.StatusFactory;
import com.staros.worker.Worker;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Agent used to talk to a single starlet {@link Worker} over gRPC.
 *
 * <p>The agent lazily builds a plaintext {@link ManagedChannel} to the worker's
 * {@code ip:port} and caches a blocking stub created on top of it. The cached stub is
 * dropped when a heartbeat hits a gRPC error that suggests the connection is unusable,
 * so that the next call rebuilds the channel.
 *
 * <p>NOTE(review): {@code channel}, {@code worker} and {@code heartbeatRetryCount} are plain
 * fields with no synchronization; this presumably relies on callers serializing access to
 * one agent instance -- confirm against the call sites.
 */
public class StarletAgent {
    private static final Logger LOG = LogManager.getLogger(StarletAgent.class);

    /** Maximum number of attempts made by {@link #addShard} and {@link #removeShard}. */
    private static final int GRPC_MAX_RETRY_TIMES = 3;

    /** How long {@link #disconnectWorker()} waits for the channel to terminate. */
    private static final long CHANNEL_SHUTDOWN_WAIT_SEC = 5L;

    /** Address of this star manager, sent to the worker in every heartbeat request. */
    private final String starMgrAddress = String.format("%s:%s", Config.STARMGR_IP, Config.STARMGR_RPC_PORT);

    /** Cached blocking stub; {@code null} means it must be (re)built before the next RPC. */
    private final AtomicReference<StarletGrpc.StarletBlockingStub> blockingStub = new AtomicReference<>(null);

    /** Channel backing the cached stub, or {@code null} when not connected. */
    private ManagedChannel channel = null;

    /** Worker this agent talks to; must be set through {@link #setWorker} before any RPC. */
    private Worker worker = null;

    /** Number of consecutive failed heartbeats counted so far. */
    private int heartbeatRetryCount = 0;

    /**
     * Binds this agent to the worker it should communicate with.
     *
     * @param w the target worker
     */
    public void setWorker(Worker w) {
        this.worker = w;
    }

    /**
     * Shuts down the current channel (if any), waiting up to
     * {@value #CHANNEL_SHUTDOWN_WAIT_SEC} seconds for it to terminate.
     *
     * <p>The cached stub is cleared as well: it is bound to the channel being shut down, so
     * keeping it would make the next RPC fail instead of reconnecting.
     */
    public void disconnectWorker() {
        if (channel == null) {
            return;
        }
        channel.shutdown();
        try {
            channel.awaitTermination(CHANNEL_SHUTDOWN_WAIT_SEC, TimeUnit.SECONDS);
        } catch (InterruptedException exception) {
            LOG.info("Got exception while waiting channel shutdown.", exception);
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception exception) {
            // Best effort: a failure while waiting must not prevent the cleanup below.
            LOG.info("Got exception while waiting channel shutdown.", exception);
        }
        channel = null;
        blockingStub.set(null);
    }

    /**
     * Sends one heartbeat to the worker and updates the worker state from the outcome.
     *
     * <ul>
     *   <li>OK: the worker is set to {@link WorkerState#ON} and the failure counter is reset.</li>
     *   <li>{@code SHUT_DOWN}: the worker is set to {@link WorkerState#SHUTTING_DOWN} and the
     *       failure counter is reset.</li>
     *   <li>Any other failure: the failure counter is incremented until it reaches
     *       {@code Config.WORKER_HEARTBEAT_RETRY_COUNT}; after that the worker is set to
     *       {@link WorkerState#DOWN}.</li>
     * </ul>
     *
     * @return a pair whose left value tells whether the heartbeat succeeded and whose right
     *         value is the "state changed" flag: always {@code true} for {@code SHUT_DOWN},
     *         {@code false} while failures are still being counted, otherwise the value returned
     *         by {@code Worker.setState} (presumably whether the state actually changed --
     *         confirm against {@code Worker})
     */
    public Pair<Boolean, Boolean> heartbeat() {
        StarStatus status;
        try {
            StarletGrpc.StarletBlockingStub stub = prepareBlockingStub();
            StarletHeartbeatRequest request = StarletHeartbeatRequest.newBuilder()
                    .setStarMgrLeader(starMgrAddress)
                    .setServiceId(worker.getServiceId())
                    .setWorkerGroupId(worker.getGroupId())
                    .setWorkerId(worker.getWorkerId())
                    .build();
            StarletHeartbeatResponse response = stub
                    .withDeadlineAfter(Config.WORKER_HEARTBEAT_GRPC_RPC_TIME_OUT_SEC, TimeUnit.SECONDS)
                    .starletHeartbeat(request);
            status = response.getStatus();
        } catch (StatusRuntimeException exception) {
            LOG.warn("caught GRPC exception when sending heartbeat to worker {}, {}.", worker.getIpPort(), exception);
            status = handleGrpcException(exception);
        } catch (Exception e) {
            LOG.warn("caught generic exception when sending heartbeat to worker {}, {}.", worker.getIpPort(), e);
            status = StatusFactory.getStatus(StatusCode.INTERNAL, e.getMessage());
        }

        if (status.getStatusCode() == StatusCode.OK) {
            LOG.debug("sending heartbeat to worker {} succeed.", worker.getIpPort());
            boolean changed = worker.setState(WorkerState.ON);
            heartbeatRetryCount = 0;
            return Pair.of(true, changed);
        }

        boolean changed = false;
        if (status.getStatusCode() == StatusCode.SHUT_DOWN) {
            worker.setState(WorkerState.SHUTTING_DOWN);
            heartbeatRetryCount = 0;
            changed = true;
        } else if (heartbeatRetryCount < Config.WORKER_HEARTBEAT_RETRY_COUNT) {
            // Tolerate a limited number of consecutive failures before declaring the worker down.
            ++heartbeatRetryCount;
        } else {
            changed = worker.setState(WorkerState.DOWN);
        }
        LOG.warn("sending heartbeat to worker {} failed, {}:{}.", worker.getIpPort(), status.getStatusCode(),
                status.getErrorMsg());
        return Pair.of(false, changed);
    }

    /**
     * Asks the worker to add shards, retrying retryable gRPC failures up to
     * {@value #GRPC_MAX_RETRY_TIMES} attempts in total.
     *
     * @param request the add-shard request to send
     * @throws StarException a {@link WorkerNotHealthyStarException} when the RPC fails with a
     *         non-retryable error, retries are exhausted, or the worker answers with a non-OK status
     */
    public void addShard(AddShardRequest request) throws StarException {
        for (int retryCount = 1; retryCount <= GRPC_MAX_RETRY_TIMES; ++retryCount) {
            StarStatus status;
            try {
                StarletGrpc.StarletBlockingStub stub = prepareBlockingStub();
                AddShardResponse response = stub
                        .withDeadlineAfter(Config.GRPC_RPC_TIME_OUT_SEC, TimeUnit.SECONDS)
                        .addShard(request);
                status = response.getStatus();
            } catch (StatusRuntimeException e) {
                if (!isGrpcExceptionRetryable(e) || retryCount >= GRPC_MAX_RETRY_TIMES) {
                    LOG.warn("caught GRPC exception when adding shard to worker {}, {}.", worker.getIpPort(),
                            e.getMessage());
                    throw new WorkerNotHealthyStarException("failed to add shard to worker {}",
                            new Object[]{worker.getIpPort()});
                }
                LOG.debug("caught GRPC exception when adding shard to worker {}, retry {} times",
                        worker.getIpPort(), retryCount);
                continue;
            } catch (Exception e) {
                LOG.warn("caught exception when adding shard to worker {}", worker.getIpPort(), e);
                throw new WorkerNotHealthyStarException("failed to add shard to worker {}",
                        new Object[]{worker.getIpPort()});
            }

            // Checked outside the try block so this failure is not caught and logged a second
            // time by the generic handler above.
            if (status.getStatusCode() != StatusCode.OK) {
                LOG.warn("add shard to worker {} failed, {}:{}.", worker.getIpPort(), status.getStatusCode(),
                        status.getErrorMsg());
                throw new WorkerNotHealthyStarException("failed to add shard to worker {}",
                        new Object[]{worker.getIpPort()});
            }
            LOG.debug("add shard to worker {} succeed.", worker.getIpPort());
            return;
        }
    }

    /**
     * Asks the worker to remove shards, retrying retryable gRPC failures up to
     * {@value #GRPC_MAX_RETRY_TIMES} attempts in total.
     *
     * @param request the remove-shard request to send
     * @throws StarException a {@link WorkerNotHealthyStarException} when the RPC fails with a
     *         non-retryable error, retries are exhausted, or the worker answers with a non-OK status
     */
    public void removeShard(RemoveShardRequest request) throws StarException {
        for (int retryCount = 1; retryCount <= GRPC_MAX_RETRY_TIMES; ++retryCount) {
            StarStatus status;
            try {
                StarletGrpc.StarletBlockingStub stub = prepareBlockingStub();
                RemoveShardResponse response = stub
                        .withDeadlineAfter(Config.GRPC_RPC_TIME_OUT_SEC, TimeUnit.SECONDS)
                        .removeShard(request);
                status = response.getStatus();
            } catch (StatusRuntimeException e) {
                if (!isGrpcExceptionRetryable(e) || retryCount >= GRPC_MAX_RETRY_TIMES) {
                    LOG.warn("caught GRPC exception when removing shard from worker {}, {}.", worker.getIpPort(),
                            e.getMessage());
                    throw new WorkerNotHealthyStarException("failed to remove shard from worker {}",
                            new Object[]{worker.getIpPort()});
                }
                LOG.debug("caught GRPC exception when removing shard from worker {}, retry {} times",
                        worker.getIpPort(), retryCount);
                continue;
            } catch (Exception e) {
                LOG.warn("caught exception when removing shard from worker {}", worker.getIpPort(), e);
                throw new WorkerNotHealthyStarException("failed to remove shard from worker {}",
                        new Object[]{worker.getIpPort()});
            }

            // Checked outside the try block so this failure is not caught and logged a second
            // time by the generic handler above.
            if (status.getStatusCode() != StatusCode.OK) {
                LOG.warn("remove shard from worker {} failed, {}:{}.", worker.getIpPort(), status.getStatusCode(),
                        status.getErrorMsg());
                throw new WorkerNotHealthyStarException("failed to remove shard from worker {}",
                        new Object[]{worker.getIpPort()});
            }
            LOG.debug("remove shard from worker {} succeed.", worker.getIpPort());
            return;
        }
    }

    /**
     * Tells whether a failed shard RPC is worth retrying.
     *
     * @param exception the gRPC failure
     * @return {@code true} only for {@code DEADLINE_EXCEEDED} and {@code UNAVAILABLE}
     */
    private boolean isGrpcExceptionRetryable(StatusRuntimeException exception) {
        Status.Code code = exception.getStatus().getCode();
        return code == Status.Code.DEADLINE_EXCEEDED || code == Status.Code.UNAVAILABLE;
    }

    /**
     * Converts a gRPC failure into a {@link StarStatus} with code {@code GRPC}, dropping the
     * cached stub for the listed status codes so the next call rebuilds the channel and stub.
     *
     * @param exception the gRPC failure
     * @return a status carrying {@link StatusCode#GRPC} and the exception message
     */
    private StarStatus handleGrpcException(StatusRuntimeException exception) {
        switch (exception.getStatus().getCode()) {
            case ABORTED:
            case CANCELLED:
            case DEADLINE_EXCEEDED:
            case INTERNAL:
            case RESOURCE_EXHAUSTED:
            case UNAVAILABLE:
            case UNIMPLEMENTED:
            case UNKNOWN:
                blockingStub.set(null);
                break;
            default:
                // Every other code keeps the current stub.
                break;
        }
        return StatusFactory.getStatus(StatusCode.GRPC, exception.getMessage());
    }

    /**
     * Returns the cached blocking stub, building the channel and stub first when none is cached.
     *
     * @return a blocking stub for the worker
     */
    @VisibleForTesting
    protected StarletGrpc.StarletBlockingStub prepareBlockingStub() {
        StarletGrpc.StarletBlockingStub stub = blockingStub.get();
        return stub != null ? stub : buildGrpcChannelAndStub();
    }

    /**
     * Creates a new plaintext channel to the worker, caches a blocking stub on top of it and
     * shuts down the previously used channel, if any.
     *
     * @return the newly created stub
     */
    private StarletGrpc.StarletBlockingStub buildGrpcChannelAndStub() {
        ManagedChannel oldChannel = channel;
        channel = ManagedChannelBuilder.forTarget(worker.getIpPort())
                .maxInboundMessageSize(Config.GRPC_CHANNEL_MAX_MESSAGE_SIZE)
                .usePlaintext()
                .build();
        StarletGrpc.StarletBlockingStub stub = StarletGrpc.newBlockingStub(channel);
        blockingStub.set(stub);
        // Release the replaced channel only after the new stub is published.
        if (oldChannel != null) {
            oldChannel.shutdown();
        }
        return stub;
    }
}

