/*
 * Decompiled with CFR 0.152.
 */
package alluxio.worker.job.command;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.exception.ConnectionFailedException;
import alluxio.grpc.CancelTaskCommand;
import alluxio.grpc.JobCommand;
import alluxio.grpc.JobInfo;
import alluxio.grpc.RunTaskCommand;
import alluxio.grpc.SetTaskPoolSizeCommand;
import alluxio.heartbeat.HeartbeatExecutor;
import alluxio.job.JobServerContext;
import alluxio.job.RunTaskContext;
import alluxio.job.wire.JobWorkerHealth;
import alluxio.job.wire.TaskInfo;
import alluxio.util.ThreadFactoryUtils;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.JobWorkerIdRegistry;
import alluxio.worker.job.JobMasterClient;
import alluxio.worker.job.command.JobWorkerHealthReporter;
import alluxio.worker.job.task.TaskExecutorManager;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public class CommandHandlingExecutor
implements HeartbeatExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(CommandHandlingExecutor.class);
    private final JobServerContext mServerContext;
    private final JobMasterClient mMasterClient;
    private final TaskExecutorManager mTaskExecutorManager;
    private final WorkerNetAddress mWorkerNetAddress;
    private final JobWorkerHealthReporter mHealthReporter;
    private final boolean mIsThrottleWorkerOnPoorHealth;
    private final ExecutorService mCommandHandlingService = Executors.newSingleThreadExecutor(ThreadFactoryUtils.build((String)"command-handling-service", (boolean)true));

    public CommandHandlingExecutor(JobServerContext jobServerContext, TaskExecutorManager taskExecutorManager, JobMasterClient masterClient, WorkerNetAddress workerNetAddress) {
        this.mServerContext = (JobServerContext)Preconditions.checkNotNull((Object)jobServerContext);
        this.mTaskExecutorManager = (TaskExecutorManager)Preconditions.checkNotNull((Object)taskExecutorManager, (Object)"taskExecutorManager");
        this.mMasterClient = (JobMasterClient)Preconditions.checkNotNull((Object)masterClient, (Object)"masterClient");
        this.mWorkerNetAddress = (WorkerNetAddress)Preconditions.checkNotNull((Object)workerNetAddress, (Object)"workerNetAddress");
        this.mIsThrottleWorkerOnPoorHealth = Configuration.getBoolean((PropertyKey)PropertyKey.JOB_WORKER_THROTTLING);
        this.mHealthReporter = new JobWorkerHealthReporter(this.mWorkerNetAddress);
    }

    public void heartbeat() {
        List<JobCommand> commands;
        JobWorkerHealthReporter.JobWorkerHealthReport jobWorkerHealthReport = this.mHealthReporter.getJobWorkerHealthReport();
        if (this.mIsThrottleWorkerOnPoorHealth) {
            if (jobWorkerHealthReport.isHealthy()) {
                this.mTaskExecutorManager.unthrottle();
            } else {
                this.mTaskExecutorManager.throttle();
                LOG.warn("Worker,{}, is throttled.", (Object)this.mWorkerNetAddress.getHost());
            }
        }
        JobWorkerHealth jobWorkerHealth = new JobWorkerHealth(JobWorkerIdRegistry.getWorkerId().longValue(), jobWorkerHealthReport.getCpuLoadAverage(), this.mTaskExecutorManager.getTaskExecutorPoolSize(), this.mTaskExecutorManager.getNumActiveTasks(), this.mTaskExecutorManager.unfinishedTasks(), this.mWorkerNetAddress.getHost());
        List<TaskInfo> taskStatusList = this.mTaskExecutorManager.getAndClearTaskUpdates();
        List<JobInfo> taskProtoList = taskStatusList.stream().map(TaskInfo::toProto).collect(Collectors.toList());
        try {
            commands = this.mMasterClient.heartbeat(jobWorkerHealth, taskProtoList);
        }
        catch (AlluxioException | IOException e) {
            this.mTaskExecutorManager.restoreTaskUpdates(taskStatusList);
            LOG.error("Failed to heartbeat", e);
            return;
        }
        for (JobCommand command : commands) {
            this.mCommandHandlingService.execute(new CommandHandler(command));
        }
    }

    public void close() {
    }

    class CommandHandler
    implements Runnable {
        private final JobCommand mCommand;

        CommandHandler(JobCommand command) {
            this.mCommand = command;
        }

        @Override
        public void run() {
            if (this.mCommand.hasRunTaskCommand()) {
                RunTaskCommand command = this.mCommand.getRunTaskCommand();
                long jobId = command.getJobId();
                long taskId = command.getTaskId();
                RunTaskContext context = new RunTaskContext(jobId, taskId, CommandHandlingExecutor.this.mServerContext);
                LOG.info("Received run task " + taskId + " for job " + jobId + " on worker " + JobWorkerIdRegistry.getWorkerId());
                CommandHandlingExecutor.this.mTaskExecutorManager.executeTask(jobId, taskId, command, context);
            } else if (this.mCommand.hasCancelTaskCommand()) {
                CancelTaskCommand command = this.mCommand.getCancelTaskCommand();
                long jobId = command.getJobId();
                long taskId = command.getTaskId();
                CommandHandlingExecutor.this.mTaskExecutorManager.cancelTask(jobId, taskId);
            } else if (this.mCommand.hasRegisterCommand()) {
                try {
                    JobWorkerIdRegistry.registerWorker(CommandHandlingExecutor.this.mMasterClient, CommandHandlingExecutor.this.mWorkerNetAddress);
                }
                catch (ConnectionFailedException | IOException e) {
                    Throwables.throwIfUnchecked((Throwable)e);
                    throw new RuntimeException(e);
                }
            } else if (this.mCommand.hasSetTaskPoolSizeCommand()) {
                SetTaskPoolSizeCommand command = this.mCommand.getSetTaskPoolSizeCommand();
                LOG.info(String.format("Task Pool Size: %s", command.getTaskPoolSize()));
                CommandHandlingExecutor.this.mTaskExecutorManager.setDefaultTaskExecutorPoolSize(command.getTaskPoolSize());
            } else {
                throw new RuntimeException("unsupported command type:" + this.mCommand);
            }
        }
    }
}

