/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app.rm.preemption;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AMPreemptionPolicy} that answers preemption messages by flagging
 * REDUCE task attempts for preemption and that keeps track of the latest
 * checkpoint id recorded for each task.
 *
 * <p>Two kinds of requests are handled by {@link #preempt}:
 * <ul>
 *   <li><b>strict</b> contracts name the exact containers to give back; only
 *       containers that run a REDUCE attempt are flagged;</li>
 *   <li><b>negotiable</b> contracts are honoured by resource amount: enough
 *       REDUCE containers are flagged to cover the requested memory/cores,
 *       after discounting containers already flagged by earlier negotiable
 *       requests that have not completed yet.</li>
 * </ul>
 *
 * <p>The public constructor backs all internal state with
 * {@code Collections.synchronized*} wrappers.
 */
public class CheckpointAMPreemptionPolicy
implements AMPreemptionPolicy {
    static final Logger LOG = LoggerFactory.getLogger(CheckpointAMPreemptionPolicy.class);

    /** Orders containers by descending container id. */
    private static final Comparator<Container> DESCENDING_CONTAINER_ID = new Comparator<Container>() {

        @Override
        public int compare(Container o1, Container o2) {
            return o2.getId().compareTo(o1.getId());
        }
    };

    /** Task attempts currently flagged for preemption. */
    private final Set<TaskAttemptId> toBePreempted;
    /** Task attempts for which the TASKS_REQ_PREEMPT counter was already bumped. */
    private final Set<TaskAttemptId> countedPreemptions;
    /** Latest checkpoint id recorded per task. */
    private final Map<TaskId, TaskCheckpointID> checkpoints;
    /** Resources of containers flagged through a negotiable contract and not yet completed. */
    private final Map<TaskAttemptId, Resource> pendingFlexiblePreemptions;
    /** Sink for job counter update events; set by {@link #init(AppContext)}. */
    @SuppressWarnings("rawtypes")
    private EventHandler eventHandler;

    /** Creates a policy whose internal collections are synchronized wrappers. */
    public CheckpointAMPreemptionPolicy() {
        this(Collections.synchronizedSet(new HashSet<TaskAttemptId>()),
            Collections.synchronizedSet(new HashSet<TaskAttemptId>()),
            Collections.synchronizedMap(new HashMap<TaskId, TaskCheckpointID>()),
            Collections.synchronizedMap(new HashMap<TaskAttemptId, Resource>()));
    }

    /**
     * Creates a policy that operates directly on the supplied collections
     * (they are stored as-is, not copied).
     *
     * @param toBePreempted attempts flagged for preemption
     * @param countedPreemptions attempts already counted in TASKS_REQ_PREEMPT
     * @param checkpoints checkpoint id per task
     * @param pendingFlexiblePreemptions resources of attempts flagged via negotiable contracts
     */
    CheckpointAMPreemptionPolicy(Set<TaskAttemptId> toBePreempted, Set<TaskAttemptId> countedPreemptions, Map<TaskId, TaskCheckpointID> checkpoints, Map<TaskAttemptId, Resource> pendingFlexiblePreemptions) {
        this.toBePreempted = toBePreempted;
        this.countedPreemptions = countedPreemptions;
        this.checkpoints = checkpoints;
        this.pendingFlexiblePreemptions = pendingFlexiblePreemptions;
    }

    /**
     * Captures the event handler of the given context; counter updates are
     * sent through it.
     *
     * @param context application context supplying the event handler
     */
    @Override
    public void init(AppContext context) {
        this.eventHandler = context.getEventHandler();
    }

    /**
     * Processes a preemption message: first its strict contract, then its
     * negotiable contract. A {@code null} message is ignored.
     *
     * @param ctxt lookup for running containers and their task attempts
     * @param preemptionRequests the message to react to, may be {@code null}
     */
    @Override
    public void preempt(AMPreemptionPolicy.Context ctxt, PreemptionMessage preemptionRequests) {
        if (preemptionRequests == null) {
            return;
        }
        this.preemptStrict(ctxt, preemptionRequests.getStrictContract());
        this.preemptNegotiable(ctxt, preemptionRequests.getContract());
    }

    /** Flags every REDUCE attempt whose container is named by the strict contract. */
    private void preemptStrict(AMPreemptionPolicy.Context ctxt, StrictPreemptionContract strict) {
        if (strict == null) {
            return;
        }
        Set<PreemptionContainer> containers = strict.getContainers();
        if (containers == null || containers.isEmpty()) {
            return;
        }
        LOG.info("strict preemption :{} containers to kill", containers.size());
        for (PreemptionContainer c : containers) {
            ContainerId reqCont = c.getId();
            TaskAttemptId reqTask = ctxt.getTaskAttempt(reqCont);
            if (reqTask == null) {
                // Container is not (or no longer) associated with an attempt.
                continue;
            }
            if (TaskType.REDUCE.equals(reqTask.getTaskId().getTaskType())) {
                this.toBePreempted.add(reqTask);
                LOG.info("preempting {} running task:{}", reqCont, reqTask);
            } else {
                // Only REDUCE attempts are ever flagged by this policy.
                LOG.info("NOT preempting {} running task:{}", reqCont, reqTask);
            }
        }
    }

    /**
     * Satisfies the aggregate ({@code ResourceRequest.ANY}) resource requests of
     * a negotiable contract by flagging REDUCE containers, highest container
     * id first, until the requested memory and cores are covered.
     */
    private void preemptNegotiable(AMPreemptionPolicy.Context ctxt, PreemptionContract contract) {
        if (contract == null) {
            return;
        }
        List<PreemptionResourceRequest> reqResources = contract.getResourceRequest();
        Set<PreemptionContainer> containers = contract.getContainers();
        if (reqResources == null || reqResources.isEmpty() || containers == null || containers.isEmpty()) {
            return;
        }
        LOG.info("negotiable preemption :{} resourceReq, {} containers", reqResources.size(), containers.size());

        // Total resources of containers already flagged by earlier negotiable
        // requests; these are discounted from the current requests. Memory is
        // kept as long because Resource#getMemorySize() returns long.
        long pendingPreemptionRam = 0L;
        int pendingPreemptionCores = 0;
        // Iterating a view of a Collections.synchronizedMap requires holding
        // the map's monitor; harmless if the map is not a synchronized wrapper.
        synchronized (this.pendingFlexiblePreemptions) {
            for (Resource r : this.pendingFlexiblePreemptions.values()) {
                pendingPreemptionRam += r.getMemorySize();
                pendingPreemptionCores += r.getVirtualCores();
            }
        }

        for (PreemptionResourceRequest rr : reqResources) {
            ResourceRequest reqRsrc = rr.getResourceRequest();
            if (!ResourceRequest.ANY.equals(reqRsrc.getResourceName())) {
                // Only aggregate requests are answered; locality-specific ones are skipped.
                continue;
            }
            LOG.info("ResourceRequest:{}", reqRsrc);
            int reqCont = reqRsrc.getNumContainers();
            long reqMem = reqRsrc.getCapability().getMemorySize();
            long totalMemoryToRelease = (long)reqCont * reqMem;
            int reqCores = reqRsrc.getCapability().getVirtualCores();
            int totalCoresToRelease = reqCont * reqCores;

            // Discount what is already pending. The leftover pending amount is
            // "pending - requested": if it drops to zero or below, nothing is
            // discounted from the next request; if the amount to release drops
            // to zero or below, no container is flagged for this request.
            if (pendingPreemptionRam > 0L) {
                long requestedMemory = totalMemoryToRelease;
                totalMemoryToRelease -= pendingPreemptionRam;
                pendingPreemptionRam -= requestedMemory;
            }
            if (pendingPreemptionCores > 0) {
                int requestedCores = totalCoresToRelease;
                totalCoresToRelease -= pendingPreemptionCores;
                pendingPreemptionCores -= requestedCores;
            }

            List<Container> listOfCont = ctxt.getContainers(TaskType.REDUCE);
            Collections.sort(listOfCont, DESCENDING_CONTAINER_ID);
            for (Container cont : listOfCont) {
                if (totalMemoryToRelease <= 0L && totalCoresToRelease <= 0) {
                    break;
                }
                TaskAttemptId reduceId = ctxt.getTaskAttempt(cont.getId());
                if (reduceId == null) {
                    // Same guard as the strict path: never track a null attempt,
                    // it could not be removed again on completion.
                    continue;
                }
                // add() reports whether the attempt was newly flagged, so the
                // membership test and the insertion happen in one call.
                if (this.toBePreempted.add(reduceId)) {
                    Resource released = cont.getResource();
                    totalMemoryToRelease -= released.getMemorySize();
                    totalCoresToRelease -= released.getVirtualCores();
                    this.pendingFlexiblePreemptions.put(reduceId, released);
                }
                LOG.info("ResourceRequest:{} satisfied preempting {}", reqRsrc, reduceId);
            }
        }
    }

    /**
     * Clears the preemption flag of a failed attempt and drops the checkpoint
     * recorded for its task.
     *
     * @param attemptID the attempt whose container failed
     */
    @Override
    public void handleFailedContainer(TaskAttemptId attemptID) {
        this.toBePreempted.remove(attemptID);
        this.checkpoints.remove(attemptID.getTaskId());
    }

    /**
     * Clears the preemption flag of a completed attempt and stops counting its
     * resources as a pending negotiable preemption.
     *
     * @param attemptID the attempt whose container completed
     */
    @Override
    public void handleCompletedContainer(TaskAttemptId attemptID) {
        LOG.info(" task completed:{}", attemptID);
        this.toBePreempted.remove(attemptID);
        this.pendingFlexiblePreemptions.remove(attemptID);
    }

    /**
     * Tells whether the attempt is currently flagged for preemption. The first
     * positive answer for an attempt also increments the TASKS_REQ_PREEMPT
     * job counter.
     *
     * @param yarnAttemptID the attempt to query
     * @return {@code true} if the attempt is flagged for preemption
     */
    @Override
    public boolean isPreempted(TaskAttemptId yarnAttemptID) {
        if (!this.toBePreempted.contains(yarnAttemptID)) {
            return false;
        }
        this.updatePreemptionCounters(yarnAttemptID);
        return true;
    }

    /**
     * No-op in this policy.
     *
     * @param taskAttemptID the attempt that was preempted
     */
    @Override
    public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
    }

    /**
     * Returns the checkpoint id recorded for the task.
     *
     * @param taskId the task to look up
     * @return the stored checkpoint id, or {@code null} if the map holds none
     */
    @Override
    public TaskCheckpointID getCheckpointID(TaskId taskId) {
        return this.checkpoints.get(taskId);
    }

    /**
     * Records the checkpoint id for the task and, when the id is non-null,
     * publishes the checkpoint job counters.
     *
     * @param taskId the task that checkpointed
     * @param cid the checkpoint id to store; may be {@code null}
     */
    @Override
    public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
        this.checkpoints.put(taskId, cid);
        if (cid != null) {
            this.updateCheckpointCounters(taskId, cid);
        }
    }

    /** Emits CHECKPOINTS (+1), CHECKPOINT_BYTES and CHECKPOINT_TIME as three separate events. */
    private void updateCheckpointCounters(TaskId taskId, TaskCheckpointID cid) {
        this.sendCounterUpdate(taskId, JobCounter.CHECKPOINTS, 1L);
        this.sendCounterUpdate(taskId, JobCounter.CHECKPOINT_BYTES, cid.getCheckpointBytes());
        this.sendCounterUpdate(taskId, JobCounter.CHECKPOINT_TIME, cid.getCheckpointTime());
    }

    /** Increments TASKS_REQ_PREEMPT the first time an attempt is seen, never again afterwards. */
    private void updatePreemptionCounters(TaskAttemptId yarnAttemptID) {
        // add() returns false when the attempt was already counted.
        if (this.countedPreemptions.add(yarnAttemptID)) {
            this.sendCounterUpdate(yarnAttemptID.getTaskId(), JobCounter.TASKS_REQ_PREEMPT, 1L);
        }
    }

    /** Sends one job counter update event for the job owning {@code taskId}. */
    @SuppressWarnings("unchecked")
    private void sendCounterUpdate(TaskId taskId, JobCounter counter, long increment) {
        JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
        jce.addCounterUpdate(counter, increment);
        this.eventHandler.handle(jce);
    }
}

