/*
 * Decompiled with CFR 0.152.
 */
package io.littlehorse.sdk.worker.internal;

import io.grpc.stub.StreamObserver;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.proto.LHHostInfo;
import io.littlehorse.sdk.common.proto.LittleHorseGrpc;
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerRequest;
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerResponse;
import io.littlehorse.sdk.common.proto.TaskDef;
import io.littlehorse.sdk.worker.internal.LHLivenessController;
import io.littlehorse.sdk.worker.internal.PollThread;
import io.littlehorse.sdk.worker.internal.PollThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RebalanceThread
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(RebalanceThread.class);
    private final LittleHorseGrpc.LittleHorseStub bootstrapStub;
    private final String taskWorkerId;
    private final TaskDef taskDef;
    private final HeartBeatCallback heartBeatCallback = new HeartBeatCallback();
    private final LHConfig config;
    final Map<LHHostInfo, List<PollThread>> runningConnections = new ConcurrentHashMap<LHHostInfo, List<PollThread>>();
    private final LHLivenessController livenessController;
    private final long heartbeatIntervalMs;
    private final PollThreadFactory pollThreadFactory;

    public RebalanceThread(LittleHorseGrpc.LittleHorseStub bootstrapStub, String taskWorkerId, TaskDef taskDef, LHConfig config, LHLivenessController livenessController, long heartbeatIntervalMs, PollThreadFactory pollThreadFactory) {
        this.bootstrapStub = bootstrapStub;
        this.taskWorkerId = taskWorkerId;
        this.taskDef = taskDef;
        this.pollThreadFactory = pollThreadFactory;
        this.config = config;
        this.livenessController = livenessController;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
    }

    @Override
    public void run() {
        while (this.livenessController.keepWorkerRunning()) {
            this.doHeartBeat();
            this.waitForInterval();
        }
    }

    public void close() {
        for (List<PollThread> pThreadList : this.runningConnections.values()) {
            for (PollThread pThread : pThreadList) {
                pThread.interrupt();
                pThread.close();
            }
        }
    }

    public void doHeartBeat() {
        this.bootstrapStub.registerTaskWorker(RegisterTaskWorkerRequest.newBuilder().setTaskDefId(this.taskDef.getId()).setTaskWorkerId(this.taskWorkerId).build(), this.heartBeatCallback);
    }

    private void waitForInterval() {
        try {
            Thread.sleep(this.heartbeatIntervalMs);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private final class HeartBeatCallback
    implements StreamObserver<RegisterTaskWorkerResponse> {
        private HeartBeatCallback() {
        }

        public void onNext(RegisterTaskWorkerResponse response) {
            RebalanceThread.this.livenessController.notifySuccessCall(response);
            List<LHHostInfo> availableHosts = response.getYourHostsList();
            this.maybeCleanupDanglingNonRunningThreads(availableHosts);
            this.cleanupDanglingThreadsFromAvailableHosts(availableHosts);
            for (LHHostInfo lhHostInfo : availableHosts) {
                if (RebalanceThread.this.runningConnections.containsKey(lhHostInfo)) continue;
                ArrayList<PollThread> connections = new ArrayList<PollThread>();
                for (int i = 0; i < RebalanceThread.this.config.getWorkerThreads(); ++i) {
                    String threadName = String.format("lh-poll-%s", i);
                    PollThread connection = RebalanceThread.this.pollThreadFactory.create(threadName, lhHostInfo);
                    connection.start();
                    connections.add(connection);
                }
                RebalanceThread.this.runningConnections.put(lhHostInfo, connections);
            }
        }

        private void cleanupDanglingThreadsFromAvailableHosts(List<LHHostInfo> availableHosts) {
            ArrayList<LHHostInfo> toBeRemoved = new ArrayList<LHHostInfo>();
            for (LHHostInfo lhHostInfo : RebalanceThread.this.runningConnections.keySet()) {
                if (availableHosts.contains(lhHostInfo)) continue;
                toBeRemoved.add(lhHostInfo);
            }
            for (LHHostInfo toRemove : toBeRemoved) {
                List<PollThread> pollThreads = RebalanceThread.this.runningConnections.get(toRemove);
                for (PollThread pollThread : pollThreads) {
                    pollThread.interrupt();
                    pollThread.close();
                }
                RebalanceThread.this.runningConnections.remove(toRemove);
            }
        }

        private void maybeCleanupDanglingNonRunningThreads(List<LHHostInfo> availableHosts) {
            for (LHHostInfo hostInfo : RebalanceThread.this.runningConnections.keySet()) {
                List<PollThread> currentThreads = RebalanceThread.this.runningConnections.get(hostInfo);
                ArrayList<PollThread> runningThreads = new ArrayList<PollThread>();
                for (PollThread currentThread : currentThreads) {
                    if (currentThread.isRunning()) {
                        runningThreads.add(currentThread);
                        continue;
                    }
                    currentThread.interrupt();
                    currentThread.close();
                }
                if (availableHosts.contains(hostInfo)) {
                    int numberMissingPollThreads = RebalanceThread.this.config.getWorkerThreads() - runningThreads.size();
                    for (int i = 0; i < numberMissingPollThreads; ++i) {
                        String threadName = String.format("lh-poll-%s", runningThreads.size() + 1);
                        PollThread pollThread = RebalanceThread.this.pollThreadFactory.create(threadName, hostInfo);
                        pollThread.start();
                        runningThreads.add(pollThread);
                    }
                }
                RebalanceThread.this.runningConnections.put(hostInfo, runningThreads);
            }
        }

        public void onError(Throwable t) {
            RebalanceThread.this.livenessController.notifyWorkerFailure();
        }

        public void onCompleted() {
        }
    }
}

