/*
 * Decompiled with CFR 0.152.
 */
package com.staros.heartbeat;

import com.staros.util.AbstractServer;
import com.staros.util.Config;
import com.staros.util.LogUtils;
import com.staros.util.Utils;
import com.staros.worker.Worker;
import com.staros.worker.WorkerManager;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class HeartbeatManager
extends AbstractServer {
    private static final Logger LOG = LogManager.getLogger(HeartbeatManager.class);
    private final int heartbeatInterval;
    private final WorkerManager workerManager;
    private ScheduledThreadPoolExecutor executors;

    public HeartbeatManager(WorkerManager workerManager) {
        int interval = Config.WORKER_HEARTBEAT_INTERVAL_SEC;
        if (interval < 1 || interval > 60) {
            LOG.warn("worker heartbeat interval {} is not suitable, change it to default 10.", (Object)interval);
            interval = 10;
        }
        this.heartbeatInterval = interval;
        this.workerManager = workerManager;
    }

    @Override
    public void doStart() {
        this.executors = new ScheduledThreadPoolExecutor(1, Utils.namedThreadFactory("starmgr-heartbeatmgr"));
        this.executors.setMaximumPoolSize(1);
        this.executors.execute(this::runOnceHeartbeatCheck);
    }

    @Override
    public void doStop() {
        this.executors.shutdownNow();
        Utils.shutdownExecutorService(this.executors);
    }

    private void adjustExecutorThreads(int nWorkers) {
        int singleRound = Integer.max(1, this.heartbeatInterval / Config.WORKER_HEARTBEAT_GRPC_RPC_TIME_OUT_SEC);
        int minRange = nWorkers / singleRound + 1;
        int maxRange = minRange * 2;
        int nCoreThreads = this.executors.getCorePoolSize();
        int expected = (minRange + maxRange) / 2;
        if (nCoreThreads < minRange || nCoreThreads > maxRange) {
            LOG.info("Adjust heartbeatManager ThreadPool size from {} to {}", (Object)nCoreThreads, (Object)expected);
            Utils.adjustFixedThreadPoolExecutors(this.executors, expected);
        }
    }

    private void runOnceHeartbeatCheck() {
        if (!this.isRunning()) {
            return;
        }
        try {
            LOG.debug("running heartbeat once.");
            List<Long> allWorkerIds = this.workerManager.getAllWorkerIds();
            this.adjustExecutorThreads(allWorkerIds.size());
            for (long id : allWorkerIds) {
                Worker worker = this.workerManager.getWorker(id);
                if (worker == null) continue;
                this.executors.execute(() -> this.workerManager.doWorkerHeartbeat(id));
            }
        }
        catch (Exception exception) {
            LOG.warn("Fail to submit tasks to executor. error: ", (Throwable)exception);
        }
        try {
            this.executors.schedule(this::runOnceHeartbeatCheck, (long)this.heartbeatInterval, TimeUnit.SECONDS);
        }
        catch (Exception exception) {
            if (this.isRunning()) {
                LogUtils.fatal(LOG, "Fail to schedule next round of worker heartbeat check, error: {}", exception);
            }
            LOG.info("Fail to schedule next round of worker heartbeat check because heartbeat manager is shutting down.");
        }
    }
}

