/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.job;

import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.clock.SystemClock;
import alluxio.collections.IndexDefinition;
import alluxio.collections.IndexedSet;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AccessControlException;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.JobDoesNotExistException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.grpc.GrpcService;
import alluxio.grpc.JobCommand;
import alluxio.grpc.ListAllPOptions;
import alluxio.grpc.RegisterCommand;
import alluxio.grpc.ServiceType;
import alluxio.heartbeat.HeartbeatExecutor;
import alluxio.heartbeat.HeartbeatThread;
import alluxio.job.CmdConfig;
import alluxio.job.JobConfig;
import alluxio.job.JobServerContext;
import alluxio.job.MasterWorkerInfo;
import alluxio.job.meta.JobIdGenerator;
import alluxio.job.plan.PlanConfig;
import alluxio.job.wire.CmdStatusBlock;
import alluxio.job.wire.JobInfo;
import alluxio.job.wire.JobServiceSummary;
import alluxio.job.wire.JobWorkerHealth;
import alluxio.job.wire.PlanInfo;
import alluxio.job.wire.Status;
import alluxio.job.wire.TaskInfo;
import alluxio.job.wire.WorkflowInfo;
import alluxio.job.workflow.WorkflowConfig;
import alluxio.master.AbstractMaster;
import alluxio.master.MasterContext;
import alluxio.master.audit.AsyncUserAccessAuditLogWriter;
import alluxio.master.job.JobMasterAuditContext;
import alluxio.master.job.JobMasterClientServiceHandler;
import alluxio.master.job.JobMasterWorkerServiceHandler;
import alluxio.master.job.command.CommandManager;
import alluxio.master.job.plan.PlanCoordinator;
import alluxio.master.job.plan.PlanTracker;
import alluxio.master.job.tracker.CmdJobTracker;
import alluxio.master.job.workflow.WorkflowTracker;
import alluxio.master.journal.NoopJournaled;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.LockResource;
import alluxio.security.authentication.AuthType;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.security.authentication.ClientContextServerInjector;
import alluxio.underfs.UfsManager;
import alluxio.util.CommonUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Master for the job service. It keeps the set of registered job workers and their most
 * recently reported health, and forwards job requests to the plan, workflow and command
 * trackers held in the fields below.
 */
