/*
 * Decompiled with CFR 0.152.
 */
package com.staros.worker;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.staros.exception.StarException;
import com.staros.exception.WorkerNotHealthyStarException;
import com.staros.proto.AddShardRequest;
import com.staros.proto.RemoveShardRequest;
import com.staros.proto.ReplicationType;
import com.staros.proto.WarmupLevel;
import com.staros.proto.WorkerInfo;
import com.staros.proto.WorkerState;
import com.staros.starlet.StarletAgent;
import com.staros.starlet.StarletAgentFactory;
import com.staros.util.LockCloseable;
import com.staros.util.Text;
import com.staros.util.Writable;
import com.staros.worker.WorkerGroup;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Worker
implements Writable {
    private static final Logger LOG = LogManager.getLogger(Worker.class);
    private final String serviceId;
    private final long groupId;
    private final long workerId;
    private final String ipPort;
    private long startTime;
    private long lastSeenTime;
    private final AtomicReference<WorkerState> state;
    private Map<String, String> workerProperties;
    private final ReentrantReadWriteLock lock;
    private final AtomicLong numOfShards;
    private final StarletAgent starletAgent;
    private int heartbeatEpoch;
    private WorkerGroup workerGroup;

    public Worker(String serviceId, long groupId, long workerId, String ipPort) {
        this.serviceId = serviceId;
        this.groupId = groupId;
        this.workerId = workerId;
        this.ipPort = ipPort;
        this.startTime = 0L;
        this.lastSeenTime = 0L;
        this.state = new AtomicReference<WorkerState>(WorkerState.DOWN);
        this.workerProperties = ImmutableMap.of();
        this.lock = new ReentrantReadWriteLock();
        this.workerGroup = null;
        this.heartbeatEpoch = 0;
        this.numOfShards = new AtomicLong(0L);
        this.starletAgent = StarletAgentFactory.newStarletAgent();
        this.starletAgent.setWorker(this);
    }

    public void setWorkerGroup(WorkerGroup wg) {
        this.workerGroup = wg;
    }

    public boolean warmupEnabled() {
        return this.workerGroup.warmupEnabled();
    }

    public String getServiceId() {
        return this.serviceId;
    }

    public long getGroupId() {
        return this.groupId;
    }

    public long getWorkerId() {
        return this.workerId;
    }

    public String getIpPort() {
        return this.ipPort;
    }

    public WorkerState getState() {
        return this.state.get();
    }

    public Map<String, String> getProperties() {
        return this.workerProperties;
    }

    public long getNumOfShards() {
        long n = this.numOfShards.get();
        return n < 0L ? 0L : n;
    }

    public boolean setState(WorkerState state) {
        try (LockCloseable ignored = new LockCloseable((Lock)this.lock.writeLock());){
            if (this.state.get() == state) {
                boolean bl = false;
                return bl;
            }
            this.state.set(state);
            LOG.info("worker {} state set to {}.", (Object)this.workerId, this.state);
            boolean bl = true;
            return bl;
        }
    }

    public boolean isAlive() {
        return this.state.get() == WorkerState.ON;
    }

    public boolean isShutdown() {
        return this.state.get() == WorkerState.SHUTTING_DOWN;
    }

    public void updateLastSeenTime(long lastSeenTime) {
        try (LockCloseable ignored = new LockCloseable((Lock)this.lock.writeLock());){
            if (lastSeenTime > this.lastSeenTime) {
                this.lastSeenTime = lastSeenTime;
            }
        }
    }

    public long getLastSeenTime() {
        return this.lastSeenTime;
    }

    public ReplicationType getReplicationType() {
        try (LockCloseable ignored = new LockCloseable((Lock)this.lock.readLock());){
            Preconditions.checkState((this.workerGroup != null ? 1 : 0) != 0);
            ReplicationType replicationType = this.workerGroup.getReplicationType();
            return replicationType;
        }
    }

    public WarmupLevel getWarmupLevel() {
        try (LockCloseable ignored = new LockCloseable((Lock)this.lock.readLock());){
            Preconditions.checkState((this.workerGroup != null ? 1 : 0) != 0);
            WarmupLevel warmupLevel = this.workerGroup.getWarmupLevel();
            return warmupLevel;
        }
    }

    public Pair<Boolean, Boolean> updateInfo(long sTime, Map<String, String> workerProps, long numOfShards) {
        try (LockCloseable ignored = new LockCloseable((Lock)this.lock.writeLock());){
            boolean needPersist = false;
            if (workerProps != null && !this.workerProperties.equals(workerProps)) {
                LOG.debug("worker {} properties changed, prev:{}, now:{}.", (Object)this.workerId, this.workerProperties, workerProps);
                this.workerProperties = ImmutableMap.copyOf(workerProps);
                needPersist = true;
            }
            boolean restarted = false;
            if (sTime == 0L || sTime < this.startTime) {
                LOG.info("Detect invalid start time of worker {}, reported startTime:{}, latest startTime:{}!", (Object)this.workerId, (Object)sTime, (Object)this.startTime);
            } else if (sTime > this.startTime) {
                if (this.startTime != 0L) {
                    restarted = true;
                    SimpleDateFormat fmt = new SimpleDateFormat("MM-dd HH:mm:ss.SSS");
                    LOG.info("Detect worker {} start at {}, previous start time: {}.", (Object)this.workerId, (Object)fmt.format(new Date(sTime)), (Object)fmt.format(new Date(this.startTime)));
                }
                this.startTime = sTime;
            }
            ++this.heartbeatEpoch;
            this.numOfShards.set(numOfShards);
            needPersist = needPersist || restarted;
            Pair pair = Pair.of((Object)restarted, (Object)needPersist);
            return pair;
        }
    }

    private void restoreState(long startTime, WorkerState state, Map<String, String> workerProps, long lastSeenTime) {
        try (LockCloseable ignored = new LockCloseable((Lock)this.lock.writeLock());){
            this.startTime = startTime;
            this.state.set(state);
            this.workerProperties = workerProps;
            this.lastSeenTime = lastSeenTime;
        }
    }

    public WorkerInfo toProtobuf() {
        try (LockCloseable ignored = new LockCloseable((Lock)this.lock.readLock());){
            WorkerInfo workerInfo = WorkerInfo.newBuilder().setServiceId(this.serviceId).setGroupId(this.groupId).setWorkerId(this.workerId).setIpPort(this.ipPort).setWorkerState(this.getState()).putAllWorkerProperties(this.workerProperties).setStartTime(this.startTime).setTabletNum(this.numOfShards.get()).build();
            return workerInfo;
        }
    }

    public static Worker fromProtobuf(WorkerInfo info) {
        String serviceId = info.getServiceId();
        long groupId = info.getGroupId();
        long workerId = info.getWorkerId();
        String ipPort = info.getIpPort();
        WorkerState state = info.getWorkerState();
        long startTime = info.getStartTime();
        Map workerProps = info.getWorkerPropertiesMap();
        Worker worker = new Worker(serviceId, groupId, workerId, ipPort);
        worker.restoreState(startTime, state, workerProps, startTime);
        return worker;
    }

    public void write(DataOutput out) throws IOException {
        byte[] bytes = this.toProtobuf().toByteArray();
        Text.writeBytes((DataOutput)out, (byte[])bytes);
    }

    public static Worker read(DataInput in) throws IOException {
        byte[] bytes = Text.readBytes((DataInput)in);
        WorkerInfo info = WorkerInfo.parseFrom((byte[])bytes);
        return Worker.fromProtobuf(info);
    }

    public Pair<Boolean, Boolean> heartbeat() {
        Pair<Boolean, Boolean> pair = this.starletAgent.heartbeat();
        if (((Boolean)pair.getKey()).booleanValue()) {
            this.updateLastSeenTime(System.currentTimeMillis());
        }
        return pair;
    }

    public boolean match(Worker worker) {
        if (worker == null) {
            return false;
        }
        if (this.workerId == worker.workerId) {
            Preconditions.checkState((boolean)this.serviceId.equals(worker.serviceId));
            Preconditions.checkState((this.groupId == worker.groupId ? 1 : 0) != 0);
            return true;
        }
        return false;
    }

    public boolean update(Worker worker) {
        if (this.workerId != worker.workerId) {
            return false;
        }
        Preconditions.checkState((this.groupId == worker.groupId ? 1 : 0) != 0);
        Preconditions.checkState((boolean)this.serviceId.equals(worker.serviceId));
        try (LockCloseable ignored = new LockCloseable((Lock)this.lock.writeLock());){
            this.startTime = worker.startTime;
            this.state.set(worker.state.get());
            this.workerProperties = ImmutableMap.copyOf(worker.workerProperties);
            boolean bl = true;
            return bl;
        }
    }

    public void addShard(AddShardRequest request) throws StarException {
        if (!this.isAlive()) {
            throw new WorkerNotHealthyStarException(String.format("Worker not healthy when try to add shard replica to serviceId: %s, workerId: %s, worker stat: %s", this.serviceId, this.workerId, this.state.get()));
        }
        long epoch = this.heartbeatEpoch;
        this.starletAgent.addShard(request);
        if (epoch == (long)this.heartbeatEpoch) {
            this.numOfShards.addAndGet(request.getShardInfoCount());
        }
    }

    public void updateShard(AddShardRequest request) throws StarException {
        if (!this.isAlive()) {
            throw new WorkerNotHealthyStarException(String.format("Worker not healthy when try to update shard replica to serviceId: %s, workerId: %s, worker stat: %s", this.serviceId, this.workerId, this.state.get()));
        }
        this.starletAgent.addShard(request);
    }

    public void removeShard(RemoveShardRequest request) throws StarException {
        if (!this.isAlive()) {
            throw new WorkerNotHealthyStarException(String.format("Worker not healthy when try to remove shard replica, serviceId: %s, workerId: %s, worker stat: %s", this.serviceId, this.workerId, this.state.get()));
        }
        long epoch = this.heartbeatEpoch;
        this.starletAgent.removeShard(request);
        if (epoch == (long)this.heartbeatEpoch) {
            this.numOfShards.getAndAdd(-request.getShardIdsCount());
        }
    }

    public void decommission() {
        this.starletAgent.disconnectWorker();
    }

    public String toString() {
        return String.format("[Worker] id:%d, group:%d, service:%s", this.workerId, this.groupId, this.serviceId);
    }
}

