/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.OnComplete;
import akka.routing.RoundRobinPool;
import com.alibaba.schedulerx.common.domain.Metrics;
import com.alibaba.schedulerx.common.domain.enums.AppType;
import com.alibaba.schedulerx.common.monitor.MetricsCollector;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.IpUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.common.util.ReflectionUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.schedulerx.shade.com.google.protobuf.ProtocolMessageEnum;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.HttpResponse;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.JsonNode;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.Unirest;
import com.alibaba.schedulerx.shade.javassist.compiler.JvstCodeGen;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.MapUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.configuration.Configuration;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.ArrayUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.shade.org.apache.http.client.config.RequestConfig;
import com.alibaba.schedulerx.shade.org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import com.alibaba.schedulerx.shade.org.apache.http.conn.ssl.SSLContexts;
import com.alibaba.schedulerx.shade.org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import com.alibaba.schedulerx.shade.org.apache.http.impl.client.CloseableHttpClient;
import com.alibaba.schedulerx.shade.org.apache.http.impl.client.HttpClients;
import com.alibaba.schedulerx.shade.org.jboss.netty.channel.socket.nio.Boss;
import com.alibaba.schedulerx.shade.org.json.JSONException;
import com.alibaba.schedulerx.shade.scala.Function;
import com.alibaba.schedulerx.shade.scala.concurrent.Future;
import com.alibaba.schedulerx.worker.actor.ContainerActor;
import com.alibaba.schedulerx.worker.actor.JobInstanceActor;
import com.alibaba.schedulerx.worker.actor.LogActor;
import com.alibaba.schedulerx.worker.actor.TaskRouter;
import com.alibaba.schedulerx.worker.actor.WorkerHeartbeatActor;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.container.ShutdownMode;
import com.alibaba.schedulerx.worker.container.ThreadContainerPool;
import com.alibaba.schedulerx.worker.discovery.ArmoryResult;
import com.alibaba.schedulerx.worker.discovery.DefaultGroupDiscovery;
import com.alibaba.schedulerx.worker.discovery.GroupDiscovery;
import com.alibaba.schedulerx.worker.discovery.GroupManager;
import com.alibaba.schedulerx.worker.discovery.ServerDiscovery;
import com.alibaba.schedulerx.worker.discovery.ServerDiscoveryFactory;
import com.alibaba.schedulerx.worker.exception.DomainInvalidException;
import com.alibaba.schedulerx.worker.exception.DomainNotFoundException;
import com.alibaba.schedulerx.worker.exception.NamespaceNotFoundException;
import com.alibaba.schedulerx.worker.ha.AtLeastOnceDeliveryRoutingActor;
import com.alibaba.schedulerx.worker.ha.HealthTimeHolder;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCleaner;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import com.alibaba.schedulerx.worker.master.persistence.H2FilePersistence;
import com.alibaba.schedulerx.worker.metrics.CgroupMetrics;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.pull.PullManager;
import com.alibaba.schedulerx.worker.security.Authenticator;
import com.alibaba.schedulerx.worker.security.DefaultAuthenticator;
import com.alibaba.schedulerx.worker.service.WorkerHttpServer;
import com.alibaba.schedulerx.worker.timer.AbstractTimerTask;
import com.alibaba.schedulerx.worker.util.ConsoleUtil;
import com.alibaba.schedulerx.worker.util.DiamondUtil;
import com.alibaba.schedulerx.worker.util.FileUtils;
import com.alibaba.schedulerx.worker.util.SpringContext;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import org.joda.time.DateTime;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;