@ThreadSafe
public class JobMaster
extends AbstractMaster
implements NoopJournaled {
    private static final Logger LOG = LoggerFactory.getLogger(JobMaster.class);
    // Unique index over registered workers, keyed by worker id.
    private final IndexDefinition<MasterWorkerInfo, Long> mIdIndex = IndexDefinition.ofUnique(MasterWorkerInfo::getId);
    // Unique index over registered workers, keyed by worker network address.
    private final IndexDefinition<MasterWorkerInfo, WorkerNetAddress> mAddressIndex = IndexDefinition.ofUnique(MasterWorkerInfo::getWorkerAddress);
    // Filesystem, filesystem context and UFS manager bundled for the plan tracker.
    private final JobServerContext mJobServerContext;
    // Registered workers, searchable through both indexes declared above.
    // Must be initialized after mIdIndex and mAddressIndex.
    @GuardedBy(value="mWorkerRWLock")
    private final IndexedSet<MasterWorkerInfo> mWorkers = new IndexedSet(this.mIdIndex, new IndexDefinition[]{this.mAddressIndex});
    // Latest health report received from each worker, keyed by worker id.
    private final ConcurrentHashMap<Long, JobWorkerHealth> mWorkerHealth;
    // Fair read/write lock protecting mWorkers.
    private final ReentrantReadWriteLock mWorkerRWLock = new ReentrantReadWriteLock(true);
    // Source of worker ids; seeded with the current time in ms and incremented per registration.
    private final AtomicLong mNextWorkerId = new AtomicLong(CommonUtils.getCurrentMs());
    // Queues commands that are handed to workers on their next heartbeat.
    private final CommandManager mCommandManager;
    // Tracks plan jobs and their coordinators.
    private final PlanTracker mPlanTracker;
    // Tracks workflow jobs.
    private final WorkflowTracker mWorkflowTracker;
    // Produces ids for new jobs and commands.
    private final JobIdGenerator mJobIdGenerator;
    // Audit log writer; created in start() only on the leader with audit logging enabled,
    // reset to null in stop(), and null otherwise.
    private AsyncUserAccessAuditLogWriter mAsyncAuditLogWriter;
    // Tracks commands submitted through submit(CmdConfig).
    private final CmdJobTracker mCmdJobTracker;

    /**
     * Creates a job master together with its trackers and registers the aggregate job-count gauge.
     *
     * @param masterContext the context passed to the parent master
     * @param filesystem the filesystem client placed in the job server context
     * @param fsContext the filesystem context shared with the job server context and command tracker
     * @param ufsManager the UFS manager placed in the job server context
     */
    public JobMaster(MasterContext masterContext, FileSystem filesystem, FileSystemContext fsContext, UfsManager ufsManager) {
        super(masterContext, new SystemClock(), ExecutorServiceFactories.cachedThreadPool("JobMaster"));
        mJobServerContext = new JobServerContext(filesystem, fsContext, ufsManager);
        mCommandManager = new CommandManager();
        mJobIdGenerator = new JobIdGenerator();
        mWorkflowTracker = new WorkflowTracker(this);
        mPlanTracker = new PlanTracker(
            Configuration.getLong(PropertyKey.JOB_MASTER_JOB_CAPACITY),
            Configuration.getMs(PropertyKey.JOB_MASTER_FINISHED_JOB_RETENTION_TIME),
            Configuration.getLong(PropertyKey.JOB_MASTER_FINISHED_JOB_PURGE_COUNT),
            mWorkflowTracker);
        mWorkerHealth = new ConcurrentHashMap<>();
        mCmdJobTracker = new CmdJobTracker(fsContext, this);
        // The job count gauge is the sum of the per-status job counters.
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_JOB_COUNT.getName(), () ->
            MetricsSystem.counter(MetricKey.MASTER_JOB_CANCELED.getName()).getCount()
                + MetricsSystem.counter(MetricKey.MASTER_JOB_COMPLETED.getName()).getCount()
                + MetricsSystem.counter(MetricKey.MASTER_JOB_CREATED.getName()).getCount()
                + MetricsSystem.counter(MetricKey.MASTER_JOB_FAILED.getName()).getCount()
                + MetricsSystem.counter(MetricKey.MASTER_JOB_RUNNING.getName()).getCount());
    }

    /**
     * Obtains a fresh id from the job id generator.
     *
     * @return the newly generated job id
     */
    public long getNewJobId() {
        return mJobIdGenerator.getNewJobId();
    }

    /**
     * Starts the master: starts the metric sinks, marks every unfinished plan as failed and,
     * on the leader only, launches lost-worker detection and (when enabled) audit logging.
     *
     * @param isLeader whether this master is the leader
     * @throws IOException if the parent master fails to start
     */
    public void start(Boolean isLeader) throws IOException {
        super.start(isLeader);
        MetricsSystem.startSinks(Configuration.getString(PropertyKey.METRICS_CONF_FILE));
        // Plans that are still unfinished at start-up are reported as failed.
        for (PlanCoordinator coordinator : mPlanTracker.coordinators()) {
            if (!coordinator.isJobFinished()) {
                coordinator.setJobAsFailed("JobMasterShutdown", "Job failed: Job master shut down during execution");
            }
        }
        if (!isLeader) {
            return;
        }
        HeartbeatExecutor lostWorkerDetector = new LostWorkerDetectionHeartbeatExecutor();
        Runnable lostWorkerDetection = new HeartbeatThread(
            "Job Master Lost Worker Detection",
            lostWorkerDetector,
            () -> Configuration.getMs(PropertyKey.JOB_MASTER_LOST_WORKER_INTERVAL),
            Configuration.global(),
            mMasterContext.getUserState());
        getExecutorService().submit(lostWorkerDetection);
        if (Configuration.getBoolean(PropertyKey.MASTER_AUDIT_LOGGING_ENABLED)) {
            mAsyncAuditLogWriter = new AsyncUserAccessAuditLogWriter("JOB_MASTER_AUDIT_LOG");
            mAsyncAuditLogWriter.start();
            // Reports -1 once the writer has been cleared again.
            MetricsSystem.registerGaugeIfAbsent(
                MetricKey.MASTER_AUDIT_LOG_ENTRIES_SIZE.getName(),
                () -> mAsyncAuditLogWriter != null ? mAsyncAuditLogWriter.getAuditLogEntriesSize() : -1L);
        }
    }

    /**
     * Stops the audit log writer, if one is running, and then stops the parent master.
     *
     * @throws IOException if the parent master fails to stop
     */
    public void stop() throws IOException {
        AsyncUserAccessAuditLogWriter writer = mAsyncAuditLogWriter;
        if (writer != null) {
            writer.stop();
            mAsyncAuditLogWriter = null;
        }
        super.stop();
    }

    /**
     * Builds the gRPC services exposed by this master: the client-facing service, wrapped with
     * a {@link ClientContextServerInjector}, and the worker-facing service.
     *
     * @return a map from service type to the gRPC service implementing it
     */
    public Map<ServiceType, GrpcService> getServices() {
        BindableService clientHandler = new JobMasterClientServiceHandler(this);
        BindableService workerHandler = new JobMasterWorkerServiceHandler(this);
        Map<ServiceType, GrpcService> services = new HashMap<>();
        services.put(ServiceType.JOB_MASTER_CLIENT_SERVICE,
            new GrpcService(ServerInterceptors.intercept(clientHandler, new ClientContextServerInjector())));
        services.put(ServiceType.JOB_MASTER_WORKER_SERVICE, new GrpcService(workerHandler));
        return services;
    }

    /**
     * Returns the name of this master.
     *
     * @return the constant string {@code "JobMaster"}
     */
    public String getName() {
        return "JobMaster";
    }

    /**
     * Runs a job under a newly generated job id.
     *
     * @param jobConfig the configuration of the job to run
     * @return the id assigned to the job
     * @throws JobDoesNotExistException if the configuration is of an unsupported kind
     * @throws ResourceExhaustedException propagated from {@link #run(JobConfig, long)}
     */
    public synchronized long run(JobConfig jobConfig) throws JobDoesNotExistException, ResourceExhaustedException {
        final long newJobId = getNewJobId();
        run(jobConfig, newJobId);
        return newJobId;
    }

    /**
     * Runs a job with the given id. Plan configurations go to the plan tracker and workflow
     * configurations to the workflow tracker. The work is done with a forked gRPC
     * {@link Context} attached, which is detached again before returning.
     *
     * @param jobConfig the configuration of the job to run
     * @param jobId the id to run the job under
     * @throws JobDoesNotExistException if the configuration is neither a plan nor a workflow
     * @throws ResourceExhaustedException propagated from the tracker that runs the job
     */
    public synchronized void run(JobConfig jobConfig, long jobId) throws JobDoesNotExistException, ResourceExhaustedException {
        Context forkedCtx = Context.current().fork();
        Context prevCtx = forkedCtx.attach();
        try (JobMasterAuditContext auditContext = createAuditContext("run")) {
            auditContext.setJobId(jobId);
            auditContext.setJobName(jobConfig.getName());
            if (jobConfig instanceof PlanConfig) {
                mPlanTracker.run((PlanConfig) jobConfig, mCommandManager, mJobServerContext, getWorkerInfoList(), jobId);
            } else if (jobConfig instanceof WorkflowConfig) {
                mWorkflowTracker.run((WorkflowConfig) jobConfig, jobId);
            } else {
                throw new JobDoesNotExistException(
                    ExceptionMessage.JOB_DEFINITION_DOES_NOT_EXIST.getMessage(new Object[]{jobConfig.getName()}));
            }
            // Reached only when one of the trackers accepted the job.
            auditContext.setSucceeded(true);
        } finally {
            forkedCtx.detach(prevCtx);
        }
    }

    /**
     * Submits a command to the command job tracker under a newly generated id. The work is
     * done with a forked gRPC {@link Context} attached, which is detached again before
     * returning.
     *
     * @param cmdConfig the configuration of the command to run
     * @return the job control id assigned to the command
     * @throws JobDoesNotExistException propagated from the command job tracker
     * @throws IOException propagated from the command job tracker
     */
    public synchronized long submit(CmdConfig cmdConfig) throws JobDoesNotExistException, IOException {
        long jobControlId = getNewJobId();
        Context forkedCtx = Context.current().fork();
        Context prevCtx = forkedCtx.attach();
        // NOTE(review): the audit command is recorded as "run", the same name used by
        // run(JobConfig, long) - confirm whether a distinct name is wanted.
        try (JobMasterAuditContext auditContext = createAuditContext("run")) {
            auditContext.setJobId(jobControlId);
            mCmdJobTracker.run(cmdConfig, jobControlId);
            // Without this the audit record of a successful submission is never marked succeeded.
            auditContext.setSucceeded(true);
        } finally {
            forkedCtx.detach(prevCtx);
        }
        return jobControlId;
    }

    /**
     * Cancels a job. A plan coordinator registered under the id is cancelled directly;
     * otherwise the request is handed to the workflow tracker.
     *
     * @param jobId the id of the job to cancel
     * @throws JobDoesNotExistException if neither a plan nor a workflow exists for the id
     */
    public void cancel(long jobId) throws JobDoesNotExistException {
        try (JobMasterAuditContext auditContext = createAuditContext("cancel")) {
            auditContext.setJobId(jobId);
            PlanCoordinator planCoordinator = mPlanTracker.getCoordinator(jobId);
            if (planCoordinator != null) {
                planCoordinator.cancel();
            } else if (!mWorkflowTracker.cancel(jobId)) {
                throw new JobDoesNotExistException(jobId);
            }
            // Mark success for both the plan and the workflow path; the workflow path
            // previously returned without recording it.
            auditContext.setSucceeded(true);
        }
    }

    /**
     * Looks up the status of a submitted command.
     *
     * @param jobControlId the id returned when the command was submitted
     * @return the status reported by the command job tracker
     * @throws JobDoesNotExistException propagated from the command job tracker
     */
    public Status getCmdStatus(long jobControlId) throws JobDoesNotExistException {
        try (JobMasterAuditContext auditContext = createAuditContext("getCmdStatus")) {
            auditContext.setJobId(jobControlId);
            Status status = mCmdJobTracker.getCmdStatus(jobControlId);
            // Record success once the lookup returned without throwing.
            auditContext.setSucceeded(true);
            return status;
        }
    }

    /**
     * Lists the ids of plan and workflow jobs matching the name and statuses in the options.
     *
     * @param options carries the job name and the list of statuses to match
     * @return the matching job ids in ascending order
     */
    public List<Long> list(ListAllPOptions options) {
        try (JobMasterAuditContext auditContext = createAuditContext("list")) {
            String name = options.getName();
            // Convert the requested statuses once and reuse them for both trackers.
            List<Status> statuses = options.getStatusList().stream()
                .map(status -> Status.valueOf(status.name()))
                .collect(Collectors.toList());
            List<Long> ids = new ArrayList<>();
            ids.addAll(mPlanTracker.findJobs(name, statuses));
            ids.addAll(mWorkflowTracker.findJobs(name, statuses));
            Collections.sort(ids);
            auditContext.setSucceeded(true);
            return ids;
        }
    }

    /**
     * Lists the ids of commands whose status is among those given in the options.
     *
     * @param options carries the list of statuses to match
     * @return the matching command ids in ascending order
     * @throws JobDoesNotExistException propagated from the command job tracker
     */
    public List<Long> listCmds(ListAllPOptions options) throws JobDoesNotExistException {
        try (JobMasterAuditContext auditContext = createAuditContext("listCmds")) {
            List<Status> wanted = options.getStatusList().stream()
                .map(status -> Status.valueOf(status.name()))
                .collect(Collectors.toList());
            List<Long> cmdIds = new ArrayList<>(mCmdJobTracker.findCmdIds(wanted));
            Collections.sort(cmdIds);
            auditContext.setSucceeded(true);
            return cmdIds;
        }
    }

    /**
     * Looks up the detailed status block of a submitted command.
     *
     * @param jobControlId the id returned when the command was submitted
     * @return the status block reported by the command job tracker
     * @throws JobDoesNotExistException propagated from the command job tracker
     */
    public CmdStatusBlock getCmdStatusDetailed(long jobControlId) throws JobDoesNotExistException {
        try (JobMasterAuditContext auditContext = createAuditContext("getCmdStatusDetailed")) {
            // Record the id and the outcome like the other per-id calls do.
            auditContext.setJobId(jobControlId);
            CmdStatusBlock statusBlock = mCmdJobTracker.getCmdStatusBlock(jobControlId);
            auditContext.setSucceeded(true);
            return statusBlock;
        }
    }

    /**
     * Collects every failed path known to the command job tracker.
     *
     * @return a new set holding the failed paths
     */
    public Set<String> getAllFailedPaths() {
        try (JobMasterAuditContext auditContext = createAuditContext("getAllFailedPaths")) {
            Set<String> failedPaths = new HashSet<>(mCmdJobTracker.findAllFailedPaths());
            auditContext.setSucceeded(true);
            return failedPaths;
        }
    }

    /**
     * Collects the failed paths of a single command.
     *
     * @param jobControlId the id returned when the command was submitted
     * @return a new set holding the failed paths of that command
     * @throws JobDoesNotExistException propagated from the command job tracker
     */
    public Set<String> getFailedPaths(long jobControlId) throws JobDoesNotExistException {
        try (JobMasterAuditContext auditContext = createAuditContext("getFailedPaths")) {
            // Record which command was queried, as the other per-id calls do.
            auditContext.setJobId(jobControlId);
            Set<String> failedPaths = new HashSet<>(mCmdJobTracker.findFailedPaths(jobControlId));
            auditContext.setSucceeded(true);
            return failedPaths;
        }
    }

    /**
     * Lists non-verbose info for every plan, plus the info of every workflow.
     *
     * @return the job infos sorted by ascending job id
     */
    public List<JobInfo> listDetailed() {
        try (JobMasterAuditContext auditContext = createAuditContext("listDetailed")) {
            // Typed as JobInfo so that the list can be sorted by id and returned as declared.
            List<JobInfo> jobInfos = new ArrayList<>();
            for (PlanCoordinator coordinator : mPlanTracker.coordinators()) {
                jobInfos.add(coordinator.getPlanInfoWire(false));
            }
            jobInfos.addAll(mWorkflowTracker.getAllInfo());
            jobInfos.sort(Comparator.comparingLong(JobInfo::getId));
            auditContext.setSucceeded(true);
            return jobInfos;
        }
    }

    /**
     * Lists failed plans whose last status change lies strictly after {@code after} and,
     * when {@code before} is non-negative, strictly before {@code before}.
     *
     * @param limit the maximum number of entries to return
     * @param before exclusive upper bound on the last status change time in ms; a negative
     *        value disables the bound
     * @param after exclusive lower bound on the last status change time in ms
     * @return non-verbose plan infos in the order produced by the plan tracker
     */
    public List<JobInfo> failed(int limit, long before, long after) {
        return mPlanTracker.failed()
            .filter(planInfoMeta -> {
                long lastStatusChangeMs = planInfoMeta.getLastStatusChangeMs();
                boolean beforeUpperBound = before < 0L || lastStatusChangeMs < before;
                return beforeUpperBound && after < lastStatusChangeMs;
            })
            .limit(limit)
            .map(planInfoMeta -> (JobInfo) new PlanInfo(planInfoMeta, false))
            .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Gets the verbose status of a job and records the lookup in the audit log.
     *
     * @param jobId the id of the job
     * @return the verbose job info
     * @throws JobDoesNotExistException if no plan or workflow exists for the id
     */
    public JobInfo getStatus(long jobId) throws JobDoesNotExistException {
        try (JobMasterAuditContext auditContext = createAuditContext("getStatus")) {
            auditContext.setJobId(jobId);
            JobInfo status = getStatus(jobId, true);
            if (status != null) {
                auditContext.setJobName(status.getName());
                auditContext.setSucceeded(true);
            }
            return status;
        }
    }

    /**
     * Gets the status of a job, looking among the plans first and the workflows second.
     *
     * @param jobId the id of the job
     * @param verbose passed through to the plan coordinator or workflow tracker
     * @return the plan info if a plan coordinator exists for the id, else the workflow info
     * @throws JobDoesNotExistException if no plan or workflow exists for the id
     */
    public JobInfo getStatus(long jobId, boolean verbose) throws JobDoesNotExistException {
        PlanCoordinator coordinator = mPlanTracker.getCoordinator(jobId);
        if (coordinator != null) {
            return coordinator.getPlanInfoWire(verbose);
        }
        WorkflowInfo workflowStatus = mWorkflowTracker.getStatus(jobId, verbose);
        if (workflowStatus == null) {
            throw new JobDoesNotExistException(jobId);
        }
        return workflowStatus;
    }

    /**
     * Builds a summary of the job service from the detailed listing of all jobs.
     *
     * @return a summary constructed from {@link #listDetailed()}
     */
    public JobServiceSummary getSummary() {
        List<JobInfo> allJobs = listDetailed();
        return new JobServiceSummary(allJobs);
    }

    /**
     * Returns a copy of the latest health report of every worker.
     *
     * @return the health reports sorted by ascending worker id
     */
    public List<JobWorkerHealth> getAllWorkerHealth() {
        try (JobMasterAuditContext auditContext = createAuditContext("getAllWorkerHealth")) {
            List<JobWorkerHealth> healthReports = new ArrayList<>(mWorkerHealth.values());
            healthReports.sort(Comparator.comparingLong(JobWorkerHealth::getWorkerId));
            auditContext.setSucceeded(true);
            return healthReports;
        }
    }

    /**
     * Registers a worker under a new id. If a worker is already registered at the same
     * address, its tasks are failed and it is removed before the new entry is added.
     *
     * @param workerNetAddress the network address of the registering worker
     * @return the id assigned to the worker
     */
    public long registerWorker(WorkerNetAddress workerNetAddress) {
        try (LockResource exclusiveLock = new LockResource(mWorkerRWLock.writeLock())) {
            if (mWorkers.contains(mAddressIndex, workerNetAddress)) {
                LOG.info("Worker at address {} is re-registering. Failing tasks for previous worker at that address", workerNetAddress);
                MasterWorkerInfo previous = mWorkers.getFirstByField(mAddressIndex, workerNetAddress);
                for (PlanCoordinator coordinator : mPlanTracker.coordinators()) {
                    coordinator.failTasksForWorker(previous.getId());
                }
                mWorkerHealth.remove(previous.getId());
                mWorkers.remove(previous);
            }
            final long newWorkerId = mNextWorkerId.getAndIncrement();
            mWorkers.add(new MasterWorkerInfo(newWorkerId, workerNetAddress));
            LOG.info("registerWorker(): WorkerNetAddress: {} id: {}", workerNetAddress, newWorkerId);
            return newWorkerId;
        }
    }

    /**
     * Takes a snapshot of all registered workers in their client-facing form.
     *
     * @return a new list with one {@link WorkerInfo} per registered worker
     */
    public List<WorkerInfo> getWorkerInfoList() {
        try (LockResource workersLockShared = new LockResource(mWorkerRWLock.readLock())) {
            // mWorkers is guarded by mWorkerRWLock, so its size is read under the lock as well.
            List<WorkerInfo> workerInfoList = new ArrayList<>(mWorkers.size());
            for (MasterWorkerInfo masterWorkerInfo : mWorkers) {
                workerInfoList.add(masterWorkerInfo.generateClientWorkerInfo());
            }
            return workerInfoList;
        }
    }

    /**
     * Queues a set-task-pool-size command for every registered worker.
     *
     * @param taskPoolSize the task pool size to send to each worker
     */
    public void setTaskPoolSize(int taskPoolSize) {
        try (LockResource sharedLock = new LockResource(mWorkerRWLock.readLock())) {
            for (MasterWorkerInfo registered : mWorkers) {
                long targetWorkerId = registered.getId();
                mCommandManager.submitSetTaskPoolSizeCommand(targetWorkerId, taskPoolSize);
            }
        }
    }

    /**
     * Handles a heartbeat from a worker: refreshes the worker's last-updated time, stores its
     * health report, forwards the reported task infos to the owning plan coordinators and
     * returns the commands pending for the worker.
     *
     * @param jobWorkerHealth the health report of the worker; also carries the worker id
     * @param taskInfoList the task infos reported by the worker
     * @return a single register command if the worker id is not registered, otherwise the
     *         commands polled from the command manager for this worker
     */
    public List<JobCommand> workerHeartbeat(JobWorkerHealth jobWorkerHealth, List<TaskInfo> taskInfoList) {
        long workerId = jobWorkerHealth.getWorkerId();
        String hostname;
        try (LockResource workersLockShared = new LockResource(mWorkerRWLock.readLock())) {
            MasterWorkerInfo worker = mWorkers.getFirstByField(mIdIndex, workerId);
            if (worker == null) {
                // Unknown worker id: tell the worker to register.
                return Collections.singletonList(
                    JobCommand.newBuilder().setRegisterCommand(RegisterCommand.getDefaultInstance()).build());
            }
            hostname = worker.getWorkerAddress().getHost();
            worker.updateLastUpdatedTimeMs();
        }
        mWorkerHealth.put(workerId, jobWorkerHealth);

        // Group the reported tasks by job id so that each coordinator is updated once.
        Map<Long, List<TaskInfo>> taskInfosPerJob = new HashMap<>();
        for (TaskInfo taskInfo : taskInfoList) {
            taskInfo.setWorkerHost(hostname);
            taskInfosPerJob.computeIfAbsent(taskInfo.getJobId(), jobId -> new ArrayList<>()).add(taskInfo);
        }
        for (Map.Entry<Long, List<TaskInfo>> taskInfosPair : taskInfosPerJob.entrySet()) {
            PlanCoordinator planCoordinator = mPlanTracker.getCoordinator(taskInfosPair.getKey());
            // Tasks of jobs without a coordinator are skipped.
            if (planCoordinator != null) {
                planCoordinator.updateTasks(taskInfosPair.getValue());
            }
        }
        return mCommandManager.pollAllPendingCommands(workerId);
    }

    /**
     * Creates the audit context for one command. When audit logging is disabled or no writer
     * is running, the context is created with a null writer and left otherwise unpopulated.
     *
     * @param command the command name to record
     * @return the audit context, to be closed by the caller
     */
    private JobMasterAuditContext createAuditContext(String command) {
        AsyncUserAccessAuditLogWriter writer =
            Configuration.getBoolean(PropertyKey.MASTER_AUDIT_LOGGING_ENABLED) ? mAsyncAuditLogWriter : null;
        JobMasterAuditContext context = new JobMasterAuditContext(writer);
        if (writer == null) {
            return context;
        }
        // The ugi has the form "user,group"; "N/A" stands in for whatever cannot be resolved.
        String clientUser = null;
        String ugi = "";
        try {
            clientUser = AuthenticatedClientUser.getClientUser(Configuration.global());
        } catch (AccessControlException e) {
            ugi = "N/A";
        }
        if (clientUser != null) {
            try {
                ugi = clientUser + "," + CommonUtils.getPrimaryGroupName(clientUser, Configuration.global());
            } catch (IOException e) {
                LOG.debug("Failed to get primary group for user {}.", clientUser);
                ugi = clientUser + ",N/A";
            }
        }
        AuthType authType = Configuration.getEnum(PropertyKey.SECURITY_AUTHENTICATION_TYPE, AuthType.class);
        context.setUgi(ugi)
            .setAuthType(authType)
            .setIp(ClientContextServerInjector.getIpAddress())
            .setClientVersion(ClientContextServerInjector.getClientVersion())
            .setCommand(command)
            .setAllowed(true)
            .setCreationTimeNs(System.nanoTime());
        return context;
    }

    /**
     * Heartbeat executor that finds workers whose last update is older than the configured
     * worker timeout, fails their tasks and removes them from the registry.
     */
    private final class LostWorkerDetectionHeartbeatExecutor
    implements HeartbeatExecutor {
        /**
         * Runs one detection pass. Timed-out workers are collected under the read lock and
         * removed under the write lock only if they are still timed out when re-checked.
         */
        public void heartbeat() {
            // Kept as a long: narrowing to int would corrupt timeouts above Integer.MAX_VALUE ms.
            long masterWorkerTimeoutMs = Configuration.getMs(PropertyKey.JOB_MASTER_WORKER_TIMEOUT);
            List<MasterWorkerInfo> lostWorkers = new ArrayList<>();
            try (LockResource workersLockShared = new LockResource(JobMaster.this.mWorkerRWLock.readLock())) {
                for (MasterWorkerInfo worker : JobMaster.this.mWorkers) {
                    long lastUpdate = JobMaster.this.mClock.millis() - worker.getLastUpdatedTimeMs();
                    if (lastUpdate <= masterWorkerTimeoutMs) {
                        continue;
                    }
                    LOG.warn("The worker {} timed out after {}ms without a heartbeat!", worker, lastUpdate);
                    lostWorkers.add(worker);
                    for (PlanCoordinator planCoordinator : JobMaster.this.mPlanTracker.coordinators()) {
                        planCoordinator.failTasksForWorker(worker.getId());
                    }
                }
            }
            if (lostWorkers.isEmpty()) {
                return;
            }
            try (LockResource workersLockExclusive = new LockResource(JobMaster.this.mWorkerRWLock.writeLock())) {
                for (MasterWorkerInfo lostWorker : lostWorkers) {
                    // Re-check: the worker may have sent a heartbeat between the two locks.
                    long lastUpdate = JobMaster.this.mClock.millis() - lostWorker.getLastUpdatedTimeMs();
                    if (lastUpdate <= masterWorkerTimeoutMs) {
                        continue;
                    }
                    JobMaster.this.mWorkerHealth.remove(lostWorker.getId());
                    JobMaster.this.mWorkers.remove(lostWorker);
                }
            }
        }

        /** Does nothing. */
        public void close() {
        }
    }
}

