/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.job.plan;

import alluxio.exception.ExceptionMessage;
import alluxio.exception.JobDoesNotExistException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.job.JobConfig;
import alluxio.job.JobServerContext;
import alluxio.job.plan.PlanConfig;
import alluxio.job.plan.meta.PlanInfo;
import alluxio.job.wire.Status;
import alluxio.master.job.command.CommandManager;
import alluxio.master.job.plan.PlanCoordinator;
import alluxio.master.job.workflow.WorkflowTracker;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the {@link PlanCoordinator}s of submitted plans, bounded by a capacity.
 *
 * <p>Plans that reach a finished status are queued in completion order so they can be evicted
 * once they are older than the retention time. Failed plans are additionally kept in a bounded,
 * most-recent-first set that is exposed through {@link #failed()}.
 */
@ThreadSafe
public class PlanTracker {
    private static final Logger LOG = LoggerFactory.getLogger(PlanTracker.class);

    /** Upper bound on the number of coordinators removed by a single purge pass. */
    private final long mMaxJobPurgeCount;
    /** Maximum number of tracked coordinators; also bounds the size of {@link #mFailed}. */
    private final long mCapacity;
    /** Minimum age (ms since the last status change) before a finished plan may be evicted. */
    private final long mRetentionMs;
    /** Coordinators of all tracked plans, keyed by job id. */
    private final ConcurrentHashMap<Long, PlanCoordinator> mCoordinators;
    /** Failed plans ordered most-recent status change first, so {@code last()} is the oldest. */
    private final SortedSet<PlanInfo> mFailed;
    /** Finished plans in the order their completion was reported. */
    private final LinkedBlockingQueue<PlanInfo> mFinished;
    private final WorkflowTracker mWorkflowTracker;

    /**
     * Creates a tracker.
     *
     * @param capacity maximum number of tracked plans; must be in {@code [0, Integer.MAX_VALUE]}
     * @param retentionMs minimum time in ms a finished plan is kept; must be non-negative
     * @param maxJobPurgeCount maximum number of plans purged per pass; a value {@code <= 0}
     *        means no limit
     * @param workflowTracker tracker notified of plan status changes and of purged job ids
     * @throws IllegalArgumentException if {@code capacity} or {@code retentionMs} is out of range
     */
    public PlanTracker(long capacity, long retentionMs, long maxJobPurgeCount, WorkflowTracker workflowTracker) {
        Preconditions.checkArgument(capacity >= 0L && capacity <= Integer.MAX_VALUE);
        mCapacity = capacity;
        Preconditions.checkArgument(retentionMs >= 0L);
        mRetentionMs = retentionMs;
        mMaxJobPurgeCount = maxJobPurgeCount <= 0L ? Long.MAX_VALUE : maxJobPurgeCount;
        mCoordinators = new ConcurrentHashMap<>(0, 0.95f,
            Math.max(8, 2 * Runtime.getRuntime().availableProcessors()));
        // Descending order: newest status change first, ties broken by larger id first.
        // Long.compare is used instead of subtraction so extreme values cannot overflow.
        mFailed = Collections.synchronizedSortedSet(new TreeSet<PlanInfo>((left, right) -> {
            int byTime = Long.compare(right.getLastStatusChangeMs(), left.getLastStatusChangeMs());
            if (byTime != 0) {
                return byTime;
            }
            return Long.compare(right.getId(), left.getId());
        }));
        mFinished = new LinkedBlockingQueue<>();
        mWorkflowTracker = workflowTracker;
    }

    /**
     * Callback handed to each coordinator; records plans that reach a finished status.
     *
     * <p>The workflow tracker is always notified. Unfinished plans are otherwise ignored. Failed
     * plans are added to the bounded failed set, and every finished plan is offered to the
     * finished queue so it becomes eligible for eviction.
     *
     * @param planInfo the plan whose status changed; {@code null} is ignored
     */
    private void statusChangeCallback(PlanInfo planInfo) {
        if (planInfo == null) {
            return;
        }
        mWorkflowTracker.onPlanStatusChange(planInfo);
        Status status = planInfo.getStatus();
        if (!status.isFinished()) {
            return;
        }
        if (status.equals(Status.FAILED) && mCapacity > 0) {
            // Evict-then-add is a compound operation: hold the set's own monitor (the mutex used
            // by Collections.synchronizedSortedSet) so concurrent callbacks cannot interleave,
            // and never call last() on an empty set.
            synchronized (mFailed) {
                while (!mFailed.isEmpty() && mFailed.size() >= mCapacity) {
                    mFailed.remove(mFailed.last());
                }
                mFailed.add(planInfo);
            }
        }
        // Try to enqueue; if the queue rejects the plan, purge finished plans and retry.
        for (int attempt = 0; attempt < 2; attempt++) {
            if (mFinished.offer(planInfo)) {
                return;
            }
            if (!removeFinished()) {
                LOG.warn("Failed to remove any jobs from the finished queue in status change callback");
            }
        }
        if (mFinished.offer(planInfo)) {
            return;
        }
        // NOTE(review): the message says the job is removed from tracking, but nothing is removed
        // here. Left as-is to preserve behavior; confirm the intended handling.
        LOG.warn("Failed to offer job id {} to finished queue, removing from tracking preemptively", planInfo.getId());
    }

    /**
     * Looks up the coordinator of a job.
     *
     * @param jobId the job id
     * @return the coordinator, or {@code null} if the job is not tracked
     */
    @Nullable
    public PlanCoordinator getCoordinator(long jobId) {
        return mCoordinators.get(jobId);
    }

    /**
     * Creates a coordinator for a new plan and starts tracking it.
     *
     * <p>If the tracker is at capacity, finished plans past their retention time are purged
     * first to make room.
     *
     * @param jobConfig configuration of the plan
     * @param manager command manager passed to the coordinator
     * @param ctx job server context passed to the coordinator
     * @param workers workers passed to the coordinator
     * @param jobId id under which the plan is tracked
     * @throws JobDoesNotExistException propagated from coordinator creation
     * @throws ResourceExhaustedException if the tracker is full and no plan could be purged
     */
    public synchronized void run(PlanConfig jobConfig, CommandManager manager, JobServerContext ctx, List<WorkerInfo> workers, long jobId) throws JobDoesNotExistException, ResourceExhaustedException {
        if (!removeFinished()) {
            throw new ResourceExhaustedException(
                ExceptionMessage.JOB_MASTER_FULL_CAPACITY.getMessage(mCapacity));
        }
        PlanCoordinator planCoordinator =
            PlanCoordinator.create(manager, ctx, workers, jobId, jobConfig, this::statusChangeCallback);
        mCoordinators.put(jobId, planCoordinator);
    }

    /**
     * Makes room for a new plan by purging finished plans older than the retention time.
     *
     * <p>Does nothing when the tracker is below capacity. Otherwise walks the finished queue from
     * its head, stopping at the first plan still within retention or once the purge limit is
     * reached, then reports the purged ids to the workflow tracker and drops their coordinators.
     *
     * @return {@code true} if the tracker is below capacity or at least one tracked coordinator
     *         was purged; {@code false} otherwise
     */
    private synchronized boolean removeFinished() {
        if (mCoordinators.size() < mCapacity) {
            return true;
        }
        if (mFinished.isEmpty()) {
            return false;
        }
        List<Long> removedJobIds = new ArrayList<>();
        boolean removedJob = false;
        int removeCount = 0;
        while (removeCount < mMaxJobPurgeCount) {
            PlanInfo oldestJob = mFinished.peek();
            if (oldestJob == null) {
                break;
            }
            long timeSinceCompletion = CommonUtils.getCurrentMs() - oldestJob.getLastStatusChangeMs();
            if (timeSinceCompletion < mRetentionMs) {
                // The head is still within retention; nothing further is examined.
                break;
            }
            if (mFinished.poll() == null) {
                LOG.warn("Polling the queue resulted in a null element");
                break;
            }
            long oldestJobId = oldestJob.getId();
            removedJobIds.add(oldestJobId);
            if (mCoordinators.get(oldestJobId) == null) {
                // Stale queue entry: it is dropped but does not count as freeing a slot.
                LOG.warn("Did not find a coordinator with id {}", oldestJobId);
                continue;
            }
            removedJob = true;
            removeCount++;
        }
        mWorkflowTracker.cleanup(removedJobIds);
        for (Long removedJobId : removedJobIds) {
            mCoordinators.remove(removedJobId);
        }
        return removedJob;
    }

    /**
     * @return an unmodifiable view of the ids of all tracked jobs
     */
    public Collection<Long> list() {
        return Collections.unmodifiableCollection(mCoordinators.keySet());
    }

    /**
     * @return an unmodifiable view of all tracked coordinators
     */
    public Collection<PlanCoordinator> coordinators() {
        return Collections.unmodifiableCollection(mCoordinators.values());
    }

    /**
     * @return a stream over a point-in-time snapshot of the failed plans, most recent first
     */
    public Stream<PlanInfo> failed() {
        // A synchronized collection must be locked manually while it is traversed. Copy it under
        // its monitor so the lazily-evaluated stream cannot observe concurrent modification.
        List<PlanInfo> snapshot;
        synchronized (mFailed) {
            snapshot = new ArrayList<>(mFailed);
        }
        return snapshot.stream();
    }

    /**
     * Finds the ids of tracked jobs matching both a name filter and a status filter.
     *
     * @param name job name to match; {@code null} or empty matches any name
     * @param statusList statuses to match; an empty list matches any status
     * @return ids of the jobs that satisfy both filters
     */
    public Set<Long> findJobs(String name, List<Status> statusList) {
        boolean matchAnyName = name == null || name.isEmpty();
        return mCoordinators.entrySet().stream()
            .filter(entry -> {
                PlanCoordinator coordinator = entry.getValue();
                // Both filters must hold. The status check is grouped explicitly because
                // && binds tighter than ||, which would otherwise let an empty status list
                // bypass the name filter.
                boolean statusMatches = statusList.isEmpty()
                    || statusList.contains(coordinator.getPlanInfoWire(false).getStatus());
                if (!statusMatches) {
                    return false;
                }
                return matchAnyName || name.equals(coordinator.getPlanInfoWire(false).getName());
            })
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }
}