public class SchedulerxWorker
implements ApplicationContextAware,
InitializingBean,
ApplicationListener<ApplicationContextEvent> {
    // Logger used for worker lifecycle messages throughout this class.
    private static final Logger LOGGER = LogFactory.getLogger(SchedulerxWorker.class);
    // Actor system used to resolve master actor selections during shutdown.
    // Starts as null; the assignment is not visible in this section of the file.
    public static ActorSystem actorSystem = null;
    // NOTE(review): not referenced in the visible section; presumably the at-least-once
    // delivery routing actor - confirm against the rest of the file.
    public static ActorRef AtLeastDeliveryRoutingActor = null;
    // Set to true at the end of a successful init() and reset to false by shutdown().
    // While true, a further init() call only runs appendWorkerInit().
    public static volatile boolean INITED = false;
    // Class loader handed to ReflectionUtil.getInstanceByClassName when creating the GroupDiscovery.
    public static ClassLoader CUSTOMER_CLASS_LOADER = null;
    // Address of this worker; combined with the worker id as "<id>@<addr>" in offline requests
    // and passed to initLogCollector. Assigned outside the visible section.
    public static String WORKER_ADDR = null;
    // Shared GroupManager singleton used to register group ids and app keys.
    private static GroupManager groupManager = GroupManager.INSTANCE;
    // Single-threaded scheduled executor named "Schedulerx-heartbeat-thread"; rejected tasks
    // are silently discarded (DiscardPolicy).
    private static ScheduledExecutorService heartbeatSes = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("Schedulerx-heartbeat-thread").build(), new ThreadPoolExecutor.DiscardPolicy());
    // NOTE(review): not used in the visible section; presumably counts consecutive heartbeat timeouts - confirm.
    private static int HEART_BEAT_TIMEOUT_TIMES = 0;
    // Configuration keys passed to ConfigUtil.toStringExclude when the worker config is logged/printed.
    private static List<String> EXCLUDE_KEYS = Lists.newArrayList("sls.ak", "sls.sk", "sls.aksk.encoded", "worker.timer.tasks", "appKey");
    // Configuration keys passed to ConfigUtil.toStringInclude for the startup-failure exception message.
    private static List<String> INCLUDE_KEYS = Lists.newArrayList("address.server.domain", "domainName", "schedulerx.namespace", "groupId");
    // Logger obtained under the name "heartbeat" (the field name carries a typo: "heatbeat").
    private static final Logger heatbeatLogger = LogFactory.getLogger("heartbeat");
    // Worker id returned by startActorSystem() during init().
    private static volatile String WORKER_ID = null;
    // NOTE(review): not used in the visible section; presumably requests a forced actor-system termination - confirm.
    private static volatile boolean forceActorSystemTerminate = false;

    /**
     * Builds an HTTP client with fixed timeouts and a custom SSL socket factory and
     * installs it as the client used by Unirest.
     *
     * <p>Timeouts: 5000 ms connect, 5000 ms socket, 15000 ms connection-request.
     * Cookie management is disabled. Any failure is logged and otherwise ignored, so
     * Unirest keeps whatever client it had before.
     *
     * <p>NOTE(review): the trust material uses {@code TrustSelfSignedStrategy} and the
     * socket factory uses {@code ALLOW_ALL_HOSTNAME_VERIFIER}; this presumably relaxes
     * certificate and host name checks - confirm that this is intended.
     */
    private void initUnirest() {
        try {
            SSLContext trustContext = SSLContexts.custom()
                    .loadTrustMaterial(null, new TrustSelfSignedStrategy())
                    .build();
            SSLConnectionSocketFactory socketFactory =
                    new SSLConnectionSocketFactory(trustContext, SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
            RequestConfig timeouts = RequestConfig.custom()
                    .setConnectTimeout(5000)
                    .setSocketTimeout(5000)
                    .setConnectionRequestTimeout(15000)
                    .build();
            CloseableHttpClient client = HttpClients.custom()
                    .setDefaultRequestConfig(timeouts)
                    .setSSLSocketFactory(socketFactory)
                    .disableCookieManagement()
                    .build();
            Unirest.setHttpClient(client);
        } catch (Exception initFailure) {
            LOGGER.error("Init Unirest Failed.", initFailure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the worker, or appends additional groups when the worker is already running.
     *
     * <p>The whole method runs while holding the {@code SchedulerxWorker.class} monitor.
     * The Unirest HTTP client is (re)initialised on every call. If {@link #INITED} is already
     * {@code true}, only {@link #appendWorkerInit()} is executed.
     *
     * <p>Otherwise the method loads the worker configuration, resolves the console domain and
     * the local host, and - unless {@code isolateMachine(host)} returns {@code true} - runs the
     * start-up sequence visible below (group manager, authentication, actor system, optional
     * H2 store, server discovery, log collectors, optional cgroup metrics, timer tasks,
     * optional log cleaner, heartbeat, optional HTTP server) and finally sets {@code INITED}.
     *
     * <p>On a start-up failure the error is logged and the actor system is terminated; if the
     * {@code block.app.start} setting (default {@code true}) is enabled, an {@link IOException}
     * wrapping the cause is thrown, otherwise the failure is tolerated.
     *
     * <p>NOTE(review): this body is raw decompiler output. The
     * {@code new /&#42; invalid duplicate definition of identical inner class &#42;/} placeholders are not
     * valid Java; they presumably stand for the same anonymous {@code Thread} registered at the
     * first {@code addShutdownHook} call (which invokes {@code shutdown(null)}). The repeated
     * "log config / print config / add shutdown hook" triples look like a single
     * {@code finally} block expanded by the decompiler - confirm against the original source
     * before restructuring.
     *
     * @throws DomainNotFoundException if no console domain can be resolved
     * @throws Exception on any other start-up failure that is not tolerated
     */
    public void init() throws Exception {
        // Unused local emitted by the decompiler for the monitor object.
        Class<SchedulerxWorker> clazz = SchedulerxWorker.class;
        synchronized (SchedulerxWorker.class) {
            Configuration conf;
            block18: {
                this.initUnirest();
                // Already started: only register groups that are not known yet.
                if (INITED) {
                    this.appendWorkerInit();
                    // ** MonitorExit[var1_1] (shouldn't be in output)
                    return;
                }
                LOGGER.info("Schedulerx Worker starting...");
                this.printMvnDenpendency();
                conf = ConfigUtil.getWorkerConfig();
                this.initMetaInfoFromSystem(conf);
                String domainName = this.initConsoleDomain();
                if (StringUtils.isBlank(domainName)) {
                    throw new DomainNotFoundException("Not found domainName.");
                }
                // Prefer the configured host name; fall back to local host detection.
                String host = conf.getString("hostname");
                if (host == null) {
                    host = SchedulerxWorker.getLocalHost();
                }
                // Not isolated: leave the labeled block and run the full start-up sequence.
                if (!SchedulerxWorker.isolateMachine(host)) break block18;
                // Isolated machine: only report the configuration and register the shutdown hook.
                LOGGER.info("Schedulerx WorkerConfig" + ConfigUtil.toStringExclude(ConfigUtil.getWorkerConfig(), EXCLUDE_KEYS));
                System.out.println("Schedulerx WorkerConfig=" + ConfigUtil.toStringExclude(ConfigUtil.getWorkerConfig(), EXCLUDE_KEYS));
                Runtime.getRuntime().addShutdownHook(new Thread(){

                    @Override
                    public void run() {
                        SchedulerxWorker.shutdown(null);
                    }
                });
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
            try {
                Boolean enableHttpServer;
                List<String> groupIdList = this.getGroupIds(conf);
                List<String> appKeyList = this.getAppKeys(conf);
                String namespace = this.initNamespace();
                String namespaceSource = this.initNamespaceSource();
                groupManager.init(namespace, namespaceSource, groupIdList, appKeyList);
                this.initCurLabel();
                this.checkParameters(conf, namespace);
                this.initMetaInfoFromConsole(namespace, namespaceSource, groupIdList, appKeyList);
                SchedulerxWorker.initStsKey();
                this.authenticate(conf, namespace, namespaceSource, groupIdList, appKeyList);
                WORKER_ID = SchedulerxWorker.startActorSystem();
                // The H2 file store is only initialised when batch work is enabled.
                if (conf.getBoolean("batch.work.enable", false)) {
                    LOGGER.info("H2FilePersistence initing...");
                    SchedulerxWorker.initStore();
                    LOGGER.info("H2FilePersistence inited.");
                }
                SchedulerxWorker.initServerDiscovery(groupIdList);
                LOGGER.info("ServerDiscovery inited.");
                LOGGER.info("LogCollector initing...");
                // One log collector per application group id known to the group manager.
                Map<String, Long> groupIdMap = GroupManager.INSTANCE.getGroupId2AppGroupIdMap();
                for (long appGroupId : groupIdMap.values()) {
                    this.initLogCollector(appGroupId, WORKER_ADDR);
                }
                LOGGER.info("LogCollector inited...");
                if (conf.getBoolean("cgroup.metrics.enable", false)) {
                    CgroupMetrics.getInstance();
                    LOGGER.info("cgroup metrics inited.");
                }
                SchedulerxWorker.initTimerTask(conf);
                LOGGER.info("timer task inited.");
                if (conf.getBoolean("log.collector.enable", true)) {
                    LogCleaner logCleaner = LogCollectorFactory.newCleaner();
                    logCleaner.init();
                }
                // Heartbeat is started here only when no Spring context is present or the context
                // holds no SchedulerxWorker bean.
                if (SpringContext.context == null || ArrayUtils.isEmpty(SpringContext.context.getBeanNamesForType(SchedulerxWorker.class))) {
                    SchedulerxWorker.initHeartBeat(WORKER_ID);
                    LOGGER.info("heartbeat init.");
                }
                if ((enableHttpServer = Boolean.valueOf(ConfigUtil.getWorkerConfig().getBoolean("worker.http.server.enable", false))).booleanValue()) {
                    new WorkerHttpServer();
                    LOGGER.info("Schedulerx Worker http server init.");
                }
                LOGGER.info("Schedulerx Worker started.");
                INITED = true;
                LOGGER.info("Schedulerx WorkerConfig" + ConfigUtil.toStringExclude(ConfigUtil.getWorkerConfig(), EXCLUDE_KEYS));
                System.out.println("Schedulerx WorkerConfig=" + ConfigUtil.toStringExclude(ConfigUtil.getWorkerConfig(), EXCLUDE_KEYS));
                Runtime.getRuntime().addShutdownHook(new /* invalid duplicate definition of identical inner class */);
            }
            catch (Throwable t) {
                try {
                    LOGGER.error("Schedulerx Worker error", t);
                    SchedulerxWorker.terminateActorSystem();
                    // By default a failed worker start-up fails the hosting application as well.
                    if (ConfigUtil.getWorkerConfig().getBoolean("block.app.start", true)) {
                        throw new IOException("Schedulerx WorkerConfig" + ConfigUtil.toStringInclude(ConfigUtil.getWorkerConfig(), INCLUDE_KEYS), t);
                    }
                    LOGGER.info("Schedulerx WorkerConfig" + ConfigUtil.toStringExclude(ConfigUtil.getWorkerConfig(), EXCLUDE_KEYS));
                    System.out.println("Schedulerx WorkerConfig=" + ConfigUtil.toStringExclude(ConfigUtil.getWorkerConfig(), EXCLUDE_KEYS));
                    Runtime.getRuntime().addShutdownHook(new /* invalid duplicate definition of identical inner class */);
                }
                catch (Throwable throwable) {
                    // Same reporting and hook registration before the failure is propagated.
                    LOGGER.info("Schedulerx WorkerConfig" + ConfigUtil.toStringExclude(ConfigUtil.getWorkerConfig(), EXCLUDE_KEYS));
                    System.out.println("Schedulerx WorkerConfig=" + ConfigUtil.toStringExclude(ConfigUtil.getWorkerConfig(), EXCLUDE_KEYS));
                    Runtime.getRuntime().addShutdownHook(new /* invalid duplicate definition of identical inner class */);
                    throw throwable;
                }
            }
            return;
        }
    }

    /**
     * Takes this node offline in its worker role.
     *
     * <p>For every job instance with a known master actor path, a
     * {@code WorkerOfflineRequest} (with {@code shutdown=false}) is sent to the master's
     * heartbeat router and awaited via {@code FutureUtils.awaitResult(..., 3L)}; a failure for
     * one master is logged and does not stop the loop. After a 3 second pause the thread
     * container pool is shut down with the given mode, all pull managers are stopped and every
     * container status request handler is stopped.
     *
     * @param shutdownMode mode forwarded to {@code ThreadContainerPool.shutdown}
     * @throws InterruptedException if the 3 second pause is interrupted
     */
    private static void workerRoleOffline(ShutdownMode shutdownMode) throws InterruptedException {
        ContainerPool pool = ContainerFactory.getContainerPool();
        Map<Long, String> masterPathByInstance = pool.getInstanceMasterActorPathMap();
        if (masterPathByInstance != null && !masterPathByInstance.isEmpty()) {
            for (Map.Entry<Long, String> masterEntry : masterPathByInstance.entrySet()) {
                try {
                    // The offline request goes to the master's heartbeat router, not its task router.
                    String heartbeatPath = masterEntry.getValue().replace("/user/task_routing", "/user/heartbeat_routing");
                    ActorSelection heartbeatRouter = actorSystem.actorSelection(heartbeatPath);
                    String workerIdAddr = WorkerIdGenerator.get() + "@" + WORKER_ADDR;
                    Worker.WorkerOfflineRequest.Builder requestBuilder = Worker.WorkerOfflineRequest.newBuilder();
                    requestBuilder.setJobInstanceId(masterEntry.getKey());
                    requestBuilder.setShutdown(false);
                    requestBuilder.setWorkerIdAddr(workerIdAddr);
                    Worker.WorkerOfflineRequest offlineRequest = requestBuilder.build();
                    FutureUtils.awaitResult(heartbeatRouter, (Object)offlineRequest, 3L);
                } catch (Exception sendFailure) {
                    LOGGER.error("WorkerOfflineRequest send failed. masterHeartbeatAkkaPath={}, WORKER_ADDR={} ", masterEntry.getValue(), WORKER_ADDR, sendFailure);
                }
            }
        }

        TimeUnit.SECONDS.sleep(3L);
        ThreadContainerPool.getInstance().shutdown(shutdownMode);
        PullManager.INSTANCE.stopAll();

        Map<Long, ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest>> handlersByInstance = ContainerStatusReqHandlerPool.INSTANCE.getHandlers();
        if (handlersByInstance == null || handlersByInstance.isEmpty()) {
            return;
        }
        for (ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest> handler : handlersByInstance.values()) {
            handler.stop(false);
        }
    }

    /**
     * Takes this node offline in its master role by terminating every task master
     * currently registered in {@code TaskMasterPool}.
     *
     * <p>A failure while terminating one task master is logged (including the cause) and
     * does not prevent the remaining task masters from being terminated.
     *
     * @param shutdownMode mode forwarded to {@code TaskMaster.terminate}
     */
    private static void masterRoleOffline(ShutdownMode shutdownMode) {
        for (TaskMaster taskMaster : TaskMasterPool.INSTANCE.getAllTaskMaster()) {
            try {
                taskMaster.terminate(shutdownMode);
            }
            catch (Throwable t) {
                // Pass the throwable as the trailing argument so the cause is not lost
                // (same convention as the LOGGER.error calls elsewhere in this class).
                LOGGER.warn("TaskMaster jobInstanceId={} shutdown failed.", taskMaster.getJobInstanceInfo(), t);
            }
        }
    }

    /**
     * Waits for an offline task to complete, optionally bounded by a timeout.
     *
     * <p>When {@code shutdownTimeout} is {@code true} the method waits at most
     * {@code remainTime} seconds and then reduces {@code remainTime} by the whole seconds
     * elapsed since {@code preTime}. When it is {@code false} the method blocks until the
     * task completes and {@code remainTime} is returned unchanged.
     *
     * @param shutdownTimeout whether the wait is bounded by {@code remainTime}
     * @param future2 the task to wait for
     * @param remainTime remaining time budget in seconds
     * @param preTime epoch time in seconds at which the task was started
     * @return the remaining time budget in seconds after this wait (may be zero or negative)
     * @throws ExecutionException if the task itself failed
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the bounded wait elapsed before the task completed
     */
    private static long futureWait(boolean shutdownTimeout, java.util.concurrent.Future<?> future2, long remainTime, long preTime) throws ExecutionException, InterruptedException, TimeoutException {
        if (shutdownTimeout) {
            future2.get(remainTime, TimeUnit.SECONDS);
            remainTime -= DateTime.now().getMillis() / 1000L - preTime;
        } else {
            future2.get();
        }
        return remainTime;
    }

    /**
     * Resolves the shutdown mode to use when the caller did not specify one.
     *
     * <p>The string setting {@code grace.shutdown.mode} wins when present (upper-cased and
     * resolved with {@code ShutdownMode.valueOf}); otherwise the integer setting
     * {@code worker.shutdown.mode} (default {@code 2}) is resolved with
     * {@code ShutdownMode.parseValue}. Any failure is logged and results in
     * {@code ShutdownMode.IMMEDIATE}.
     *
     * @return the configured shutdown mode, or {@code IMMEDIATE} if it cannot be determined
     */
    private static ShutdownMode getDefaultShutdownMode() {
        ShutdownMode resolvedMode;
        try {
            String configuredGraceMode = ConfigUtil.getWorkerConfig().getString("grace.shutdown.mode", null);
            if (configuredGraceMode == null) {
                resolvedMode = ShutdownMode.parseValue(ConfigUtil.getWorkerConfig().getInteger("worker.shutdown.mode", 2));
            } else {
                resolvedMode = ShutdownMode.valueOf(StringUtils.upperCase(configuredGraceMode));
            }
        } catch (Throwable configFailure) {
            LOGGER.warn("Get default shutdown config failed.", configFailure);
            resolvedMode = ShutdownMode.IMMEDIATE;
        }
        return resolvedMode;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the worker down.
     *
     * <p>Does nothing (apart from logging) when the worker is not initialised. Otherwise
     * {@link #INITED} is cleared, a final heartbeat is sent via
     * {@code sendHeartBeat(WorkerIdGenerator.get(), false)} and the effective shutdown mode is
     * resolved ({@code shutdownMode}, or the configured default when {@code null}).
     *
     * <p>For a {@code null} or {@code IMMEDIATE} mode the graceful phase is skipped. For any
     * other mode the worker-role and master-role offline steps run one after the other on a
     * single background thread. A failure in the graceful phase is logged and tolerated.
     *
     * <p>In every case the background executor is then stopped, all known masters are told
     * that this worker is shutting down, all local task masters are killed and the actor
     * system is terminated.
     *
     * @param shutdownMode requested mode, or {@code null} to use the configured default
     */
    public static synchronized void shutdown(ShutdownMode shutdownMode) {
        LOGGER.info("worker shutdown...");
        if (!INITED) {
            LOGGER.info("worker shutdown ignore, worker not init.");
            return;
        }
        INITED = false;
        ExecutorService offlineExecutor = Executors.newSingleThreadExecutor();
        SchedulerxWorker.sendHeartBeat(WorkerIdGenerator.get(), false);
        final ShutdownMode finalShutdownMode = shutdownMode == null ? SchedulerxWorker.getDefaultShutdownMode() : shutdownMode;
        try {
            boolean immediate = finalShutdownMode == null || finalShutdownMode.equals((Object)ShutdownMode.IMMEDIATE);
            if (!immediate) {
                SchedulerxWorker.gracefulOffline(offlineExecutor, finalShutdownMode);
            }
        }
        catch (Exception e) {
            // A failed or timed-out graceful phase must not prevent the forced cleanup below.
            LOGGER.warn("Worker shutdown failed.", e);
        }
        finally {
            offlineExecutor.shutdownNow();
            SchedulerxWorker.forceOfflineAndTerminate();
        }
    }

    /**
     * Runs the worker-role and then the master-role offline step on the given executor,
     * waiting for each step via {@code futureWait}.
     *
     * <p>NOTE(review): each step is given the full {@code grace.shutdown.timeout}; the
     * remaining time returned by {@code futureWait} is not carried over to the second step.
     * This mirrors the previous behaviour - confirm whether a shared budget was intended.
     */
    private static void gracefulOffline(ExecutorService offlineExecutor, final ShutdownMode mode) throws ExecutionException, InterruptedException, TimeoutException {
        long startTime = DateTime.now().getMillis() / 1000L;
        long timeoutSeconds = ConfigUtil.getWorkerConfig().getLong("grace.shutdown.timeout", 0L);
        // A timeout of zero or less means "wait without limit".
        boolean shutdownTimeout = timeoutSeconds > 0L;

        java.util.concurrent.Future<?> workerPhase = offlineExecutor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    SchedulerxWorker.workerRoleOffline(mode);
                }
                catch (Throwable t) {
                    LOGGER.error("do worker role offline failed.", t);
                }
            }
        });
        SchedulerxWorker.futureWait(shutdownTimeout, workerPhase, timeoutSeconds, startTime);

        startTime = DateTime.now().getMillis() / 1000L;
        java.util.concurrent.Future<?> masterPhase = offlineExecutor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    SchedulerxWorker.masterRoleOffline(mode);
                }
                catch (Throwable t) {
                    LOGGER.error("do master role offline failed.", t);
                }
            }
        });
        SchedulerxWorker.futureWait(shutdownTimeout, masterPhase, timeoutSeconds, startTime);
        TimeUnit.SECONDS.sleep(1L);
    }

    /**
     * Final, unconditional part of a shutdown: notifies every known master that this worker
     * is shutting down, kills all local task masters and terminates the actor system.
     * Failures for an individual master or task master are logged and skipped.
     */
    private static void forceOfflineAndTerminate() {
        ContainerPool containerPool = ContainerFactory.getContainerPool();
        Map<Long, String> instanceMasterActorPathMap = containerPool.getInstanceMasterActorPathMap();
        if (MapUtils.isNotEmpty(instanceMasterActorPathMap)) {
            for (Map.Entry<Long, String> entry : instanceMasterActorPathMap.entrySet()) {
                try {
                    // The offline request goes to the master's heartbeat router, not its task router.
                    String masterHeartbeatAkkaPath = entry.getValue().replace("/user/task_routing", "/user/heartbeat_routing");
                    ActorSelection masterActorSelection = actorSystem.actorSelection(masterHeartbeatAkkaPath);
                    String workerIdAddr = WorkerIdGenerator.get() + "@" + WORKER_ADDR;
                    Worker.WorkerOfflineRequest workerOfflineRequest = Worker.WorkerOfflineRequest.newBuilder().setJobInstanceId(entry.getKey()).setShutdown(true).setWorkerIdAddr(workerIdAddr).build();
                    FutureUtils.awaitResult(masterActorSelection, (Object)workerOfflineRequest, 3L);
                }
                catch (Exception e) {
                    LOGGER.error("WorkerOfflineRequest send failed. masterHeartbeatAkkaPath={}, WORKER_ADDR={} ", entry.getValue(), WORKER_ADDR, e);
                }
            }
        }
        for (TaskMaster taskMaster : TaskMasterPool.INSTANCE.getAllTaskMaster()) {
            try {
                taskMaster.killInstance(true, "Worker master shutdown.");
            }
            catch (Throwable t) {
                LOGGER.warn("TaskMaster jobInstanceId={} shutdown failed.", taskMaster.getJobInstanceInfo(), t);
            }
        }
        SchedulerxWorker.terminateActorSystem();
        LOGGER.info("worker shutdown finished.");
    }

    /**
     * Registers additional groups on a worker that has already been initialised.
     *
     * <p>Every configured group id / app key pair is recorded in the group manager; pairs
     * whose group id the group manager does not contain yet are then initialised against the
     * console, authenticated and added to server discovery. When there is nothing new the
     * method logs a warning and returns.
     *
     * <p>Group discovery based on system properties cannot distinguish several worker beans,
     * so that combination is rejected when the Spring context holds more than one
     * {@code SchedulerxWorker} bean.
     *
     * @throws IllegalStateException if system-property discovery is combined with more than
     *         one {@code SchedulerxWorker} bean
     * @throws IllegalArgumentException if fewer app keys than group ids are configured
     * @throws Exception if resolving groups, console metadata, authentication or server
     *         discovery fails
     */
    public void appendWorkerInit() throws Exception {
        Configuration conf = ConfigUtil.getWorkerConfig();
        String groupDiscoveryClassName = conf.getString("worker.group.discovery", DefaultGroupDiscovery.class.getName());
        GroupDiscovery groupDiscovery = (GroupDiscovery)ReflectionUtil.getInstanceByClassName(groupDiscoveryClassName, CUSTOMER_CLASS_LOADER);
        // The context check must be combined with "&&": with "||" a missing Spring context
        // caused a NullPointerException and a present one made the bean count irrelevant.
        if (groupDiscovery != null && groupDiscovery.isSystemProperty() && SpringContext.context != null && SpringContext.context.getBeanNamesForType(SchedulerxWorker.class).length > 1) {
            LOGGER.error("If use system property config (eg. -Dschedulerx.group=xxx), can't support exists too many <schedulerxworker> beans, please check your configuration.");
            throw new IllegalStateException("If use system property config (eg. -Dschedulerx.group=xxx), can't support exists too many <schedulerxworker> beans, please check your configuration.");
        }
        List<String> groupIds = this.getGroupIds(conf);
        List<String> appKeys = this.getAppKeys(conf);
        // Fail before touching the group manager instead of with a bare index error half-way
        // through the loop. Only sizes are reported; app keys are secrets.
        if (appKeys.size() < groupIds.size()) {
            throw new IllegalArgumentException("groupId count (" + groupIds.size() + ") exceeds appKey count (" + appKeys.size() + "), every groupId needs an appKey");
        }
        List<String> newGroupIds = Lists.newArrayList();
        List<String> newAppKeys = Lists.newArrayList();
        for (int i = 0; i < groupIds.size(); ++i) {
            String groupId = groupIds.get(i);
            String appKey = appKeys.get(i);
            groupManager.putGroupId2AppKeyMap(groupId, appKey);
            if (groupManager.contains(groupId)) continue;
            newGroupIds.add(groupId);
            newAppKeys.add(appKey);
        }
        if (newGroupIds.isEmpty()) {
            LOGGER.warn("groupIds={}, newGroupIds is empty, skip appendWorkerInit", groupIds);
            return;
        }
        String namespace = this.initNamespace();
        String namespaceSource = this.initNamespaceSource();
        this.initMetaInfoFromConsole(namespace, namespaceSource, newGroupIds, newAppKeys);
        this.authenticate(conf, namespace, namespaceSource, newGroupIds, newAppKeys);
        SchedulerxWorker.initServerDiscovery(newGroupIds);
        LOGGER.info("Append schedulerx Worker started.");
    }

    /**
     * Resolves the group ids this worker belongs to.
     *
     * <p>The {@code GroupDiscovery} implementation is taken from the
     * {@code worker.group.discovery} setting (default {@code DefaultGroupDiscovery}) and
     * instantiated reflectively with {@link #CUSTOMER_CLASS_LOADER}.
     *
     * @param conf worker configuration handed to the discovery
     * @return the non-empty list of group ids
     * @throws IOException if the discovery cannot be instantiated or yields no group id
     * @throws Exception if the discovery itself fails
     */
    private List<String> getGroupIds(Configuration conf) throws Exception {
        String groupDiscoveryClassName = conf.getString("worker.group.discovery", DefaultGroupDiscovery.class.getName());
        GroupDiscovery groupDiscovery = (GroupDiscovery)ReflectionUtil.getInstanceByClassName(groupDiscoveryClassName, CUSTOMER_CLASS_LOADER);
        // Other callers treat a null instance as possible; report it clearly rather than
        // failing with a NullPointerException on the next line.
        if (groupDiscovery == null) {
            throw new IOException("can't create worker.group.discovery instance: " + groupDiscoveryClassName);
        }
        List<String> groupIdList = groupDiscovery.getGroupIdList(conf);
        if (groupIdList == null || groupIdList.isEmpty()) {
            throw new IOException("please set groupId");
        }
        return groupIdList;
    }

    /**
     * Resolves the app keys configured for this worker's groups.
     *
     * <p>The {@code GroupDiscovery} implementation is taken from the
     * {@code worker.group.discovery} setting (default {@code DefaultGroupDiscovery}) and
     * instantiated reflectively with {@link #CUSTOMER_CLASS_LOADER}.
     *
     * @param conf worker configuration handed to the discovery
     * @return the non-empty list of app keys
     * @throws IOException if the discovery cannot be instantiated or yields no app key
     * @throws Exception if the discovery itself fails
     */
    private List<String> getAppKeys(Configuration conf) throws Exception {
        String groupDiscoveryClassName = conf.getString("worker.group.discovery", DefaultGroupDiscovery.class.getName());
        GroupDiscovery groupDiscovery = (GroupDiscovery)ReflectionUtil.getInstanceByClassName(groupDiscoveryClassName, CUSTOMER_CLASS_LOADER);
        // Other callers treat a null instance as possible; report it clearly rather than
        // failing with a NullPointerException on the next line.
        if (groupDiscovery == null) {
            throw new IOException("can't create worker.group.discovery instance: " + groupDiscoveryClassName);
        }
        List<String> appKeyList = groupDiscovery.getAppKeyList(conf);
        if (appKeyList == null || appKeyList.isEmpty()) {
            throw new IOException("please set appKey");
        }
        return appKeyList;
    }

    /**
     * Determines the console domain and stores it under {@code domainName} in the worker config.
     *
     * <p>Sources are tried in order until one yields a non-blank value: the
     * {@code schedulerx.console.domain} system property, the {@code domainName} config entry,
     * {@code ConsoleUtil.getDomainFromHttpServer()}, and finally the Diamond entry
     * {@code com.alibaba.schedulerx.domain}.
     *
     * @return the resolved domain (may be blank/null if no source provided one)
     * @throws Exception a {@link DomainInvalidException} when the domain contains {@code http},
     *                   or whatever the lookup helpers throw
     */
    private String initConsoleDomain() throws Exception {
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        String domain = System.getProperty("schedulerx.console.domain");
        if (StringUtils.isBlank(domain)) {
            domain = workerConf.getString("domainName");
            if (StringUtils.isBlank(domain)) {
                domain = ConsoleUtil.getDomainFromHttpServer();
                if (StringUtils.isBlank(domain)) {
                    domain = DiamondUtil.getData("com.alibaba.schedulerx.domain");
                }
            }
        }
        // Only a bare host[:port] is accepted; anything mentioning "http" is rejected.
        boolean mentionsScheme = domain != null && domain.contains("http");
        if (mentionsScheme) {
            throw new DomainInvalidException("domainName need not http:// only domain eg: schedulerx2.tao.net");
        }
        workerConf.setProperty("domainName", domain);
        return domain;
    }

    /**
     * Publishes STS credentials ({@code sts.accessKey}, {@code sts.secretKey}, {@code sts.token})
     * into the worker configuration.
     *
     * <p>Precedence: JVM system properties first; if no access key is given there and none is
     * configured yet, the environment variables {@code sts_accessKey}, {@code sts_secretKey}
     * and {@code sts_token} are used.
     *
     * <p>When the system property is absent but an access key is already present in the worker
     * configuration, the configuration is left untouched. Previously the three keys were
     * overwritten with the (missing) system-property values in that case, which discarded the
     * configured credentials.
     */
    private static void initStsKey() {
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        String stsAK = System.getProperty("sts.accessKey");
        String stsSK = System.getProperty("sts.secretKey");
        String stsToken = System.getProperty("sts.token");
        if (StringUtils.isBlank(stsAK)) {
            if (StringUtils.isNotBlank(workerConf.getString("sts.accessKey"))) {
                // Credentials were already configured; do not overwrite them with nulls.
                return;
            }
            // Environment variable names use '_' where the property names use '.'.
            stsAK = System.getenv("sts.accessKey".replace(".", "_"));
            stsSK = System.getenv("sts.secretKey".replace(".", "_"));
            stsToken = System.getenv("sts.token".replace(".", "_"));
        }
        workerConf.setProperty("sts.accessKey", stsAK);
        workerConf.setProperty("sts.secretKey", stsSK);
        workerConf.setProperty("sts.token", stsToken);
    }

    /**
     * Determines the local address this worker should use.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>the {@code hsf.server.ip} system property, when non-blank;</li>
     *   <li>the local address of a socket connected to the configured {@code domainName}
     *       ({@code host[:port[/path]]}, port 80 when omitted, 5s connect timeout); an IPv6
     *       local address is replaced by {@code IpUtil.getIPV4Address()};</li>
     *   <li>{@code IpUtil.getIPV4Address()} when no domain is configured or the probe fails.</li>
     * </ol>
     *
     * <p>A blank domain skips the probe entirely: an unconnected socket only reports the
     * wildcard address, which is not a usable host for this worker.
     *
     * @return the chosen local host address
     */
    private static String getLocalHost() {
        String localHost = System.getProperty("hsf.server.ip");
        if (StringUtils.isNotBlank(localHost)) {
            return localHost;
        }
        String domain = ConfigUtil.getWorkerConfig().getString("domainName");
        if (StringUtils.isBlank(domain)) {
            return IpUtil.getIPV4Address();
        }
        try (Socket socket = new Socket()) {
            String hostname = domain;
            int port = 80;
            if (domain.contains(":")) {
                String[] tokens = domain.split(":");
                hostname = tokens[0];
                String portPart = tokens[1];
                // The port may be followed by a path, e.g. "host:8080/ctx".
                port = portPart.contains("/") ? Integer.parseInt(portPart.split("/")[0]) : Integer.parseInt(portPart);
            }
            socket.connect(new InetSocketAddress(hostname, port), 5000);
            InetAddress address = socket.getLocalAddress();
            localHost = address instanceof Inet6Address ? IpUtil.getIPV4Address() : address.getHostAddress();
        }
        catch (Exception e) {
            // Malformed domain, unreachable console, etc.: fall back to interface discovery.
            LOGGER.error("get local host error", e);
            localHost = IpUtil.getIPV4Address();
        }
        return localHost;
    }

    /**
     * Copies every JVM system property whose name starts with {@code schedulerx} into the
     * given configuration, after logging the full property set at debug level.
     *
     * @param conf configuration receiving the matching properties
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    private void initMetaInfoFromSystem(Configuration conf) throws Exception {
        Properties systemProps = System.getProperties();
        LOGGER.debug("system.properties=" + systemProps);
        for (Map.Entry<Object, Object> prop : systemProps.entrySet()) {
            String name = prop.getKey().toString();
            if (name.startsWith("schedulerx")) {
                conf.setProperty(name, prop.getValue());
            }
        }
    }

    /**
     * Fetches meta information from the console and merges it into the worker configuration.
     *
     * <p>When the fetched {@code schedulerx.app.type} equals {@code AppType.KUBERNETES} or
     * {@code AppType.ALIYUN_ACK}, cgroup metrics are enabled, every key of
     * {@code schedulerx-k8s.properties} is imported, and the namespace read from the service
     * account file is stored as {@code k8s.namespace} when available.
     *
     * @param namespace       namespace passed to the console lookup
     * @param namespaceSource namespace source passed to the console lookup
     * @param groupIds        group ids passed to the console lookup
     * @param appKeys         app keys passed to the console lookup
     * @throws Exception if the console lookup or any configuration step fails
     */
    private void initMetaInfoFromConsole(String namespace, String namespaceSource, List<String> groupIds, List<String> appKeys) throws Exception {
        Map<String, Object> fetched = ConsoleUtil.fetchMetaInfoFromConsole(namespace, namespaceSource, groupIds, appKeys);
        if (fetched == null) {
            return;
        }
        Configuration conf = ConfigUtil.getWorkerConfig();
        for (Map.Entry<String, Object> item : fetched.entrySet()) {
            conf.setProperty(item.getKey(), item.getValue());
        }
        if (!fetched.containsKey("schedulerx.app.type")) {
            return;
        }
        int appType = Integer.parseInt(fetched.get("schedulerx.app.type").toString());
        boolean kubernetesLike = appType == AppType.KUBERNETES.getValue() || appType == AppType.ALIYUN_ACK.getValue();
        if (!kubernetesLike) {
            return;
        }
        conf.setProperty("cgroup.metrics.enable", true);
        LOGGER.info("appType={}, auto set cgroup.metrics.enable=true", appType);
        Configuration k8sConf = ConfigUtil.newConfig("schedulerx-k8s.properties");
        for (Iterator<String> names = k8sConf.getKeys(); names.hasNext(); ) {
            String name = names.next();
            conf.setProperty(name, k8sConf.getProperty(name));
        }
        LOGGER.info("appType={}, import schedulerx-k8s.properties finished", appType);
        String k8sNamespace = FileUtils.readLine("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
        if (k8sNamespace != null) {
            conf.setProperty("k8s.namespace", k8sNamespace);
        }
    }

    /**
     * Refreshes the log collector and emits a greeting line containing the worker address.
     *
     * @param appGroupId app group id the greeting line is collected under
     * @param workerAddr worker address appended to the greeting
     */
    private void initLogCollector(long appGroupId, String workerAddr) {
        LogCollectorFactory.refresh().collect(appGroupId, "schedulerx", "hello schedulerx, workerAddr:" + workerAddr);
    }

    /**
     * Resolves the namespace, trying in order: the {@code schedulerx.namespace} config entry,
     * the {@code schedulerx.namespace} system property, then the {@code tenant.id} system
     * property.
     *
     * @return the first non-blank value, or the last value read when all are blank
     */
    private String initNamespace() {
        String resolved = ConfigUtil.getWorkerConfig().getString("schedulerx.namespace");
        if (StringUtils.isBlank(resolved)) {
            resolved = System.getProperty("schedulerx.namespace");
            if (StringUtils.isBlank(resolved)) {
                resolved = System.getProperty("tenant.id");
            }
        }
        return resolved;
    }

    /**
     * Resolves the namespace source, trying in order: the {@code schedulerx.namespace.source}
     * config entry, the system property of the same name, then the environment variable whose
     * name is that key with every {@code .} replaced by {@code _}.
     *
     * @return the first non-blank value, or the last value read when all are blank
     */
    private String initNamespaceSource() {
        String resolved = ConfigUtil.getWorkerConfig().getString("schedulerx.namespace.source");
        if (StringUtils.isBlank(resolved)) {
            resolved = System.getProperty("schedulerx.namespace.source");
            if (StringUtils.isBlank(resolved)) {
                resolved = System.getenv("schedulerx.namespace.source".replace(".", "_"));
            }
        }
        return resolved;
    }

    /**
     * Computes the worker's current label and stores it under {@code worker.cur.label}.
     *
     * <p>Candidates, in order: the {@code worker.label} config entry, the
     * {@code ALIBABA_DEPLOY_VERSION} system property, the {@code worker.label} system property,
     * and the {@code EDAS_PACKAGE_VERSION} environment variable. The chosen label is trimmed
     * and its spaces are replaced by underscores. Nothing is stored when every candidate is
     * blank.
     */
    private void initCurLabel() {
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        String candidate = workerConf.getString("worker.label");
        if (StringUtils.isBlank(candidate)) {
            candidate = System.getProperty("ALIBABA_DEPLOY_VERSION");
            if (StringUtils.isBlank(candidate)) {
                candidate = System.getProperty("worker.label");
            }
        }
        if (StringUtils.isBlank(candidate)) {
            candidate = System.getenv("EDAS_PACKAGE_VERSION");
        }
        if (StringUtils.isBlank(candidate)) {
            return;
        }
        String normalized = candidate.trim().replace(" ", "_");
        workerConf.setProperty("worker.cur.label", normalized);
    }

    /**
     * Verifies that a namespace is present when {@code schedulerx.namespace.enable} is true.
     *
     * @param conf      configuration providing the {@code schedulerx.namespace.enable} flag
     *                  (treated as {@code false} when absent)
     * @param namespace namespace to validate
     * @throws Exception a {@link NamespaceNotFoundException} when namespaces are enabled and
     *                   {@code namespace} is blank
     */
    private void checkParameters(Configuration conf, String namespace) throws Exception {
        boolean namespaceRequired = conf.getBoolean("schedulerx.namespace.enable", false);
        if (!namespaceRequired) {
            return;
        }
        if (StringUtils.isBlank(namespace)) {
            throw new NamespaceNotFoundException("Not found namespace.");
        }
    }

    /**
     * Runs the configured {@link Authenticator} for the given groups.
     *
     * <p>The authenticator class comes from {@code schedulerx.authenticate}, defaulting to
     * {@link DefaultAuthenticator}. The literal value {@code null} (case-insensitive) skips
     * authentication with a warning.
     *
     * @param conf            worker configuration
     * @param namespace       namespace forwarded to the authenticator
     * @param namespaceSource namespace source forwarded to the authenticator
     * @param groupIds        group ids forwarded to the authenticator
     * @param appKeys         app keys forwarded to the authenticator
     * @throws Exception an {@link IOException} when no authenticator instance is obtained, or
     *                   whatever the authenticator itself throws
     */
    private void authenticate(Configuration conf, String namespace, String namespaceSource, List<String> groupIds, List<String> appKeys) throws Exception {
        String implName = conf.getString("schedulerx.authenticate", DefaultAuthenticator.class.getName());
        boolean disabled = implName.equalsIgnoreCase("null");
        if (disabled) {
            LOGGER.warn("server don't support authentication");
            return;
        }
        Object instance = ReflectionUtil.getInstanceByClassName(implName, CUSTOMER_CLASS_LOADER);
        Authenticator authenticator = (Authenticator)instance;
        if (authenticator == null) {
            throw new IOException("authenticator is null");
        }
        authenticator.authenticate(conf, namespace, namespaceSource, groupIds, appKeys);
        LOGGER.info("authenticate success.");
    }

    /**
     * Initializes the table(s) of the shared {@code H2FilePersistence} instance.
     *
     * @throws Exception if the persistence layer fails to initialize
     */
    private static void initStore() throws Exception {
        H2FilePersistence.getInstance().initTable();
    }

    /**
     * Decides whether this machine must be isolated based on the configured unit/site lists.
     *
     * <p>If {@code enable.units} or {@code enable.sites} is non-empty they act as an allow
     * list: the machine is isolated unless its unit or site matches. Otherwise, if
     * {@code disable.units} or {@code disable.sites} is non-empty they act as a deny list:
     * the machine is isolated when its unit or site matches. With all four lists empty the
     * machine is never isolated.
     *
     * <p>Unit and site come from the {@code SIGMA_APP_UNIT} / {@code SIGMA_APP_SITE}
     * environment variables; if either is blank both are looked up from the armory service.
     *
     * @param host IP of this machine, used for the armory lookup and in log output
     * @return {@code true} when the machine should be isolated
     * @throws Exception an {@link IOException} when the armory lookup is needed but yields no
     *                   usable result, or whatever the HTTP/JSON helpers throw
     */
    private static boolean isolateMachine(String host) throws Exception {
        Configuration conf = ConfigUtil.getWorkerConfig();
        List<String> enableUnits = Lists.newArrayList(conf.getStringArray("enable.units"));
        List<String> enableSites = Lists.newArrayList(conf.getStringArray("enable.sites"));
        List<String> disableUnits = Lists.newArrayList(conf.getStringArray("disable.units"));
        List<String> disableSites = Lists.newArrayList(conf.getStringArray("disable.sites"));
        boolean hasAllowList = !enableUnits.isEmpty() || !enableSites.isEmpty();
        boolean hasDenyList = !disableUnits.isEmpty() || !disableSites.isEmpty();
        if (!hasAllowList && !hasDenyList) {
            return false;
        }
        String unit = System.getenv("SIGMA_APP_UNIT");
        String site = System.getenv("SIGMA_APP_SITE");
        if (StringUtils.isBlank(unit) || StringUtils.isBlank(site)) {
            String[] unitAndSite = SchedulerxWorker.queryUnitAndSiteFromArmory(host);
            unit = unitAndSite[0];
            site = unitAndSite[1];
        }
        // The allow list takes precedence: the deny list is only consulted when no allow list exists.
        boolean isolated = hasAllowList
                ? !SchedulerxWorker.matchesUnitOrSite(enableUnits, enableSites, unit, site)
                : SchedulerxWorker.matchesUnitOrSite(disableUnits, disableSites, unit, site);
        if (isolated) {
            LOGGER.warn("init isolated. ip=" + host + ", unit=" + unit + ", site=" + site);
        }
        return isolated;
    }

    /**
     * Queries the armory service for the unit and site of the given IP.
     *
     * @return a two-element array: {@code [unit, site]}
     */
    private static String[] queryUnitAndSiteFromArmory(String host) throws Exception {
        String url = "http://api.sh.gns.alibaba-inc.com/gns/armory/query?ip=" + host;
        LOGGER.info("get machine info, url=" + url);
        HttpResponse<JsonNode> response = Unirest.get(url).asJson();
        ArmoryResult result = JsonUtil.fromJson(response.getBody().getObject().toString(), ArmoryResult.class);
        if (!result.isSuccess() || result.getData() == null) {
            LOGGER.warn("get armory result failed, result=" + result);
            throw new IOException("get armory result failed");
        }
        String unit = result.getData().getUnit();
        if (unit == null) {
            // Without a unit the matching below cannot work; fail with a descriptive error.
            LOGGER.warn("get armory result failed, result=" + result);
            throw new IOException("get armory result failed, unit is missing for ip=" + host);
        }
        return new String[]{unit, result.getData().getSite()};
    }

    /**
     * Tests whether a unit/site pair matches the given lists, either in full form or in short
     * form (unit without its prefix up to the first dot; site as {@code shortUnit.site}).
     */
    private static boolean matchesUnitOrSite(List<String> units, List<String> sites, String unit, String site) {
        String simpleUnit = unit.substring(unit.indexOf(".") + 1);
        String simpleSite = simpleUnit + "." + site;
        return units.contains(simpleUnit) || sites.contains(simpleSite) || units.contains(unit) || sites.contains(site);
    }

    /**
     * Schedules every timer task listed under {@code worker.timer.tasks}.
     *
     * <p>Each task gets its own single-threaded scheduler whose thread is named
     * {@code Worker-timer-Thread-<task name>}, and runs at a fixed rate using the task's own
     * initial delay and period (in seconds).
     *
     * @param conf configuration the task instances are created from
     * @throws Exception if task instantiation or initialization fails
     */
    private static void initTimerTask(Configuration conf) throws Exception {
        List<AbstractTimerTask> tasks = ReflectionUtil.getInstancesByConf(conf, "worker.timer.tasks");
        if (tasks == null || tasks.isEmpty()) {
            return;
        }
        for (final AbstractTimerTask task : tasks) {
            ThreadFactory namedThreads = new ThreadFactory(){

                @Override
                public Thread newThread(Runnable work) {
                    return new Thread(work, "Worker-timer-Thread-" + task.getName());
                }
            };
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, namedThreads);
            scheduler.scheduleAtFixedRate(task, task.getInitialDelay(), task.getPeriod(), TimeUnit.SECONDS);
            // NOTE(review): init() runs after the task has been scheduled; confirm a task with
            // a zero initial delay tolerates running before init() completes.
            task.init();
        }
    }

    /**
     * Starts server discovery for every group id not yet known to {@code groupManager} and
     * registers it there.
     *
     * @param groupIdList group ids to process
     * @throws Exception if starting discovery or registering a group fails
     */
    private static void initServerDiscovery(List<String> groupIdList) throws Exception {
        for (String candidate : groupIdList) {
            boolean alreadyRegistered = groupManager.contains(candidate);
            if (!alreadyRegistered) {
                groupManager.startServerDiscovery(candidate);
                groupManager.appendGroupId(candidate, candidate);
            }
        }
    }

    /**
     * Creates the worker's routing actors on the given actor system.
     *
     * <p>Pool sizes are read from the worker configuration with these defaults: heartbeat 2,
     * job instance 128, log 100, container 256, task 128, at-least-once delivery 100. Each
     * router is bound to its own dispatcher. The delivery router reference is kept in
     * {@code AtLeastDeliveryRoutingActor}.
     *
     * @param actorSystem actor system the actors are created on
     * @param workerId    worker id (not used by this method)
     * @throws Exception if reading configuration or creating an actor fails
     */
    private static void initActors(ActorSystem actorSystem, String workerId) throws Exception {
        Configuration conf = ConfigUtil.getWorkerConfig();

        int heartbeatPool = conf.getInt("worker.heartbeat.actor.num", 2);
        Props heartbeatProps = Props.create(WorkerHeartbeatActor.class, new Object[0])
                .withRouter(new RoundRobinPool(heartbeatPool))
                .withDispatcher("akka.actor.thread-dispatcher-heartbeat");
        actorSystem.actorOf(heartbeatProps, "heartbeat_routing");

        int instancePool = conf.getInt("worker.jobinstance.actor.num", 128);
        Props instanceProps = Props.create(JobInstanceActor.class, new Object[0])
                .withRouter(new RoundRobinPool(instancePool))
                .withDispatcher("akka.actor.thread-dispatcher-instance");
        actorSystem.actorOf(instanceProps, "job_instance_routing");

        int logPool = conf.getInt("worker.log.actor.num", 100);
        Props logProps = Props.create(LogActor.class, new Object[0])
                .withRouter(new RoundRobinPool(logPool))
                .withDispatcher("akka.actor.thread-dispatcher-log");
        actorSystem.actorOf(logProps, "log_routing");

        int containerPool = conf.getInt("worker.container.actor.num", 256);
        Props containerProps = Props.create(ContainerActor.class, new Object[0])
                .withRouter(new RoundRobinPool(containerPool))
                .withDispatcher("akka.actor.thread-dispatcher-container");
        actorSystem.actorOf(containerProps, "container_routing");

        int taskPool = conf.getInt("worker.task.actor.num", 128);
        actorSystem.actorOf(TaskRouter.props(taskPool).withDispatcher("akka.actor.thread-dispatcher-task"), "task_routing");

        int deliveryPool = conf.getInt("at.least.once.delivery.actor.num", 100);
        AtLeastDeliveryRoutingActor = actorSystem.actorOf(AtLeastOnceDeliveryRoutingActor.props(deliveryPool).withDispatcher("akka.actor.thread-dispatcher-delivery"), "at_least_once_delivery_routing");
    }

    /**
     * Sends one heartbeat request per registered group to that group's active server.
     *
     * <p>For every entry of {@code GroupManager.INSTANCE.getGroupId2AppGroupIdMap()} a
     * {@code WorkerHeartBeatRequest} is built (version, ids, running job instances, metrics
     * JSON, starter, source, label, online flag, optional app key) and sent synchronously via
     * {@code FutureUtils.awaitResult}. On a timeout the method may probe the server address
     * with a plain socket and, after repeated timeouts against a reachable server, restart
     * the actor system. Any {@link Throwable} escaping the loop is logged and swallowed.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     * The control flow below is decompiler output and may differ slightly from the
     * original source.
     *
     * @param workerId id of this worker, placed into every request
     * @param online   online flag placed into every request; when {@code false}, a timeout
     *                 ends the whole round immediately
     */
    private static void sendHeartBeat(String workerId, Boolean online) {
        TaskMasterPool masterPool = TaskMasterPool.INSTANCE;
        Configuration conf = ConfigUtil.getWorkerConfig();
        String version = conf.getString("worker.version");
        String starter = conf.getString("stater.mode", "java");
        String label = conf.getString("worker.cur.label", "");
        String source = conf.getString("schedulerx.worker.source", "unknown");
        boolean enableHeartBeatLog = conf.getBoolean("schedulerx.heartbeat.log.enable", true);
        Map<String, Long> groupIdMap = GroupManager.INSTANCE.getGroupId2AppGroupIdMap();
        try {
            for (Map.Entry<String, Long> groupEntry : groupIdMap.entrySet()) {
                String groupId = groupEntry.getKey();
                long appGroupId = groupEntry.getValue();
                String appKey = GroupManager.INSTANCE.getAppKeyByGroupId(groupId);
                ServerDiscovery serverDiscovery = ServerDiscoveryFactory.getDiscovery(groupId);
                if (serverDiscovery == null || serverDiscovery.getActiveHeartBeatActor() == null) {
                    heatbeatLogger.warn("heartbeatActor is null, can be ignored if not frequently occurs");
                    // NOTE(review): this returns from the method, so the remaining groups are
                    // skipped for this round as well.
                    return;
                }
                ActorSelection heartbeatActor = serverDiscovery.getActiveHeartBeatActor();
                Metrics metrics = MetricsCollector.getMetrics();
                if (metrics != null) {
                    metrics.setExecCount(ThreadContainerPool.getInstance().getCount(groupId));
                }
                Worker.WorkerHeartBeatRequest.Builder builder = Worker.WorkerHeartBeatRequest.newBuilder().setVersion(version).setGroupId(groupId).setWorkerId(workerId).addAllJobInstanceId(masterPool.getInstanceIds(appGroupId)).setMetricsJson(metrics != null ? JsonUtil.toJson(metrics) : "").setStarter(starter).setAppGroupId(appGroupId).setSource(source).setLabel(label).setOnline(online);
                // The app key is optional and only set when one is known for the group.
                if (StringUtils.isNotBlank(appKey)) {
                    builder.setAppKey(appKey);
                }
                Worker.WorkerHeartBeatRequest request = builder.build();
                try {
                    long start2 = System.currentTimeMillis();
                    // Blocking request; the 5L is presumably a timeout in seconds — TODO confirm.
                    Worker.WorkerHeartBeatResponse response = (Worker.WorkerHeartBeatResponse)FutureUtils.awaitResult(heartbeatActor, (Object)request, 5L);
                    long end = System.currentTimeMillis();
                    if (!response.getSuccess()) {
                        heatbeatLogger.error("heartbeat groupId={} appKey={} to {} error, cost={}ms, errMsg={}", groupId, appKey, heartbeatActor.anchorPath().address(), end - start2, response.getMessage());
                    } else if (enableHeartBeatLog) {
                        heatbeatLogger.info("heartbeat<online:{}> groupId={} to {}, cost={}ms", online, groupId, heartbeatActor.anchorPath().address(), end - start2);
                    }
                    // Any response (successful or not) counts as contact with the server:
                    // refresh the health timestamp and clear the timeout counter.
                    HealthTimeHolder.INSTANCE.resetServerHeartbeatTime();
                    HEART_BEAT_TIMEOUT_TIMES = 0;
                }
                catch (TimeoutException e) {
                    String serverAddr;
                    heatbeatLogger.warn("heart beat groupId={} to {} timeout", groupId, heartbeatActor.anchorPath().address());
                    // An offline heartbeat that timed out ends the whole round.
                    if (!online.booleanValue()) {
                        return;
                    }
                    // Recovery is attempted only when akka.remoting.auto.recover is on
                    // (default true) and an active server address is known.
                    if (!conf.getBoolean("akka.remoting.auto.recover", true) || !StringUtils.isNotEmpty(serverAddr = serverDiscovery.getActiveServerAddr())) continue;
                    String[] tokens = serverAddr.split(":");
                    if (tokens.length == 2) {
                        String host = tokens[0];
                        int port = Integer.valueOf(tokens[1]);
                        Socket socket = new Socket();
                        try {
                            // Probe the server with a raw TCP connect (5000 ms timeout).
                            socket.connect(new InetSocketAddress(host, port), 5000);
                            heatbeatLogger.info("socket to {}:{} is reachable, times={}", host, port, HEART_BEAT_TIMEOUT_TIMES);
                            // Server port is reachable but heartbeats keep timing out: once the
                            // counter has reached 10, restart the actor system and reset it.
                            if (HEART_BEAT_TIMEOUT_TIMES >= 10) {
                                SchedulerxWorker.restartActorSystem();
                                HEART_BEAT_TIMEOUT_TIMES = 0;
                                continue;
                            }
                            ++HEART_BEAT_TIMEOUT_TIMES;
                            continue;
                        }
                        catch (Exception e2) {
                            // NOTE(review): this also catches exceptions thrown by
                            // restartActorSystem(), which are then logged as "not reachable".
                            heatbeatLogger.warn("socket to {}:{} is not reachable", host, port, e2);
                            HEART_BEAT_TIMEOUT_TIMES = 0;
                            continue;
                        }
                        finally {
                            if (socket != null) {
                                socket.close();
                            }
                            // NOTE(review): a `continue` inside `finally` discards any pending
                            // abrupt completion of the try/catch above; likely a decompiler
                            // artifact.
                            continue;
                        }
                    }
                    // Reached only when the address is not of the form host:port.
                    heatbeatLogger.error("wrong serverAddr=" + serverAddr);
                }
                catch (Exception ex) {
                    heatbeatLogger.warn("active server={} lost.", serverDiscovery.getActiveServerAddr(), ex);
                }
            }
        }
        catch (Throwable t) {
            // Logged and swallowed so nothing propagates to the caller.
            heatbeatLogger.warn("heartbeat error", t);
        }
    }

    /**
     * Schedules the periodic heartbeat on {@code heartbeatSes}, unless that executor already
     * has tasks.
     *
     * <p>The interval comes from {@code schedulerx.worker.heartbeat.interval} (default 5,
     * in seconds); the first heartbeat is sent without delay. Each run passes the current
     * value of {@code INITED} as the online flag.
     *
     * @param workerId worker id sent with every heartbeat
     */
    private static void initHeartBeat(final String workerId) {
        int intervalSeconds = ConfigUtil.getWorkerConfig().getInt("schedulerx.worker.heartbeat.interval", 5);
        ScheduledThreadPoolExecutor pool = (ScheduledThreadPoolExecutor)heartbeatSes;
        if (pool.getTaskCount() != 0L) {
            // Something was already scheduled on this executor; do not schedule twice.
            return;
        }
        Runnable beat = new Runnable(){

            @Override
            public void run() {
                SchedulerxWorker.sendHeartBeat(workerId, INITED);
            }
        };
        heartbeatSes.scheduleAtFixedRate(beat, 0L, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Returns the worker configuration obtained from {@code ConfigUtil.getWorkerConfig()}.
     *
     * @return the worker configuration
     */
    public Configuration getConfig() {
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        return workerConf;
    }

    /**
     * Command-line entry point: optionally loads an agent configuration file, initializes the
     * worker and creates the marker file {@code /tmp/<user>/schedulerx/AgentStarted}.
     *
     * <p>Any exception is logged and the JVM exits with status 1.
     *
     * @param args when exactly one argument is given it is used as the agent configuration path
     * @throws Exception declared for callers; exceptions are handled inside
     */
    public static void main(String[] args) throws Exception {
        try {
            SchedulerxWorker worker = new SchedulerxWorker();
            boolean hasAgentConf = args != null && args.length == 1;
            if (hasAgentConf) {
                SchedulerxWorker.initAgentConf(args[0]);
            }
            worker.init();
            String userName = System.getProperties().getProperty("user.name");
            String owner = StringUtils.isBlank(userName) ? "admin" : userName;
            File markerDir = new File("/tmp/" + owner + "/schedulerx");
            markerDir.mkdirs();
            new File(markerDir, "AgentStarted").createNewFile();
        }
        catch (Exception e) {
            LOGGER.error("Schedulerx worker start error", e);
            System.exit(1);
        }
    }

    /**
     * Stores the Spring application context in {@code SpringContext.context} and, when no
     * {@code stater.mode} is configured yet, marks the starter mode as {@code spring}.
     *
     * @param applicationContext the context to publish
     * @throws BeansException declared by the callback contract; not thrown explicitly here
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Arguments follow the placeholders: the context being set first, then the current
        // value of SpringContext.context that it replaces (they were previously swapped).
        LOGGER.info("set applicationContext={} to SpringContext={}", applicationContext, SpringContext.context);
        SpringContext.context = applicationContext;
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        if (workerConf.getProperty("stater.mode") == null) {
            workerConf.setProperty("stater.mode", "spring");
        }
    }

    /**
     * Bean lifecycle callback: logs the start of initialization and runs {@code init()}.
     *
     * @throws Exception if worker initialization fails
     */
    public void afterPropertiesSet() throws Exception {
        LOGGER.info("initializing bean...");
        init();
    }

    /**
     * Logs, for a fixed set of dependency classes, the resource location each class resolves
     * {@code ""} to, so the origin of each dependency can be read from the log.
     */
    private void printMvnDenpendency() {
        LOGGER.info("===maven dependencies===");
        Object nettyLocation = Boss.class.getResource("");
        LOGGER.info("netty:" + nettyLocation);
        Object protobufLocation = ProtocolMessageEnum.class.getResource("");
        LOGGER.info("protobuf-java:" + protobufLocation);
        Object javassistLocation = JvstCodeGen.class.getResource("");
        LOGGER.info("javaassist:" + javassistLocation);
        Object configurationLocation = Configuration.class.getResource("");
        LOGGER.info("commons-configuration:" + configurationLocation);
        Object configLocation = Config.class.getResource("");
        LOGGER.info("config:" + configLocation);
        Object gsonLocation = JSONException.class.getResource("");
        LOGGER.info("gson:" + gsonLocation);
        Object scalaLocation = Function.class.getResource("");
        LOGGER.info("com.alibaba.schedulerx.shade.scala:" + scalaLocation);
        LOGGER.info("===================");
    }

    /**
     * Sets the {@code domainName} configuration entry, but only when no non-blank value is
     * configured yet.
     *
     * @param domainName domain to store when none is configured
     */
    public void setDomainName(String domainName) {
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        boolean alreadyConfigured = !StringUtils.isBlank(workerConf.getString("domainName"));
        if (alreadyConfigured) {
            return;
        }
        workerConf.setProperty("domainName", domainName);
    }

    /**
     * Sets or extends the {@code groupId} configuration entry.
     *
     * <p>Before the worker is initialized ({@code INITED} is false) the value replaces the
     * entry. Afterwards the comma-separated ids are appended to the ids already configured.
     * The update runs while holding the {@code SchedulerxWorker.class} monitor.
     *
     * @param groupId one group id, or several separated by commas
     */
    public void setGroupId(String groupId) {
        synchronized (SchedulerxWorker.class) {
            Configuration workerConf = ConfigUtil.getWorkerConfig();
            if (!INITED) {
                workerConf.setProperty("groupId", groupId);
                return;
            }
            ArrayList<String> merged = Lists.newArrayList(workerConf.getStringArray("groupId"));
            ArrayList<String> appended = Lists.newArrayList(groupId.split(","));
            merged.addAll(appended);
            workerConf.setProperty("groupId", StringUtils.join(merged, ","));
        }
    }

    /**
     * Sets or extends the {@code appKey} configuration entry.
     *
     * <p>Before the worker is initialized ({@code INITED} is false) the value replaces the
     * entry. Afterwards the comma-separated keys are appended to the keys already configured.
     * The update runs while holding the {@code SchedulerxWorker.class} monitor.
     *
     * @param appKey one app key, or several separated by commas
     */
    public void setAppKey(String appKey) {
        synchronized (SchedulerxWorker.class) {
            Configuration workerConf = ConfigUtil.getWorkerConfig();
            if (!INITED) {
                ConfigUtil.getWorkerConfig().setProperty("appKey", appKey);
                return;
            }
            ArrayList<String> merged = Lists.newArrayList(workerConf.getStringArray("appKey"));
            ArrayList<String> appended = Lists.newArrayList(appKey.split(","));
            merged.addAll(appended);
            workerConf.setProperty("appKey", StringUtils.join(merged, ","));
        }
    }

    /**
     * Stores the flag under the {@code batch.work.enable} worker configuration key.
     *
     * @param enableBatchWork value to store
     */
    public void setEnableBatchWork(boolean enableBatchWork) {
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        workerConf.setProperty("batch.work.enable", enableBatchWork);
    }

    /**
     * Loads the agent configuration file and copies its settings into the worker
     * configuration.
     *
     * <p>Batch work is disabled and the shared container pool enabled unconditionally. Core
     * settings (domain, group id, app key, namespace, Aliyun keys, address server, label,
     * broadcast dispatch options, grace shutdown timeout) are always copied; optional
     * settings (HTTP server, map-master check interval, H2 credentials, grace shutdown mode)
     * are copied only when present. The starter mode is taken from the
     * {@code schedulerx.stater.mode} system property, defaulting to {@code agent}; mode
     * {@code pod} additionally enables cgroup metrics.
     *
     * <p>If the file cannot be loaded an error is logged and nothing is changed.
     *
     * @param agentConfPath path of the agent configuration file
     */
    private static void initAgentConf(String agentConfPath) {
        Configuration agentConf = ConfigUtil.newConfig(agentConfPath);
        if (agentConf == null) {
            LOGGER.error("load agent conf error, agentConf path:{}", agentConfPath);
            return;
        }
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        workerConf.setProperty("batch.work.enable", false);
        workerConf.setProperty("share.container.pool", true);
        workerConf.setProperty("domainName", agentConf.getProperty("domainName"));
        workerConf.setProperty("groupId", agentConf.getProperty("groupId"));
        workerConf.setProperty("appKey", agentConf.getProperty("appKey"));
        workerConf.setProperty("schedulerx.namespace", agentConf.getProperty("namespace"));
        workerConf.setProperty("aliyun.accessKey", agentConf.getProperty("aliyunAccessKey"));
        workerConf.setProperty("aliyun.secretKey", agentConf.getProperty("aliyunSecretKey"));
        workerConf.setProperty("address.server.domain", agentConf.getProperty("endpoint"));
        workerConf.setProperty("address.server.port", agentConf.getProperty("endpointPort"));
        workerConf.setProperty("worker.label", agentConf.getProperty("label"));
        workerConf.setProperty("broadcast.dispatch.thread.enable", agentConf.getBoolean("broadcast.dispatch.thread.enable", false));
        workerConf.setProperty("broadcast.dispatch.thread.num", agentConf.getInt("broadcast.dispatch.thread.num", 4));
        workerConf.setProperty("grace.shutdown.timeout", agentConf.getLong("grace.shutdown.timeout", 0L));

        Boolean enableHttpServer = agentConf.getBoolean("worker.http.server.enable", null);
        if (enableHttpServer != null) {
            workerConf.setProperty("worker.http.server.enable", enableHttpServer);
        }
        Integer httpServerPort = agentConf.getInteger("worker.http.server.port", null);
        if (httpServerPort != null) {
            workerConf.setProperty("worker.http.server.port", httpServerPort);
        }
        Object mapMasterStatusCheckInterval = agentConf.getProperty("map.master.status.check.interval");
        if (mapMasterStatusCheckInterval != null) {
            workerConf.setProperty("map.master.status.check.interval", mapMasterStatusCheckInterval);
        }
        String h2User = agentConf.getString("h2.database.user");
        if (StringUtils.isNotEmpty(h2User)) {
            workerConf.setProperty("h2.database.user", h2User);
        }
        String h2Password = agentConf.getString("h2.database.password");
        // Guard on the password itself (was guarded on h2User, a copy-paste slip), so a
        // missing password no longer overwrites the worker setting and a password given
        // without a user is still applied.
        if (StringUtils.isNotEmpty(h2Password)) {
            workerConf.setProperty("h2.database.password", h2Password);
        }
        String graceShutdownMode = agentConf.getString("grace.shutdown.mode");
        if (StringUtils.isNotEmpty(graceShutdownMode)) {
            workerConf.setProperty("grace.shutdown.mode", graceShutdownMode);
        }
        // When the key is present, the raw value replaces the long default written above.
        Object graceShutdownTimeout = agentConf.getProperty("grace.shutdown.timeout");
        if (graceShutdownTimeout != null) {
            workerConf.setProperty("grace.shutdown.timeout", graceShutdownTimeout);
        }
        String starterMode = System.getProperty("schedulerx.stater.mode");
        if (StringUtils.isNotEmpty(starterMode)) {
            workerConf.setProperty("stater.mode", starterMode);
            if (starterMode.equalsIgnoreCase("pod")) {
                workerConf.setProperty("cgroup.metrics.enable", true);
                LOGGER.info("starterMode={}, auto set cgroup.metrics.enable=true", starterMode);
            }
        } else {
            workerConf.setProperty("stater.mode", "agent");
        }
    }

    /**
     * Terminates the current actor system and, once termination completes, creates a fresh
     * one with the same setup steps as {@code startActorSystem()}.
     *
     * <p>Does nothing when no actor system exists. The restart itself runs asynchronously in
     * the completion callback of the termination future.
     *
     * @throws Exception if initiating the termination fails
     */
    public static void restartActorSystem() throws Exception {
        if (actorSystem != null) {
            // terminateActorSystem() sets forceActorSystemTerminate = true, so the termination
            // hook registered on the old system logs instead of triggering a second restart.
            Future<Terminated> terminatedFuture = SchedulerxWorker.terminateActorSystem();
            LOGGER.info("actorSystem terminating...");
            terminatedFuture.onComplete(new OnComplete(){

                // Runs once the termination future completes; `failure` and `success` are not
                // inspected, so the restart happens regardless of the outcome.
                public void onComplete(Throwable failure, Object success) throws Throwable {
                    LOGGER.info("actorSystem terminated, ready to restart actorSystem");
                    String workerId = WorkerIdGenerator.get();
                    String host = ConfigUtil.getWorkerConfig().getString("hostname");
                    if (host == null) {
                        host = SchedulerxWorker.getLocalHost();
                    }
                    int port = ConfigUtil.getWorkerConfig().getInt("port", 0);
                    Config akkaConfig = ConfigUtil.getAkkaConfig("akka-worker.conf", host, port);
                    actorSystem = ActorSystem.create(workerId, akkaConfig);
                    // Re-arm automatic restart for the new system.
                    forceActorSystemTerminate = false;
                    actorSystem.registerOnTermination(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                // Restart only when the termination was not requested through
                                // terminateActorSystem().
                                if (!forceActorSystemTerminate) {
                                    LOGGER.info("Restart Actor System.");
                                    SchedulerxWorker.restartActorSystem();
                                } else {
                                    LOGGER.info("Force terminate actor system...");
                                }
                            }
                            catch (Exception e) {
                                LOGGER.error("Restart actorSystem failed.", e);
                            }
                        }
                    });
                    SchedulerxWorker.initActors(actorSystem, workerId);
                    LOGGER.info("actors inited.");
                    // Publish the new system's address for the rest of the worker.
                    Address address = actorSystem.provider().getDefaultAddress();
                    WORKER_ADDR = address.host().get() + ":" + address.port().get();
                    ConfigUtil.getWorkerConfig().setProperty("akkaPath", address.toString());
                    LOGGER.info("actor system restarted, address={}", address.toString());
                    groupManager.reset(actorSystem);
                    INITED = true;
                }
            // NOTE(review): the callback is submitted to the dispatcher of the actor system
            // that is being terminated — confirm it is still able to run the callback.
            }, actorSystem.dispatcher());
        }
    }

    /**
     * Creates the worker's actor system and its routing actors.
     *
     * <p>The system is named after a freshly generated worker id and configured from
     * {@code akka-worker.conf} with the configured {@code hostname} (or the detected local
     * host when none is set) and {@code port} (default 0). A termination hook restarts the
     * system unless termination was forced. The resulting address is published in
     * {@code WORKER_ADDR} and under the {@code akkaPath} configuration key.
     *
     * @return the generated worker id
     * @throws Exception if the actor system or its actors cannot be created
     */
    public static String startActorSystem() throws Exception {
        String workerId = WorkerIdGenerator.get();
        Configuration workerConf = ConfigUtil.getWorkerConfig();
        String bindHost = workerConf.getString("hostname");
        if (bindHost == null) {
            bindHost = SchedulerxWorker.getLocalHost();
        }
        int bindPort = workerConf.getInt("port", 0);
        Config akkaConfig = ConfigUtil.getAkkaConfig("akka-worker.conf", bindHost, bindPort);
        actorSystem = ActorSystem.create(workerId, akkaConfig);
        forceActorSystemTerminate = false;
        Runnable restartUnlessForced = new Runnable(){

            @Override
            public void run() {
                try {
                    if (forceActorSystemTerminate) {
                        LOGGER.info("Force terminate actor system...");
                        return;
                    }
                    LOGGER.info("Restart Actor System.");
                    SchedulerxWorker.restartActorSystem();
                }
                catch (Exception e) {
                    LOGGER.error("Restart actorSystem failed.", e);
                }
            }
        };
        actorSystem.registerOnTermination(restartUnlessForced);
        Address systemAddress = actorSystem.provider().getDefaultAddress();
        WORKER_ADDR = systemAddress.host().get() + ":" + systemAddress.port().get();
        workerConf.setProperty("akkaPath", systemAddress.toString());
        LOGGER.info("actor system started, address={}", systemAddress.toString());
        SchedulerxWorker.initActors(actorSystem, workerId);
        LOGGER.info("actors inited.");
        return workerId;
    }

    /**
     * Terminates the current actor system, marking the termination as forced so the
     * termination hook does not restart it.
     *
     * @return the future returned by {@code actorSystem.terminate()}, or {@code null}
     *         when no actor system exists
     */
    private static Future<Terminated> terminateActorSystem() {
        if (actorSystem == null) {
            return null;
        }
        forceActorSystemTerminate = true;
        return actorSystem.terminate();
    }

    /**
     * Stores {@code units} in the worker configuration under {@code "enable.units"}.
     *
     * @param units value to store
     */
    public void setEnableUnits(String units) {
        final String propertyKey = "enable.units";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, units);
    }

    /**
     * Stores {@code sites} in the worker configuration under {@code "enable.sites"}.
     *
     * @param sites value to store
     */
    public void setEnableSites(String sites) {
        final String propertyKey = "enable.sites";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, sites);
    }

    /**
     * Stores {@code units} in the worker configuration under {@code "disable.units"}.
     *
     * @param units value to store
     */
    public void setDisableUnits(String units) {
        final String propertyKey = "disable.units";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, units);
    }

    /**
     * Stores {@code sites} in the worker configuration under {@code "disable.sites"}.
     *
     * @param sites value to store
     */
    public void setDisableSites(String sites) {
        final String propertyKey = "disable.sites";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, sites);
    }

    /**
     * Stores {@code aliyunAccessKey} in the worker configuration under
     * {@code "aliyun.accessKey"}.
     *
     * @param aliyunAccessKey value to store
     */
    public void setAliyunAccessKey(String aliyunAccessKey) {
        final String propertyKey = "aliyun.accessKey";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, aliyunAccessKey);
    }

    /**
     * Stores {@code aliyunSecretKey} in the worker configuration under
     * {@code "aliyun.secretKey"}.
     *
     * @param aliyunSecretKey value to store
     */
    public void setAliyunSecretKey(String aliyunSecretKey) {
        final String propertyKey = "aliyun.secretKey";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, aliyunSecretKey);
    }

    /**
     * Stores {@code stsAccessKey} in the worker configuration under {@code "sts.accessKey"}.
     *
     * @param stsAccessKey value to store
     */
    public void setSTSAccessKey(String stsAccessKey) {
        final String propertyKey = "sts.accessKey";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, stsAccessKey);
    }

    /**
     * Stores {@code stsSecretKey} in the worker configuration under {@code "sts.secretKey"}.
     *
     * @param stsSecretKey value to store
     */
    public void setSTSSecretKey(String stsSecretKey) {
        final String propertyKey = "sts.secretKey";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, stsSecretKey);
    }

    /**
     * Stores {@code stsSecretToken} in the worker configuration under {@code "sts.token"}.
     *
     * @param stsSecretToken value to store
     */
    public void setSTSSecretToken(String stsSecretToken) {
        final String propertyKey = "sts.token";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, stsSecretToken);
    }

    /**
     * Stores {@code host} in the worker configuration under {@code "hostname"}, the key
     * that {@code startActorSystem()} reads when choosing the actor system host.
     *
     * @param host value to store
     */
    public void setHost(String host) {
        final String propertyKey = "hostname";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, host);
    }

    /**
     * Stores {@code port} (boxed as an {@code Integer}) in the worker configuration under
     * {@code "port"}, the key that {@code startActorSystem()} reads for the actor system port.
     *
     * @param port value to store
     */
    public void setPort(int port) {
        final String propertyKey = "port";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Integer.valueOf(port));
    }

    /**
     * Assigns {@code userClassLoader} to the {@code CUSTOMER_CLASS_LOADER} field.
     *
     * @param userClassLoader class loader to record; stored as-is without a null check
     */
    public void setClassLoader(ClassLoader userClassLoader) {
        CUSTOMER_CLASS_LOADER = userClassLoader;
    }

    /**
     * Stores {@code namespace} under {@code "schedulerx.namespace"}, but only when that key
     * currently has no non-blank string value; an existing value is left untouched.
     *
     * @param namespace value to store when no namespace is configured yet
     */
    public void setNamespace(String namespace) {
        final String propertyKey = "schedulerx.namespace";
        String existing = ConfigUtil.getWorkerConfig().getString(propertyKey);
        if (StringUtils.isNotBlank(existing)) {
            return;
        }
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, namespace);
    }

    /**
     * Stores {@code namespaceSource} in the worker configuration under
     * {@code "schedulerx.namespace.source"}.
     *
     * @param namespaceSource value to store
     */
    public void setNamespaceSource(String namespaceSource) {
        final String propertyKey = "schedulerx.namespace.source";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, namespaceSource);
    }

    /**
     * Stores {@code endpoint} under {@code "address.server.domain"}, but only when that key
     * has no value yet (or only a blank one); an existing value is left untouched.
     *
     * <p>The existing value is inspected as an {@code Object} rather than cast to
     * {@code String}: {@code getProperty} is not guaranteed to return a String (for example
     * a list-valued property), and a blind cast would fail with a
     * {@code ClassCastException} instead of treating the key as already configured.
     *
     * @param endpoint value to store when no endpoint is configured yet
     */
    public void setEndpoint(String endpoint) {
        Object current = ConfigUtil.getWorkerConfig().getProperty("address.server.domain");
        if (current == null || StringUtils.isBlank(current.toString())) {
            ConfigUtil.getWorkerConfig().setProperty("address.server.domain", endpoint);
        }
    }

    /**
     * Stores {@code endpointPort} (as a decimal string) under {@code "address.server.port"},
     * but only when that key has no value yet (or only a blank one); an existing value is
     * left untouched.
     *
     * <p>The existing value is inspected as an {@code Object} rather than cast to
     * {@code String}: a port stored as a number (or any other non-String value) would make
     * a blind cast fail with a {@code ClassCastException} instead of being treated as
     * already configured.
     *
     * @param endpointPort value to store when no endpoint port is configured yet
     */
    public void setEndpointPort(int endpointPort) {
        Object current = ConfigUtil.getWorkerConfig().getProperty("address.server.port");
        if (current == null || StringUtils.isBlank(current.toString())) {
            ConfigUtil.getWorkerConfig().setProperty("address.server.port", String.valueOf(endpointPort));
        }
    }

    /**
     * Stores {@code maxSize} (boxed as an {@code Integer}) in the worker configuration under
     * {@code "task.body.size.max"}.
     *
     * @param maxSize value to store
     */
    public void setMaxTaskBodySize(int maxSize) {
        final String propertyKey = "task.body.size.max";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Integer.valueOf(maxSize));
    }

    /**
     * Stores {@code block} (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "block.app.start"}.
     *
     * @param block value to store
     */
    public void setBlockAppStart(boolean block) {
        final String propertyKey = "block.app.start";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(block));
    }

    /**
     * Stores the flag (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "share.container.pool"}.
     *
     * @param shareConatinerPool value to store
     */
    public void setShareContainerPool(boolean shareConatinerPool) {
        final String propertyKey = "share.container.pool";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(shareConatinerPool));
    }

    /**
     * Stores {@code mode} in the worker configuration under {@code "thread.pool.mode"}.
     *
     * @param mode value to store
     */
    public void setThreadPoolMode(String mode) {
        final String propertyKey = "thread.pool.mode";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, mode);
    }

    /**
     * Stores {@code sharePoolSize} (boxed as an {@code Integer}) in the worker configuration
     * under {@code "share.pool.size"}.
     *
     * @param sharePoolSize value to store
     */
    public void setSharePoolSize(int sharePoolSize) {
        final String propertyKey = "share.pool.size";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Integer.valueOf(sharePoolSize));
    }

    /**
     * Stores {@code queueSize} (boxed as an {@code Integer}) in the worker configuration
     * under {@code "share.pool.queue.size"}.
     *
     * @param queueSize value to store
     */
    public void setSharePoolQueueSize(int queueSize) {
        final String propertyKey = "share.pool.queue.size";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Integer.valueOf(queueSize));
    }

    /**
     * Stores {@code enable} (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "sls.log.enable"}.
     *
     * @param enable value to store
     */
    public void setSlsCollectorEnable(boolean enable) {
        final String propertyKey = "sls.log.enable";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(enable));
    }

    /**
     * Stores {@code label} in the worker configuration under {@code "worker.label"}.
     *
     * @param label value to store
     */
    public void setLabel(String label) {
        final String propertyKey = "worker.label";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, label);
    }

    /**
     * Stores {@code labelPath} in the worker configuration under {@code "worker.label.path"}.
     *
     * @param labelPath value to store
     */
    public void setLabelPath(String labelPath) {
        final String propertyKey = "worker.label.path";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, labelPath);
    }

    /**
     * Stores {@code enable} (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "cgroup.metrics.enable"}.
     *
     * @param enable value to store
     */
    public void setEnableCgroupMetrics(boolean enable) {
        final String propertyKey = "cgroup.metrics.enable";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(enable));
    }

    /**
     * Stores {@code cgroupPathPrefix} in the worker configuration under
     * {@code "cgroup.path.prefix"}.
     *
     * @param cgroupPathPrefix value to store
     */
    public void setCgroupPathPrefix(String cgroupPathPrefix) {
        final String propertyKey = "cgroup.path.prefix";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, cgroupPathPrefix);
    }

    /**
     * Stores {@code autoRecover} (boxed as a {@code Boolean}) in the worker configuration
     * under {@code "akka.remoting.auto.recover"}.
     *
     * @param autoRecover value to store
     */
    public void setAkkaRemotingAutoRecover(boolean autoRecover) {
        final String propertyKey = "akka.remoting.auto.recover";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(autoRecover));
    }

    /**
     * Stores {@code enable} (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "schedulerx.heartbeat.log.enable"}.
     *
     * @param enable value to store
     */
    public void setEnableHeartbeatLog(boolean enable) {
        final String propertyKey = "schedulerx.heartbeat.log.enable";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(enable));
    }

    /**
     * Stores {@code interval} (boxed as an {@code Integer}) in the worker configuration under
     * {@code "map.master.status.check.interval"}.
     *
     * @param interval value to store
     */
    public void setMapMasterStatusCheckInterval(int interval) {
        final String propertyKey = "map.master.status.check.interval";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Integer.valueOf(interval));
    }

    /**
     * Stores {@code enable} (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "second.delay.interval.ms.enable"}.
     *
     * @param enable value to store
     */
    public void setEnableSecondDelayCycleIntervalMs(boolean enable) {
        final String propertyKey = "second.delay.interval.ms.enable";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(enable));
    }

    /**
     * Stores {@code enable} (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "map.master.failover.enable"}.
     *
     * @param enable value to store
     */
    public void setEnableMapMasterFailover(boolean enable) {
        final String propertyKey = "map.master.failover.enable";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(enable));
    }

    /**
     * Stores {@code enable} (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "map.master.dispatch.random"}.
     *
     * @param enable value to store
     */
    public void setMapMasterDispatchRandom(boolean enable) {
        final String propertyKey = "map.master.dispatch.random";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(enable));
    }

    /**
     * Stores {@code strategy} in the worker configuration under
     * {@code "map.master.router.strategy"}.
     *
     * @param strategy value to store; passed through unchanged
     */
    public void setMapMasterRouterStrategy(Integer strategy) {
        final String propertyKey = "map.master.router.strategy";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, strategy);
    }

    /**
     * Stores {@code enable} (boxed as a {@code Boolean}) in the worker configuration under
     * {@code "second_delay.standalone.dispatch"}.
     *
     * @param enable value to store
     */
    public void setEnableSecondDelayStandaloneDispatch(boolean enable) {
        final String propertyKey = "second_delay.standalone.dispatch";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Boolean.valueOf(enable));
    }

    /**
     * Stores {@code pageSize} (boxed as an {@code Integer}) in the worker configuration under
     * {@code "worker.map.page.size"}.
     *
     * @param pageSize value to store
     */
    public void setPageSize(int pageSize) {
        final String propertyKey = "worker.map.page.size";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Integer.valueOf(pageSize));
    }

    /**
     * Stores {@code delay} (boxed as a {@code Long}) in the worker configuration under
     * {@code "grace.shutdown.timeout"}.
     *
     * @param delay value to store; the time unit is not established here
     */
    public void setGraceShutdownTimeout(long delay) {
        final String propertyKey = "grace.shutdown.timeout";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Long.valueOf(delay));
    }

    /**
     * Stores {@code mode} in the worker configuration under {@code "worker.shutdown.mode"}.
     *
     * <p>NOTE(review): presumably superseded by {@code setGraceShutdownMode(String)} —
     * confirm before pointing callers at it.
     *
     * @param mode value to store; passed through unchanged
     */
    @Deprecated
    public void setWorkerShutdownMode(Integer mode) {
        final String propertyKey = "worker.shutdown.mode";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, mode);
    }

    /**
     * Stores {@code mode} in the worker configuration under {@code "grace.shutdown.mode"}.
     *
     * @param mode value to store
     */
    public void setGraceShutdownMode(String mode) {
        final String propertyKey = "grace.shutdown.mode";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, mode);
    }

    /**
     * Stores {@code num} (boxed as an {@code Integer}) in the worker configuration under
     * {@code "broadcast.dispatch.thread.num"}.
     *
     * @param num value to store
     */
    public void setBroadcastDispatchThreadNum(int num) {
        final String propertyKey = "broadcast.dispatch.thread.num";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Integer.valueOf(num));
    }

    /**
     * Stores {@code enable} in the worker configuration under
     * {@code "broadcast.dispatch.thread.enable"}.
     *
     * @param enable value to store; passed through unchanged
     */
    public void setBroadcastDispatchThreadEnable(Boolean enable) {
        final String propertyKey = "broadcast.dispatch.thread.enable";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, enable);
    }

    /**
     * Stores {@code num} (boxed as an {@code Integer}) in the worker configuration under
     * {@code "broadcast.dispatch.retry.times"}.
     *
     * @param num value to store
     */
    public void setBroadcastDispatchRetryTimes(int num) {
        final String propertyKey = "broadcast.dispatch.retry.times";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Integer.valueOf(num));
    }

    /**
     * Stores {@code enable} in the worker configuration under
     * {@code "broadcast.master.exec.enable"}.
     *
     * @param enable value to store; passed through unchanged
     */
    public void setBroadcastMasterExecEnable(Boolean enable) {
        final String propertyKey = "broadcast.master.exec.enable";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, enable);
    }

    /**
     * Stores the given map in the worker configuration under
     * {@code "processor.thread.pool.size"}. The map reference itself is handed to the
     * configuration; no copy is made here.
     *
     * @param processorPoolSize map to store
     */
    public void setProcessorPoolSize(Map<String, Integer> processorPoolSize) {
        final String propertyKey = "processor.thread.pool.size";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, processorPoolSize);
    }

    /**
     * Stores {@code h2User} in the worker configuration under {@code "h2.database.user"}.
     *
     * @param h2User value to store
     */
    public void setH2DatabaseUser(String h2User) {
        final String propertyKey = "h2.database.user";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, h2User);
    }

    /**
     * Stores {@code h2Password} in the worker configuration under
     * {@code "h2.database.password"}.
     *
     * @param h2Password value to store
     */
    public void setH2DatabasePassword(String h2Password) {
        final String propertyKey = "h2.database.password";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, h2Password);
    }

    /**
     * Stores {@code enable} in the worker configuration under
     * {@code "worker.http.server.enable"}.
     *
     * @param enable value to store; passed through unchanged
     */
    public void setHttpServerEnable(Boolean enable) {
        final String propertyKey = "worker.http.server.enable";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, enable);
    }

    /**
     * Stores {@code port} in the worker configuration under {@code "worker.http.server.port"}.
     *
     * @param port value to store; passed through unchanged
     */
    public void setHttpServerPort(Integer port) {
        final String propertyKey = "worker.http.server.port";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, port);
    }

    /**
     * Stores {@code diskPercent} (boxed as a {@code Float}) in the worker configuration under
     * {@code "map.disk.percent.max"}.
     *
     * @param diskPercent value to store
     */
    public void setMaxMapDiskPercent(float diskPercent) {
        final String propertyKey = "map.disk.percent.max";
        ConfigUtil.getWorkerConfig().setProperty(propertyKey, Float.valueOf(diskPercent));
    }

    /**
     * Receives an application-context event and hands it to
     * {@code elegantStartAndStop(ApplicationContextEvent)}.
     *
     * @param event the context event to process
     */
    public void onApplicationEvent(ApplicationContextEvent event) {
        elegantStartAndStop(event);
    }

    /**
     * Reacts to Spring context lifecycle events.
     *
     * <ul>
     *   <li>Started / refreshed: adopts the event's context as {@code SpringContext.context},
     *       reloads processor beans, unlocks {@code SpringContext} and initialises the
     *       heartbeat for {@code WORKER_ID}.</li>
     *   <li>Stopped / closed: calls {@code SchedulerxWorker.shutdown(null)} and locks
     *       {@code SpringContext}.</li>
     *   <li>Any other event: no action.</li>
     * </ul>
     * The lock/unlock counters are logged in every case.
     *
     * @param event the context event to process
     */
    private void elegantStartAndStop(ApplicationContextEvent event) {
        boolean started = event instanceof ContextStartedEvent;
        boolean refreshed = event instanceof ContextRefreshedEvent;
        boolean stopped = event instanceof ContextStoppedEvent;
        boolean closed = event instanceof ContextClosedEvent;

        if (started || refreshed) {
            // "started" takes precedence, mirroring the order of the checks.
            if (started) {
                LOGGER.warn("SpringApplicationContext={} started and change to {}.", SpringContext.context, event.getApplicationContext());
            } else {
                LOGGER.warn("SpringApplicationContext={} refreshed to {}.", SpringContext.context, event.getApplicationContext());
            }
            SpringContext.context = event.getApplicationContext();
            this.loadProcessorBean(SpringContext.context);
            SpringContext.unlock();
            SchedulerxWorker.initHeartBeat(WORKER_ID);
            if (started) {
                LOGGER.info("[ContextStartedEvent] heartbeat init.");
            } else {
                LOGGER.info("[ContextRefreshedEvent] heartbeat init.");
            }
        } else if (stopped || closed) {
            SchedulerxWorker.shutdown(null);
            SpringContext.lock();
            if (stopped) {
                LOGGER.warn("SpringApplicationContext={} stopped.", event.getApplicationContext());
            } else {
                LOGGER.warn("SpringApplicationContext={} closed.", event.getApplicationContext());
            }
        }
        LOGGER.warn("SchedulerxWorker Lock times:{}\uff0c unLock times:{}.", SpringContext.lockTimes(), SpringContext.unLockTimes());
    }

    /**
     * Looks up every {@code JobProcessor} bean in {@code context} and, for each one that is
     * an AOP proxy, registers it with {@code SpringContext.putBeanName} keyed by the proxy's
     * target class. Beans that are not AOP proxies are skipped.
     *
     * @param context application context to scan for {@code JobProcessor} beans
     */
    private void loadProcessorBean(ApplicationContext context) {
        for (Object bean : context.getBeansOfType(JobProcessor.class).values()) {
            JobProcessor processor = (JobProcessor)bean;
            if (AopUtils.isAopProxy((Object)processor)) {
                SpringContext.putBeanName(AopUtils.getTargetClass((Object)processor), processor);
            }
        }
    }
}

