/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.registry;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import javax.annotation.PostConstruct;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerConnectStrategy;
import org.apache.dolphinscheduler.server.worker.registry.WorkerConnectionStateListener;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;

@Service
public class WorkerRegistryClient
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(WorkerRegistryClient.class);
    @Autowired
    private WorkerConfig workerConfig;
    @Autowired
    private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;
    @Autowired
    private RegistryClient registryClient;
    @Autowired
    @Lazy
    private WorkerConnectStrategy workerConnectStrategy;
    @Autowired
    private MetricsProvider metricsProvider;
    private WorkerHeartBeatTask workerHeartBeatTask;

    @PostConstruct
    public void initWorkRegistry() {
        this.workerHeartBeatTask = new WorkerHeartBeatTask(this.workerConfig, this.metricsProvider, this.registryClient, this.workerTaskExecutorThreadPool);
    }

    public void start() {
        try {
            this.registry();
            this.registryClient.addConnectionStateListener((ConnectionListener)new WorkerConnectionStateListener(this.workerConfig, this.workerConnectStrategy));
        }
        catch (Exception ex) {
            throw new RegistryException("Worker registry client start up error", (Throwable)ex);
        }
    }

    private void registry() throws InterruptedException {
        WorkerHeartBeat workerHeartBeat = this.workerHeartBeatTask.getHeartBeat();
        while (ServerStatus.BUSY.equals((Object)workerHeartBeat.getServerStatus())) {
            log.warn("Worker node is BUSY: {}", (Object)workerHeartBeat);
            workerHeartBeat = this.workerHeartBeatTask.getHeartBeat();
            Thread.sleep(1000L);
        }
        String workerZKPath = this.workerConfig.getWorkerRegistryPath();
        this.registryClient.remove(workerZKPath);
        this.registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString((Object)workerHeartBeat));
        log.info("Worker node: {} registry to ZK {} successfully", (Object)this.workerConfig.getWorkerAddress(), (Object)workerZKPath);
        while (!this.registryClient.checkNodeExists(this.workerConfig.getWorkerAddress(), RegistryNodeType.WORKER)) {
            ThreadUtils.sleep((long)1000L);
        }
        ThreadUtils.sleep((long)1000L);
        this.workerHeartBeatTask.start();
        log.info("Worker node: {} registry finished", (Object)this.workerConfig.getWorkerAddress());
    }

    public Optional<Host> getAlertServerAddress() {
        List serverList = this.registryClient.getServerList(RegistryNodeType.ALERT_SERVER);
        if (CollectionUtils.isEmpty((Collection)serverList)) {
            return Optional.empty();
        }
        Server server = (Server)serverList.get(0);
        return Optional.of(new Host(server.getHost(), server.getPort()));
    }

    public void setRegistryStoppable(IStoppable stoppable) {
        this.registryClient.setStoppable(stoppable);
    }

    @Override
    public void close() throws IOException {
        if (this.workerHeartBeatTask != null) {
            this.workerHeartBeatTask.shutdown();
        }
        this.registryClient.close();
        log.info("Worker registry client closed");
    }
}

