/*
 * Decompiled with CFR 0.152.
 */
package com.staros.worker;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import com.staros.exception.AlreadyExistsStarException;
import com.staros.exception.InvalidArgumentStarException;
import com.staros.exception.NotExistStarException;
import com.staros.exception.StarException;
import com.staros.journal.DummyJournalSystem;
import com.staros.journal.Journal;
import com.staros.journal.JournalSystem;
import com.staros.journal.StarMgrJournal;
import com.staros.proto.SectionType;
import com.staros.proto.UpdateWorkerGroupInfo;
import com.staros.proto.WorkerGroupDetailInfo;
import com.staros.proto.WorkerGroupSpec;
import com.staros.proto.WorkerInfo;
import com.staros.proto.WorkerManagerImageMetaFooter;
import com.staros.proto.WorkerManagerImageMetaHeader;
import com.staros.proto.WorkerManagerServiceWorkerGroup;
import com.staros.proto.WorkerManagerWorkers;
import com.staros.section.Section;
import com.staros.section.SectionReader;
import com.staros.section.SectionWriter;
import com.staros.util.Config;
import com.staros.util.IdGenerator;
import com.staros.util.LockCloseable;
import com.staros.util.LogUtils;
import com.staros.util.Utils;
import com.staros.worker.ResourceManager;
import com.staros.worker.ResourceManagerFactory;
import com.staros.worker.Worker;
import com.staros.worker.WorkerGroup;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.DigestInputStream;
import java.security.DigestOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class WorkerManager {
    private static final Logger LOG = LogManager.getLogger(WorkerManager.class);
    // NOTE(review): not referenced in the visible part of this file; presumably a sentinel for "no group" -- confirm.
    private static final long NO_WORKER_GROUP_ID = -1L;
    // All known workers, keyed by worker id.
    private final Map<Long, Worker> workers = new HashMap<Long, Worker>();
    // Per-service worker group containers, keyed by service id.
    private final Map<String, ServiceWorkerGroup> serviceWorkerGroups = new HashMap<String, ServiceWorkerGroup>();
    // Every mutation is written here before the in-memory maps are changed.
    private final JournalSystem journalSystem;
    // Source of ids for both worker groups and workers.
    private final IdGenerator idGenerator;
    // Read/write lock taken by the public methods around accesses to the two maps above.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ResourceManager resourceManager;
    // Starts as null; submitAsyncTask() silently skips tasks while it is null.
    private ExecutorService heartbeatExecutor;

    /**
     * Builds a worker manager on top of the given journal and id generator.
     *
     * @param journalSystem journal that receives every state-changing operation
     * @param idGenerator   generator used for new worker and worker group ids
     */
    public WorkerManager(JournalSystem journalSystem, IdGenerator idGenerator) {
        // No executor yet, so async tasks are skipped until one is installed.
        this.heartbeatExecutor = null;
        this.idGenerator = idGenerator;
        this.journalSystem = journalSystem;
        // The resource manager is handed a reference back to this manager.
        this.resourceManager = ResourceManagerFactory.createResourceManager(this);
    }

    /**
     * Registers an empty worker group container for a service.
     * Does nothing when the service is already registered.
     *
     * @param serviceId id of the service to register
     */
    public void bootstrapService(String serviceId) {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            boolean alreadyRegistered = this.serviceWorkerGroups.containsKey(serviceId);
            if (!alreadyRegistered) {
                this.serviceWorkerGroups.put(serviceId, new ServiceWorkerGroup(serviceId));
            }
        }
    }

    /**
     * Creates a worker group in a registered service, journals it, and schedules
     * resource provisioning as an async task.
     *
     * @param serviceId  owning service; must already be registered
     * @param owner      owner recorded on the group
     * @param spec       group spec; must be accepted by the resource manager
     * @param labels     labels attached to the group
     * @param properties properties attached to the group
     * @return id of the newly created worker group
     * @throws StarException NotExistStarException when the service is unknown,
     *                       InvalidArgumentStarException when the spec is rejected
     */
    public long createWorkerGroup(String serviceId, String owner, WorkerGroupSpec spec, Map<String, String> labels, Map<String, String> properties) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            if (!this.serviceWorkerGroups.containsKey(serviceId)) {
                throw new NotExistStarException("ServiceId:{} not exist!", new Object[]{serviceId});
            }
            boolean specAccepted = this.resourceManager.isValidSpec(spec);
            if (!specAccepted) {
                throw new InvalidArgumentStarException("Invalid WorkerGroupSpec:{}", new Object[]{spec.getSize()});
            }

            final long newGroupId = this.idGenerator.getNextId();
            ServiceWorkerGroup container = this.serviceWorkerGroups.get(serviceId);
            // A freshly generated id must not collide with an existing group.
            Preconditions.checkState(!container.containsKey(newGroupId));

            WorkerGroup created = new WorkerGroup(serviceId, newGroupId, owner, spec, labels, properties);
            // Journal first, then publish into the in-memory map.
            this.journalSystem.write(StarMgrJournal.logCreateWorkerGroup(created));
            this.serviceWorkerGroups.get(serviceId).put(newGroupId, created);

            this.submitAsyncTask(() -> this.resourceManager.provisionResource(serviceId, newGroupId, spec, owner));
            return newGroupId;
        }
    }

    /**
     * Adds a single worker to an existing worker group.
     *
     * @param serviceId service owning the group
     * @param groupId   target worker group
     * @param ipPort    address of the worker; must not be used by any known worker
     * @return id assigned to the new worker
     * @throws StarException NotExistStarException when the group or service is unknown,
     *                       AlreadyExistsStarException when the address is already registered
     */
    public long addWorker(String serviceId, long groupId, String ipPort) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            if (this.getWorkerGroupInternalNoLock(serviceId, groupId) == null) {
                throw new NotExistStarException("worker group {} or service {} not exist.", new Object[]{groupId, serviceId});
            }
            // The address must be unique across every known worker, not just this group.
            boolean addressTaken = false;
            for (Worker existing : this.workers.values()) {
                if (existing.getIpPort().equals(ipPort)) {
                    addressTaken = true;
                    break;
                }
            }
            if (addressTaken) {
                throw new AlreadyExistsStarException("worker address {} already exist.", new Object[]{ipPort});
            }
            return this.addWorkerInternal(serviceId, groupId, ipPort);
        }
    }

    /**
     * Adds several workers to an existing worker group in one locked operation.
     *
     * <p>All addresses are validated before any worker is created: the request is
     * rejected when an address is already used by a known worker, or when the same
     * address appears more than once in {@code ipPorts}.
     *
     * @param serviceId service owning the group
     * @param groupId   target worker group
     * @param ipPorts   addresses of the workers to add
     * @return ids assigned to the new workers, in the order of {@code ipPorts}
     * @throws StarException NotExistStarException when the group or service is unknown,
     *                       AlreadyExistsStarException when any address is duplicated
     */
    public List<Long> addWorkers(String serviceId, long groupId, List<String> ipPorts) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            WorkerGroup group = this.getWorkerGroupInternalNoLock(serviceId, groupId);
            if (group == null) {
                throw new NotExistStarException("worker group {} or service {} not exist.", new Object[]{groupId, serviceId});
            }
            // A set gives constant-time lookups and exposes duplicates inside the request itself.
            Set<String> requested = ipPorts.stream().collect(Collectors.toSet());
            if (requested.size() != ipPorts.size()) {
                throw new AlreadyExistsStarException("duplicated worker address detected.");
            }
            if (this.workers.values().stream().map(Worker::getIpPort).anyMatch(requested::contains)) {
                throw new AlreadyExistsStarException("duplicated worker address detected.");
            }
            List<Long> result = new ArrayList<>(ipPorts.size());
            for (String ipPort : ipPorts) {
                result.add(this.addWorkerInternal(serviceId, groupId, ipPort));
            }
            return result;
        }
    }

    /**
     * Creates, journals and registers one worker, then schedules its first heartbeat.
     * Takes no lock itself; the visible callers invoke it while holding the write lock.
     *
     * @param serviceId service owning the group
     * @param groupId   group the worker joins; must exist
     * @param ipPort    address of the worker
     * @return id assigned to the new worker
     * @throws StarException propagated from the journal write
     */
    private long addWorkerInternal(String serviceId, long groupId, String ipPort) throws StarException {
        WorkerGroup targetGroup = this.getWorkerGroupInternalNoLock(serviceId, groupId);
        Preconditions.checkState(targetGroup != null);

        final long newWorkerId = this.idGenerator.getNextId();
        // A freshly generated id must not already be in use.
        Preconditions.checkState(!this.workers.containsKey(newWorkerId));

        Worker created = new Worker(serviceId, groupId, newWorkerId, ipPort);
        // Journal first, then publish into the in-memory structures.
        this.journalSystem.write(StarMgrJournal.logAddWorker(created));
        this.workers.put(newWorkerId, created);
        targetGroup.addWorker(created);

        LOG.info("worker {} added to group {} in service {}.", newWorkerId, groupId, serviceId);
        this.submitAsyncTask(() -> this.doWorkerHeartbeat(newWorkerId));
        return newWorkerId;
    }

    /**
     * Runs a heartbeat against the given worker and persists the worker's info
     * when the value half of the heartbeat result is true. Unknown ids are ignored.
     *
     * @param workerId id of the worker to probe
     */
    public void doWorkerHeartbeat(long workerId) {
        Worker target = this.getWorker(workerId);
        if (target == null) {
            return;
        }
        // The heartbeat itself runs outside the manager lock.
        Pair<Boolean, Boolean> outcome = target.heartbeat();
        boolean needsPersist = outcome.getValue();
        if (needsPersist) {
            this.persistWorkerInfo(workerId);
        }
    }

    /**
     * Writes an update-worker journal entry for the given worker, if it still exists.
     * A journal failure is logged (with its cause) and otherwise ignored, so this
     * method never throws a {@code StarException}.
     *
     * @param workerId id of the worker whose info should be journaled
     */
    private void persistWorkerInfo(long workerId) {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            Worker worker = this.workers.get(workerId);
            if (worker == null) {
                return;
            }
            try {
                Journal journal = StarMgrJournal.logUpdateWorker(worker.getServiceId(), Collections.singletonList(worker));
                this.journalSystem.write(journal);
            } catch (StarException e) {
                // Best effort: keep going, but pass the exception so the log shows why it failed.
                LOG.warn("fail to persist worker {} info.", workerId, e);
            }
        }
    }

    /**
     * Removes a worker from its group after checking that the worker really
     * belongs to the given service and group.
     *
     * @param serviceId service the worker is expected to belong to
     * @param groupId   group the worker is expected to belong to
     * @param workerId  id of the worker to remove
     * @throws StarException NotExistStarException when the group, service or worker is unknown,
     *                       InvalidArgumentStarException when the worker belongs elsewhere
     */
    public void removeWorker(String serviceId, long groupId, long workerId) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            WorkerGroup owningGroup = this.getWorkerGroupInternalNoLock(serviceId, groupId);
            if (owningGroup == null) {
                throw new NotExistStarException("worker group {} or service {} not exist.", new Object[]{groupId, serviceId});
            }
            if (!this.workers.containsKey(workerId)) {
                throw new NotExistStarException("worker:{} not exist in worker group.", new Object[]{workerId});
            }

            Worker victim = this.workers.get(workerId);
            boolean sameService = victim.getServiceId().equals(serviceId);
            if (!(sameService && victim.getGroupId() == groupId)) {
                throw new InvalidArgumentStarException("inconsistent group or service info of worker:{}", new Object[]{workerId});
            }

            // Journal first, then update the in-memory state.
            this.journalSystem.write(StarMgrJournal.logRemoveWorker(serviceId, groupId, workerId));
            owningGroup.removeWorker(victim);
            this.deleteWorkerInternal(workerId);
            LOG.info("worker {} removed from group {} in service {}.", workerId, groupId, serviceId);
        }
    }

    /**
     * Returns the protobuf description of a worker looked up by id.
     *
     * @param workerId id of the worker
     * @return protobuf form of the worker
     * @throws StarException NotExistStarException when no worker has this id
     */
    public WorkerInfo getWorkerInfo(long workerId) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            if (this.workers.containsKey(workerId)) {
                Worker found = this.workers.get(workerId);
                return found.toProtobuf();
            }
            throw new NotExistStarException("worker:{} not exist.", new Object[]{workerId});
        }
    }

    /**
     * Returns the protobuf description of the worker registered under the given address.
     *
     * <p>NOTE(review): the decompiler could not recover this method (it reported a loop
     * nested in a try block), and the stub it left threw {@code IllegalStateException}
     * on every call. This body is a reconstruction modeled on {@code getWorkerInfo(long)}
     * and on the address comparison in {@code addWorker}; confirm it against the
     * original source, in particular the exact error message.
     *
     * @param ipPort address of the worker
     * @return protobuf form of the first worker whose address equals {@code ipPort}
     * @throws StarException NotExistStarException when no worker uses this address
     */
    public WorkerInfo getWorkerInfo(String ipPort) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            for (Worker worker : this.workers.values()) {
                if (worker.getIpPort().equals(ipPort)) {
                    return worker.toProtobuf();
                }
            }
            throw new NotExistStarException("worker:{} not exist.", new Object[]{ipPort});
        }
    }

    /**
     * Returns the default worker group of a service.
     *
     * @param serviceId id of the service
     * @return the service's default worker group
     * @throws StarException NotExistStarException when the service is unknown or has no default group
     */
    public WorkerGroup getDefaultWorkerGroup(String serviceId) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            WorkerGroup defaultGroup = null;
            if (this.serviceWorkerGroups.containsKey(serviceId)) {
                defaultGroup = this.serviceWorkerGroups.get(serviceId).getDefaultWorkerGroup();
            }
            // Unknown service and missing default group are reported identically.
            if (defaultGroup == null) {
                throw new NotExistStarException("service:{} has no default worker group!", new Object[]{serviceId});
            }
            return defaultGroup;
        }
    }

    /**
     * Looks up a worker group and fails when it cannot be found.
     *
     * @param serviceId service owning the group
     * @param groupId   id of the group
     * @return the matching worker group, never null
     * @throws StarException NotExistStarException when the group or service is unknown
     */
    public WorkerGroup getWorkerGroup(String serviceId, long groupId) throws StarException {
        WorkerGroup found = this.getWorkerGroupNoException(serviceId, groupId);
        if (found != null) {
            return found;
        }
        throw new NotExistStarException("worker group {} or service {} not exist.", new Object[]{groupId, serviceId});
    }

    /**
     * Looks up a worker group under the read lock.
     *
     * @param serviceId service owning the group
     * @param groupId   id of the group
     * @return the matching worker group, or null when the service or group is unknown
     */
    public WorkerGroup getWorkerGroupNoException(String serviceId, long groupId) {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            return this.getWorkerGroupInternalNoLock(serviceId, groupId);
        }
    }

    /**
     * Looks up a worker by id under the read lock.
     *
     * @param workerId id of the worker
     * @return the worker, or null when no worker has this id
     */
    public Worker getWorker(long workerId) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            return this.workers.get(workerId);
        }
    }

    /**
     * Returns a snapshot of the ids of all known workers.
     *
     * @return a new list holding every worker id present at call time
     */
    public List<Long> getAllWorkerIds() {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            // Copy so the caller never sees the live key set.
            return new ArrayList<Long>(this.workers.keySet());
        }
    }

    /**
     * Resolves a worker group without taking any lock; locking is left to the caller.
     *
     * @param serviceId service owning the group
     * @param groupId   id of the group
     * @return the worker group, or null when the service is unknown or has no such group
     */
    private WorkerGroup getWorkerGroupInternalNoLock(String serviceId, long groupId) {
        ServiceWorkerGroup container = this.serviceWorkerGroups.get(serviceId);
        return container == null ? null : container.get(groupId);
    }

    /**
     * Applies a heartbeat report to a worker: refreshes its last-seen time, updates its
     * info, and journals the worker when the value half of the update result is true.
     *
     * @param serviceId        service the worker is expected to belong to
     * @param workerId         id of the reporting worker
     * @param startTime        start time passed to the worker's info update
     * @param numOfShards      shard count passed to the worker's info update
     * @param workerProperties properties passed to the worker's info update
     * @param lastSeenTime     new last-seen time of the worker
     * @return the key half of the pair returned by the worker's info update
     * @throws StarException NotExistStarException when the worker is unknown,
     *                       InvalidArgumentStarException when it belongs to another service
     */
    public boolean processWorkerHeartbeat(String serviceId, long workerId, long startTime, long numOfShards, Map<String, String> workerProperties, long lastSeenTime) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            if (!this.workers.containsKey(workerId)) {
                throw new NotExistStarException("worker:{} not exist.", new Object[]{workerId});
            }
            Worker reporter = this.workers.get(workerId);
            if (!reporter.getServiceId().equals(serviceId)) {
                throw new InvalidArgumentStarException("worker:{} does not belong to service:{}.", new Object[]{workerId, serviceId});
            }

            reporter.updateLastSeenTime(lastSeenTime);
            Pair<Boolean, Boolean> outcome = reporter.updateInfo(startTime, workerProperties, numOfShards);
            boolean needsPersist = outcome.getValue();
            if (needsPersist) {
                // Takes the write lock again on the same thread; the lock is reentrant.
                this.persistWorkerInfo(workerId);
            }
            return outcome.getKey();
        }
    }

    /**
     * Replays a create-worker-group journal entry by registering the group in memory.
     * An unknown service or an already present group id is reported via LogUtils.fatal.
     *
     * @param serviceId service owning the group
     * @param group     worker group to register
     */
    public void replayCreateWorkerGroup(String serviceId, WorkerGroup group) {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            boolean serviceKnown = this.serviceWorkerGroups.containsKey(serviceId);
            if (!serviceKnown) {
                LogUtils.fatal(LOG, "service:{} not exist, should not happen!", serviceId);
            }
            long replayedGroupId = group.getGroupId();
            ServiceWorkerGroup container = this.serviceWorkerGroups.get(serviceId);
            if (container.containsKey(replayedGroupId)) {
                LogUtils.fatal(LOG, "workerGroup:{} already exist when replay add worker group, should not happen!", replayedGroupId);
            }
            this.serviceWorkerGroups.get(serviceId).put(replayedGroupId, group);
        }
    }

    /**
     * Replays an add-worker journal entry by registering the worker in memory.
     * A missing group or an already present worker id is reported via LogUtils.fatal
     * and the entry is skipped.
     *
     * @param serviceId service owning the worker's group
     * @param worker    worker to register
     */
    public void replayAddWorker(String serviceId, Worker worker) {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            WorkerGroup targetGroup = this.getWorkerGroupInternalNoLock(serviceId, worker.getGroupId());
            if (targetGroup == null) {
                LogUtils.fatal(LOG, "service:{} or group:{} not exist, should not happen!", serviceId, worker.getGroupId());
                return;
            }
            boolean alreadyKnown = this.workers.containsKey(worker.getWorkerId());
            if (alreadyKnown) {
                LogUtils.fatal(LOG, "worker {} already exist when replay add worker, should not happen!", worker.getWorkerId());
                return;
            }
            this.workers.put(worker.getWorkerId(), worker);
            targetGroup.addWorker(worker);
        }
    }

    /**
     * Replays a remove-worker journal entry. An unknown worker is logged and ignored;
     * a worker whose service or group differs from the entry fails a state check.
     *
     * @param serviceId service recorded in the journal entry
     * @param groupId   group recorded in the journal entry
     * @param workerId  id of the worker to remove
     */
    public void replayRemoveWorker(String serviceId, long groupId, long workerId) {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            if (!this.workers.containsKey(workerId)) {
                LOG.warn("worker:{} should exist before replay remove operation. Ignore it for now!", workerId);
                return;
            }
            Worker victim = this.workers.get(workerId);
            boolean sameService = victim.getServiceId().equals(serviceId);
            Preconditions.checkState(sameService, (Object) String.format("worker:%d service id mismatch!", workerId));
            boolean sameGroup = victim.getGroupId() == groupId;
            Preconditions.checkState(sameGroup, (Object) String.format("worker:%d group id mismatch!", workerId));

            WorkerGroup owningGroup = this.serviceWorkerGroups.get(serviceId).get(groupId);
            owningGroup.removeWorker(victim);
            this.workers.remove(workerId);
        }
    }

    /**
     * Replays an update-worker journal entry. Every worker in the entry is validated
     * first; updates are applied only when all of them are known and their groups exist.
     *
     * @param serviceId service the workers belong to
     * @param workers   replayed worker states to merge into the in-memory workers
     */
    public void replayUpdateWorker(String serviceId, List<Worker> workers) {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            if (!this.serviceWorkerGroups.containsKey(serviceId)) {
                LogUtils.fatal(LOG, "service:{} not exist when replayUpdateWorker, should never happen!", serviceId);
                return;
            }
            // Pass 1: validate everything so a bad entry changes nothing.
            for (Worker replayed : workers) {
                boolean workerKnown = this.workers.containsKey(replayed.getWorkerId());
                if (!workerKnown || this.getWorkerGroupInternalNoLock(serviceId, replayed.getGroupId()) == null) {
                    LogUtils.fatal(LOG, "worker {} not exist when replay update worker, should not happen!", replayed.getWorkerId());
                    return;
                }
            }
            // Pass 2: apply the updates.
            for (Worker replayed : workers) {
                Worker current = this.workers.get(replayed.getWorkerId());
                current.update(replayed);
            }
        }
    }

    /**
     * Returns the number of known workers.
     *
     * @return size of the worker map at call time
     */
    public int getWorkerCount() {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            return this.workers.size();
        }
    }

    /**
     * Returns the ids of all worker groups of a service.
     *
     * @param serviceId id of the service
     * @return a new list of group ids; empty when the service is unknown
     */
    public List<Long> getAllWorkerGroupIds(String serviceId) {
        ArrayList<Long> groupIds = new ArrayList<Long>();
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            boolean serviceKnown = this.serviceWorkerGroups.containsKey(serviceId);
            if (serviceKnown) {
                ServiceWorkerGroup container = this.serviceWorkerGroups.get(serviceId);
                groupIds.addAll(container.keySet());
            }
        }
        return groupIds;
    }

    /**
     * Returns detail info for the requested worker groups of a service, in the
     * order of {@code groupIds}.
     *
     * @param serviceId          service owning the groups; must be registered
     * @param groupIds           ids of the groups to describe; must not be empty
     * @param includeWorkersInfo whether each result also carries its workers' info
     * @return one detail info per requested group id
     * @throws InvalidArgumentStarException when {@code groupIds} is empty
     * @throws NotExistStarException        when the service or any requested group is unknown
     */
    public List<WorkerGroupDetailInfo> listWorkerGroupsById(String serviceId, List<Long> groupIds, boolean includeWorkersInfo) {
        if (groupIds.isEmpty()) {
            throw new InvalidArgumentStarException("Empty worker group id provided!");
        }
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            if (!this.serviceWorkerGroups.containsKey(serviceId)) {
                throw new NotExistStarException("service:{} not registered yet!", new Object[]{serviceId});
            }
            ServiceWorkerGroup swg = this.serviceWorkerGroups.get(serviceId);
            List<WorkerGroupDetailInfo> result = new ArrayList<>(groupIds.size());
            for (long id : groupIds) {
                // One unknown id fails the whole request.
                if (!swg.containsKey(id)) {
                    throw new NotExistStarException("group:{} not exist in service:{}", new Object[]{id, serviceId});
                }
                result.add(this.buildWorkerGroupDetailInfo(swg.get(id), includeWorkersInfo));
            }
            return result;
        }
    }

    /**
     * Returns detail info for the worker groups of a service, optionally restricted
     * to groups whose labels match every entry of {@code filterLabels}.
     *
     * @param serviceId          service owning the groups; must be registered
     * @param filterLabels       labels every returned group must carry; empty means no filtering
     * @param includeWorkersInfo whether each result also carries its workers' info
     * @return detail info of every matching group; never empty
     * @throws NotExistStarException when the service is unknown or no group matches
     */
    public List<WorkerGroupDetailInfo> listWorkerGroups(String serviceId, Map<String, String> filterLabels, boolean includeWorkersInfo) {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            if (!this.serviceWorkerGroups.containsKey(serviceId)) {
                throw new NotExistStarException("service:{} not registered yet!", new Object[]{serviceId});
            }
            ServiceWorkerGroup swg = this.serviceWorkerGroups.get(serviceId);
            List<WorkerGroupDetailInfo> result = new ArrayList<>();
            if (filterLabels.isEmpty()) {
                for (WorkerGroup group : swg.values()) {
                    result.add(this.buildWorkerGroupDetailInfo(group, includeWorkersInfo));
                }
            } else {
                for (WorkerGroup group : swg.values()) {
                    if (WorkerManager.allLabelMatching(group.getLabels(), filterLabels)) {
                        result.add(this.buildWorkerGroupDetailInfo(group, includeWorkersInfo));
                    }
                }
            }
            // An empty result is reported as an error, not returned as an empty list.
            if (result.isEmpty()) {
                throw new NotExistStarException("No worker group matching criteria in service:{}", new Object[]{serviceId});
            }
            return result;
        }
    }

    /**
     * Builds the detail info of a worker group from the group's own protobuf form.
     *
     * @param group              group to describe
     * @param includeWorkersInfo when true, the protobuf form of every worker id reported
     *                           by the group is looked up in the worker map and attached
     * @return the assembled detail info
     */
    private WorkerGroupDetailInfo buildWorkerGroupDetailInfo(WorkerGroup group, boolean includeWorkersInfo) {
        WorkerGroupDetailInfo.Builder detail = WorkerGroupDetailInfo.newBuilder().mergeFrom(group.toProtobuf());
        if (!includeWorkersInfo) {
            return detail.build();
        }
        List<WorkerInfo> workerInfos = new ArrayList<WorkerInfo>();
        for (Long workerId : group.getAllWorkerIds(false)) {
            Worker member = this.workers.get(workerId);
            workerInfos.add(member.toProtobuf());
        }
        detail.addAllWorkersInfo(workerInfos);
        return detail.build();
    }

    /**
     * Checks whether every expected label is present in the source labels with an equal value.
     *
     * @param sourceLabels labels to test; only consulted when {@code expectLabels} is non-empty
     * @param expectLabels labels that must all be matched
     * @return true when each entry of {@code expectLabels} has an equal value in
     *         {@code sourceLabels}; trivially true when {@code expectLabels} is empty
     */
    private static boolean allLabelMatching(Map<String, String> sourceLabels, Map<String, String> expectLabels) {
        for (Map.Entry<String, String> expected : expectLabels.entrySet()) {
            String actual = sourceLabels.get(expected.getKey());
            // Objects.equals treats a missing key and a null expected value as a match.
            if (!Objects.equals(actual, expected.getValue())) {
                return false;
            }
        }
        return true;
    }

    /**
     * Updates the spec, labels and/or properties of a worker group. The change is
     * journaled first, then applied in memory; a spec change also schedules an async
     * resource alteration.
     *
     * @param serviceId service owning the group; must be registered
     * @param groupId   id of the group to update
     * @param spec      new spec, or null to leave the spec untouched
     * @param labels    new labels; null or empty leaves the in-memory labels untouched
     * @param props     new properties; null or empty leaves the in-memory properties untouched
     * @throws StarException NotExistStarException when the service or group is unknown,
     *                       InvalidArgumentStarException when the spec is rejected
     */
    public void updateWorkerGroup(String serviceId, long groupId, WorkerGroupSpec spec, Map<String, String> labels, Map<String, String> props) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            if (!this.serviceWorkerGroups.containsKey(serviceId)) {
                throw new NotExistStarException("service:{} not registered yet!", new Object[]{serviceId});
            }
            ServiceWorkerGroup swg = this.serviceWorkerGroups.get(serviceId);
            if (!swg.containsKey(groupId)) {
                throw new NotExistStarException("worker group:{} not exist in service:{}!", new Object[]{groupId, serviceId});
            }
            if (spec != null && !this.resourceManager.isValidSpec(spec)) {
                throw new InvalidArgumentStarException("Invalid WorkerGroupSpec:{}", new Object[]{spec.getSize()});
            }

            // Only the fields that were actually supplied go into the journal entry.
            UpdateWorkerGroupInfo.Builder builder = UpdateWorkerGroupInfo.newBuilder().setGroupId(groupId);
            if (spec != null) {
                builder.setSpec(spec);
            }
            if (labels != null) {
                builder.putAllLabels(labels);
            }
            if (props != null) {
                builder.putAllProperties(props);
            }
            Journal journal = StarMgrJournal.logUpdateWorkerGroup(serviceId, builder.build());
            this.journalSystem.write(journal);

            WorkerGroup group = swg.get(groupId);
            if (spec != null) {
                group.updateSpec(spec);
                this.submitAsyncTask(() -> this.resourceManager.alterResourceSpec(serviceId, groupId, spec));
            }
            if (props != null && !props.isEmpty()) {
                group.setProperties(props);
            }
            if (labels != null && !labels.isEmpty()) {
                group.setLabels(labels);
            }
        }
    }

    /**
     * Replays an update-worker-group journal entry. The spec is applied when present;
     * labels and properties are applied only when their maps are non-empty. An unknown
     * service or group is reported via LogUtils.fatal.
     *
     * @param serviceId service owning the group
     * @param info      journaled update carrying the group id and the changed fields
     */
    public void replayUpdateWorkerGroup(String serviceId, UpdateWorkerGroupInfo info) {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            boolean serviceKnown = this.serviceWorkerGroups.containsKey(serviceId);
            if (!serviceKnown) {
                LogUtils.fatal(LOG, "service:{} not registered yet during replay!", serviceId);
            }
            long replayedGroupId = info.getGroupId();
            ServiceWorkerGroup container = this.serviceWorkerGroups.get(serviceId);
            if (!container.containsKey(replayedGroupId)) {
                LogUtils.fatal(LOG, "worker group:{} not exist in service:{}, during replay updateWorkerGroup!", replayedGroupId, serviceId);
            }

            WorkerGroup target = container.get(replayedGroupId);
            if (info.hasSpec()) {
                target.updateSpec(info.getSpec());
            }
            boolean hasLabels = !info.getLabelsMap().isEmpty();
            if (hasLabels) {
                target.setLabels(info.getLabelsMap());
            }
            boolean hasProperties = !info.getPropertiesMap().isEmpty();
            if (hasProperties) {
                target.setProperties(info.getPropertiesMap());
            }
        }
    }

    /**
     * Deletes a worker group together with its workers. The deletion is journaled,
     * applied in memory, and followed by an async resource release.
     *
     * @param serviceId service owning the group; must be registered
     * @param groupId   id of the group to delete
     * @throws StarException NotExistStarException when the service or group is unknown
     */
    public void deleteWorkerGroup(String serviceId, long groupId) throws StarException {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            if (!this.serviceWorkerGroups.containsKey(serviceId)) {
                throw new NotExistStarException("service:{} not registered yet!", new Object[]{serviceId});
            }
            ServiceWorkerGroup container = this.serviceWorkerGroups.get(serviceId);
            boolean groupKnown = container.containsKey(groupId);
            if (!groupKnown) {
                throw new NotExistStarException("worker group:{} not exist in service:{}!", new Object[]{groupId, serviceId});
            }

            // Journal first, then drop the in-memory state.
            this.journalSystem.write(StarMgrJournal.logDeleteWorkerGroup(serviceId, groupId));
            this.deleteWorkerGroupInternalNoLock(container, groupId);
            this.submitAsyncTask(() -> this.resourceManager.releaseResource(serviceId, groupId));
        }
    }

    /**
     * Replays a delete-worker-group journal entry by dropping the group and its
     * workers from memory. An unknown service or group is reported via LogUtils.fatal.
     *
     * @param serviceId service owning the group
     * @param groupId   id of the group to delete
     */
    public void replayDeleteWorkerGroup(String serviceId, long groupId) {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            if (!this.serviceWorkerGroups.containsKey(serviceId)) {
                LogUtils.fatal(LOG, String.format("service:%s not registered yet! Should not happen!", serviceId), new Object[0]);
            }
            ServiceWorkerGroup container = this.serviceWorkerGroups.get(serviceId);
            boolean groupKnown = container.containsKey(groupId);
            if (!groupKnown) {
                LogUtils.fatal(LOG, String.format("worker group:%d not exist in service:%s, should not happen!", groupId, serviceId), new Object[0]);
            }
            this.deleteWorkerGroupInternalNoLock(container, groupId);
        }
    }

    /**
     * Removes every worker of a group from the worker map, then removes the group
     * from its container. Takes no lock; locking is left to the caller.
     *
     * @param serviceWorkerGroup container holding the group
     * @param groupId            id of the group to remove; must exist in the container
     */
    private void deleteWorkerGroupInternalNoLock(ServiceWorkerGroup serviceWorkerGroup, long groupId) {
        WorkerGroup doomed = serviceWorkerGroup.get(groupId);
        List<Long> memberIds = doomed.getAllWorkerIds(false);
        for (Long memberId : memberIds) {
            this.deleteWorkerInternal(memberId);
        }
        serviceWorkerGroup.remove(groupId);
    }

    /**
     * Drops a worker from the worker map and, when it was present, schedules its
     * decommission as an async task.
     *
     * @param workerId id of the worker to drop
     */
    private void deleteWorkerInternal(long workerId) {
        Worker removed = this.workers.remove(workerId);
        if (removed == null) {
            return;
        }
        this.submitAsyncTask(removed::decommission);
    }

    /**
     * Hands a task to the heartbeat executor on a best-effort basis. The task is
     * skipped when no executor is set, and a submission failure is only logged.
     *
     * @param runnable task to run asynchronously
     */
    private void submitAsyncTask(Runnable runnable) {
        if (this.heartbeatExecutor == null) {
            return;
        }
        try {
            this.heartbeatExecutor.execute(runnable);
        } catch (Exception submitFailure) {
            LOG.info("Fail to add async task for new added worker, ignore it for now. ", submitFailure);
        }
    }

    /**
     * Dumps the worker manager state to a stream under the read lock.
     *
     * <p>Layout: a delimited header (counts, batch size, digest algorithm), then the
     * service-worker-group section and the worker section written through a digest
     * stream, then a delimited footer carrying the digest of the sections.
     *
     * @param out stream receiving the image
     * @throws IOException when writing to {@code out} fails
     */
    public void dumpMeta(OutputStream out) throws IOException {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            LOG.debug("start dump worker manager meta data ...");
            final int numPerBatch = 100;
            final String digestAlgorithm = "MD5";
            WorkerManagerImageMetaHeader header = WorkerManagerImageMetaHeader.newBuilder()
                    .setNumServiceWorkerGroup(this.serviceWorkerGroups.size())
                    .setNumWorker(this.workers.size())
                    .setBatchSize(numPerBatch)
                    .setDigestAlgorithm(digestAlgorithm)
                    .build();
            // The header goes to the raw stream, so it is not covered by the checksum.
            header.writeDelimitedTo(out);

            DigestOutputStream mdStream = Utils.getDigestOutputStream(out, digestAlgorithm);
            try (SectionWriter writer = new SectionWriter(mdStream)) {
                try (OutputStream stream = writer.appendSection(SectionType.SECTION_WORKERMGR_SWG)) {
                    this.dumpServiceWorkerGroups(stream);
                }
                try (OutputStream stream = writer.appendSection(SectionType.SECTION_WORKERMGR_WORKER)) {
                    this.dumpWorkers(stream, numPerBatch);
                }
            }
            mdStream.flush();

            WorkerManagerImageMetaFooter.Builder footerBuilder = WorkerManagerImageMetaFooter.newBuilder();
            // No checksum is recorded when the digest stream carries no message digest.
            if (mdStream.getMessageDigest() != null) {
                footerBuilder.setChecksum(ByteString.copyFrom(mdStream.getMessageDigest().digest()));
            }
            footerBuilder.build().writeDelimitedTo(out);
            LOG.debug("end dump worker manager meta data.");
        }
    }

    /**
     * Loads the worker manager state from a stream under the write lock: reads the
     * header, feeds every section to loadSection through a digest stream, then reads
     * the footer and validates its checksum against the computed digest.
     *
     * @param in stream positioned at the start of a worker manager image
     * @throws IOException  when reading fails
     * @throws EOFException when the header or footer is missing
     */
    public void loadMeta(InputStream in) throws IOException {
        try (LockCloseable ignored = new LockCloseable(this.lock.writeLock())) {
            LOG.debug("start load worker manager meta data ...");

            WorkerManagerImageMetaHeader header = WorkerManagerImageMetaHeader.parseDelimitedFrom(in);
            if (header == null) {
                throw new EOFException();
            }
            int expectedGroups = header.getNumServiceWorkerGroup();
            int expectedWorkers = header.getNumWorker();

            // Sections are read through the digest stream named by the header.
            DigestInputStream digestInput = Utils.getDigestInputStream(in, header.getDigestAlgorithm());
            try (SectionReader reader = new SectionReader(digestInput)) {
                reader.forEach(x -> this.loadSection((Section)x, header));
            }

            WorkerManagerImageMetaFooter footer = WorkerManagerImageMetaFooter.parseDelimitedFrom(in);
            if (footer == null) {
                throw new EOFException();
            }
            Utils.validateChecksum(digestInput.getMessageDigest(), footer.getChecksum());
            LOG.debug("end load worker manager meta data, loaded serviceWorkerGroup:{}, workers:{}", expectedGroups, expectedWorkers);
        }
    }

    /**
     * Dispatches one image section to the matching loader based on its section type.
     * Unknown section types are logged and skipped.
     *
     * @param section section to load
     * @param header  image header supplying the expected counts and batch size
     * @throws IOException when reading the section fails
     */
    private void loadSection(Section section, WorkerManagerImageMetaHeader header) throws IOException {
        InputStream sectionStream = section.getStream();
        switch (section.getHeader().getSectionType()) {
            case SECTION_WORKERMGR_SWG:
                this.loadServiceWorkerGroups(sectionStream, header.getNumServiceWorkerGroup());
                return;
            case SECTION_WORKERMGR_WORKER:
                this.loadWorkers(sectionStream, header.getNumWorker(), header.getBatchSize());
                return;
            default:
                LOG.warn("Unknown section type:{} when loadMeta in WorkerManager, ignore it!", (Object)section.getHeader().getSectionType());
        }
    }

    /**
     * Writes one delimited {@code WorkerManagerServiceWorkerGroup} message per
     * service. Within each message the default worker group (if any) is emitted
     * first, followed by the remaining groups in map iteration order.
     *
     * @param stream destination for the serialized messages
     * @throws IOException if writing to the stream fails
     */
    private void dumpServiceWorkerGroups(OutputStream stream) throws IOException {
        for (ServiceWorkerGroup serviceGroups : this.serviceWorkerGroups.values()) {
            WorkerManagerServiceWorkerGroup.Builder record = WorkerManagerServiceWorkerGroup.newBuilder();
            record.setServiceId(serviceGroups.getServiceId());
            record.setNumWorkerGroup(serviceGroups.size());

            // The default group always goes first so it is the first one seen on load.
            WorkerGroup leading = serviceGroups.getDefaultWorkerGroup();
            if (leading != null) {
                record.addWorkerGroupDetails(leading.toProtobuf().toByteString());
            }
            for (WorkerGroup candidate : serviceGroups.values()) {
                // Identity comparison: skip only the instance already written above.
                if (candidate != leading) {
                    record.addWorkerGroupDetails(candidate.toProtobuf().toByteString());
                }
            }
            record.build().writeDelimitedTo(stream);
        }
    }

    /**
     * Reads {@code numServiceWorkerGroup} delimited
     * {@code WorkerManagerServiceWorkerGroup} messages and registers every
     * worker group they contain under the service named by the group itself.
     *
     * @param stream                source of the serialized messages
     * @param numServiceWorkerGroup number of messages expected in the stream
     * @throws EOFException if the stream ends before all messages were read
     * @throws IOException  if a message cannot be read or parsed
     * @throws IllegalStateException if a message's declared group count does
     *         not match the number of group details it carries
     */
    private void loadServiceWorkerGroups(InputStream stream, int numServiceWorkerGroup) throws IOException {
        for (int remaining = numServiceWorkerGroup; remaining > 0; --remaining) {
            WorkerManagerServiceWorkerGroup record = WorkerManagerServiceWorkerGroup.parseDelimitedFrom(stream);
            if (record == null) {
                throw new EOFException();
            }
            // Sanity check: the declared count must agree with the payload.
            Preconditions.checkState(record.getNumWorkerGroup() == record.getWorkerGroupDetailsCount());

            this.bootstrapService(record.getServiceId());
            List<ByteString> serializedGroups = record.getWorkerGroupDetailsList();
            for (ByteString serialized : serializedGroups) {
                WorkerGroupDetailInfo detail = WorkerGroupDetailInfo.parseFrom(serialized);
                WorkerGroup restored = WorkerGroup.fromProtobuf(detail);
                this.serviceWorkerGroups.get(restored.getServiceId()).put(restored.getGroupId(), restored);
            }
        }
    }

    /**
     * Serializes all workers as a series of delimited
     * {@code WorkerManagerWorkers} messages. A message is flushed as soon as it
     * holds at least {@code batchSize} workers; a final, smaller message is
     * written for any leftovers.
     *
     * @param stream    destination for the serialized batches
     * @param batchSize flush threshold, in workers per message
     * @throws IOException if writing to the stream fails
     * @throws IllegalStateException if the number of workers written differs
     *         from the size of the worker map
     */
    private void dumpWorkers(OutputStream stream, int batchSize) throws IOException {
        int written = 0;
        WorkerManagerWorkers.Builder batch = WorkerManagerWorkers.newBuilder();
        for (Worker worker : this.workers.values()) {
            batch.addWorker(worker.toProtobuf().toByteString());
            if (batch.getWorkerCount() >= batchSize) {
                // Batch is full: flush it and start a fresh one.
                written += batch.getWorkerCount();
                batch.build().writeDelimitedTo(stream);
                batch = WorkerManagerWorkers.newBuilder();
            }
        }
        // Flush the trailing partial batch, if there is one.
        if (batch.getWorkerCount() > 0) {
            written += batch.getWorkerCount();
            batch.build().writeDelimitedTo(stream);
        }
        Preconditions.checkState(written == this.workers.size());
    }

    /**
     * Reads the worker batches written by {@code dumpWorkers} and attaches each
     * worker to its worker group. Workers whose group cannot be found are
     * logged and dropped.
     *
     * <p>The number of batches is derived from {@code numWorkers} and
     * {@code batchSize}. A non-positive {@code batchSize} is treated as 1:
     * {@code dumpWorkers} flushes after every single worker in that case, and
     * using the raw value would either divide by zero (0, which is also the
     * value of an unset field in the header) or yield a non-positive batch
     * count and silently load nothing (negative values).
     *
     * @param stream     source of the serialized batches
     * @param numWorkers total number of workers recorded in the image header
     * @param batchSize  workers per batch as recorded in the image header
     * @throws EOFException if the stream ends before all batches were read
     * @throws IOException  if a batch cannot be read or parsed
     */
    private void loadWorkers(InputStream stream, int numWorkers, int batchSize) throws IOException {
        // Mirror the writer: with batchSize <= 0 every batch holds exactly one worker.
        int effectiveBatchSize = Math.max(batchSize, 1);
        int numBatches = numWorkers / effectiveBatchSize;
        if (numWorkers % effectiveBatchSize > 0) {
            // Account for the trailing partial batch.
            ++numBatches;
        }
        for (int i = 0; i < numBatches; ++i) {
            WorkerManagerWorkers batchWorkers = WorkerManagerWorkers.parseDelimitedFrom(stream);
            if (batchWorkers == null) {
                throw new EOFException();
            }
            for (ByteString bs : batchWorkers.getWorkerList()) {
                Worker worker = Worker.fromProtobuf(WorkerInfo.parseFrom(bs));
                WorkerGroup group = this.getWorkerGroupInternalNoLock(worker.getServiceId(), worker.getGroupId());
                if (group == null) {
                    LOG.warn("Can't find worker group for worker: {}", (Object)worker);
                    continue;
                }
                this.workers.put(worker.getWorkerId(), worker);
                group.addWorker(worker);
            }
        }
    }

    /**
     * Creates the executor stored in {@code heartbeatExecutor}: a fixed pool of
     * two threads backed by an unbounded queue. Any executor previously held by
     * the field is replaced without being shut down.
     */
    public void start() {
        final int poolSize = 2;
        LinkedBlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<Runnable>();
        this.heartbeatExecutor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, pendingTasks);
    }

    /**
     * Shuts down the executor created by {@link #start()} (if one exists),
     * clears the field, and then stops the resource manager. The resource
     * manager is stopped regardless of whether an executor was present.
     */
    public void stop() {
        ExecutorService executor = this.heartbeatExecutor;
        if (executor != null) {
            Utils.shutdownExecutorService(executor);
            this.heartbeatExecutor = null;
        }
        this.resourceManager.stop();
    }

    /**
     * Writes a line-oriented JSON dump of the current state: first every worker
     * group of every service, then every worker. Each protobuf message is
     * printed with {@code JsonFormat} and terminated by a newline. The read
     * lock is held for the duration of the dump.
     *
     * <p>Text is encoded as UTF-8. {@code DataOutputStream.writeBytes} is
     * deliberately not used because it keeps only the low eight bits of each
     * character, which corrupts any non-ASCII content; for pure ASCII output
     * the bytes written are identical to what {@code writeBytes} would emit.
     *
     * @param out destination stream
     * @throws IOException if a message cannot be printed or the stream cannot
     *         be written
     */
    public void dump(DataOutputStream out) throws IOException {
        try (LockCloseable ignored = new LockCloseable(this.lock.readLock())) {
            // One printer is enough; it is reused for every message below.
            JsonFormat.Printer printer = JsonFormat.printer();
            for (ServiceWorkerGroup swg : this.serviceWorkerGroups.values()) {
                for (WorkerGroup workerGroup : swg.values()) {
                    String line = printer.print(workerGroup.toProtobuf()) + "\n";
                    out.write(line.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
            }
            for (Worker worker : this.workers.values()) {
                String line = printer.print(worker.toProtobuf()) + "\n";
                out.write(line.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }
    }

    /**
     * Test-only factory. Builds a {@code WorkerManager} around the given
     * journal system, substituting a {@code DummyJournalSystem} when none is
     * supplied, and an {@code IdGenerator} constructed with a {@code null}
     * argument.
     *
     * @param journalSystem journal system to use, or {@code null} for a dummy one
     * @return a new worker manager instance
     */
    public static WorkerManager createWorkerManagerForTest(JournalSystem journalSystem) {
        JournalSystem effectiveJournal = journalSystem;
        if (effectiveJournal == null) {
            effectiveJournal = new DummyJournalSystem();
        }
        return new WorkerManager(effectiveJournal, new IdGenerator(null));
    }

    /**
     * The worker groups of a single service, exposed as a map from group id to
     * {@code WorkerGroup}, with additional tracking of a "default" group.
     *
     * <p>The default group id is kept valid by {@link #put}, {@link #remove}
     * and {@link #putAll}: as long as the current default is still present it
     * is retained; otherwise the smallest remaining group id becomes the
     * default, or {@code -1} when no groups are left. The collection views
     * returned by {@link #keySet()}, {@link #values()} and {@link #entrySet()}
     * are those of the backing map and do not go through this bookkeeping.
     */
    private static class ServiceWorkerGroup implements Map<Long, WorkerGroup> {
        /** Sentinel stored in {@code defaultWorkerGroupId} when there is no default group. */
        private static final long NO_DEFAULT_GROUP_ID = -1L;

        private final String serviceId;
        private final Map<Long, WorkerGroup> workerGroups;
        private long defaultWorkerGroupId;

        /**
         * Creates the group container for {@code serviceId}. When
         * {@code Config.ENABLE_ZERO_WORKER_GROUP_COMPATIBILITY} is set, a group
         * with id 0 is pre-created and made the default.
         */
        public ServiceWorkerGroup(String serviceId) {
            this.serviceId = serviceId;
            this.workerGroups = new HashMap<>();
            if (Config.ENABLE_ZERO_WORKER_GROUP_COMPATIBILITY) {
                this.workerGroups.put(0L, new WorkerGroup(serviceId, 0L));
                this.defaultWorkerGroupId = 0L;
            } else {
                this.defaultWorkerGroupId = NO_DEFAULT_GROUP_ID;
            }
        }

        /** Returns the id of the service these worker groups belong to. */
        public String getServiceId() {
            return this.serviceId;
        }

        /** Returns the current default worker group, or {@code null} if there is none. */
        public WorkerGroup getDefaultWorkerGroup() {
            return this.workerGroups.get(this.defaultWorkerGroupId);
        }

        // ---- read-only Map operations: plain delegation ----

        @Override
        public int size() {
            return this.workerGroups.size();
        }

        @Override
        public boolean isEmpty() {
            return this.workerGroups.isEmpty();
        }

        @Override
        public boolean containsKey(Object key) {
            return this.workerGroups.containsKey(key);
        }

        @Override
        public boolean containsValue(Object value) {
            return this.workerGroups.containsValue(value);
        }

        @Override
        public WorkerGroup get(Object key) {
            return this.workerGroups.get(key);
        }

        // ---- mutating Map operations: delegate, then refresh the default ----

        @Override
        public WorkerGroup put(Long key, WorkerGroup value) {
            WorkerGroup previous = this.workerGroups.put(key, value);
            this.updateDefaultWorkerGroup();
            return previous;
        }

        @Override
        public WorkerGroup remove(Object key) {
            WorkerGroup removed = this.workerGroups.remove(key);
            this.updateDefaultWorkerGroup();
            return removed;
        }

        @Override
        public void putAll(Map<? extends Long, ? extends WorkerGroup> m) {
            this.workerGroups.putAll(m);
            this.updateDefaultWorkerGroup();
        }

        @Override
        public void clear() {
            this.workerGroups.clear();
            // Nothing left to fall back to.
            this.defaultWorkerGroupId = NO_DEFAULT_GROUP_ID;
        }

        // ---- collection views of the backing map ----

        @Override
        public Set<Long> keySet() {
            return this.workerGroups.keySet();
        }

        @Override
        public Collection<WorkerGroup> values() {
            return this.workerGroups.values();
        }

        @Override
        public Set<Map.Entry<Long, WorkerGroup>> entrySet() {
            return this.workerGroups.entrySet();
        }

        /**
         * Re-elects the default group if the current one is gone: picks the
         * smallest group id still present, or the sentinel when the map is empty.
         */
        private void updateDefaultWorkerGroup() {
            if (this.workerGroups.containsKey(this.defaultWorkerGroupId)) {
                // Current default is still valid; keep it.
                return;
            }
            if (this.workerGroups.isEmpty()) {
                this.defaultWorkerGroupId = NO_DEFAULT_GROUP_ID;
            } else {
                this.defaultWorkerGroupId = Collections.min(this.workerGroups.keySet());
            }
        }
    }
}

