/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.RejectionReason;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.AbstractPlacementProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlacementConstraintProcessor
extends AbstractPlacementProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(PlacementConstraintProcessor.class);
    private ExecutorService schedulingThreadPool;
    private int retryAttempts;
    private Map<ApplicationId, List<BatchedRequests>> requestsToRetry = new ConcurrentHashMap<ApplicationId, List<BatchedRequests>>();
    private Map<ApplicationId, List<SchedulingRequest>> requestsToReject = new ConcurrentHashMap<ApplicationId, List<SchedulingRequest>>();
    private BatchedRequests.IteratorType iteratorType;
    private PlacementDispatcher placementDispatcher;

    @Override
    public void init(ApplicationMasterServiceContext amsContext, ApplicationMasterServiceProcessor nextProcessor) {
        LOG.info("Initializing Constraint Placement Processor:");
        super.init(amsContext, nextProcessor);
        List instances = ((RMContextImpl)amsContext).getYarnConfiguration().getInstances("yarn.resourcemanager.placement-constraints.algorithm.class", ConstraintPlacementAlgorithm.class);
        ConstraintPlacementAlgorithm algorithm = null;
        algorithm = instances != null && !instances.isEmpty() ? (ConstraintPlacementAlgorithm)instances.get(0) : new DefaultPlacementAlgorithm();
        LOG.info("Placement Algorithm [{}]", (Object)algorithm.getClass().getName());
        String iteratorName = ((RMContextImpl)amsContext).getYarnConfiguration().get("yarn.resourcemanager.placement-constraints.algorithm.iterator", BatchedRequests.IteratorType.SERIAL.name());
        LOG.info("Placement Algorithm Iterator[{}]", (Object)iteratorName);
        try {
            this.iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName);
        }
        catch (IllegalArgumentException e) {
            throw new YarnRuntimeException("Could not instantiate Placement Algorithm Iterator: ", (Throwable)e);
        }
        int algoPSize = ((RMContextImpl)amsContext).getYarnConfiguration().getInt("yarn.resourcemanager.placement-constraints.algorithm.pool-size", 1);
        this.placementDispatcher = new PlacementDispatcher();
        this.placementDispatcher.init((RMContextImpl)amsContext, algorithm, algoPSize);
        LOG.info("Planning Algorithm pool size [{}]", (Object)algoPSize);
        int schedPSize = ((RMContextImpl)amsContext).getYarnConfiguration().getInt("yarn.resourcemanager.placement-constraints.scheduler.pool-size", 1);
        this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize);
        LOG.info("Scheduler pool size [{}]", (Object)schedPSize);
        this.retryAttempts = ((RMContextImpl)amsContext).getYarnConfiguration().getInt("yarn.resourcemanager.placement-constraints.retry-attempts", 3);
        LOG.info("Num retry attempts [{}]", (Object)this.retryAttempts);
    }

    public void allocate(ApplicationAttemptId appAttemptId, AllocateRequest request, AllocateResponse response) throws YarnException {
        ArrayList<SchedulingRequest> schedulingRequests = new ArrayList<SchedulingRequest>(request.getSchedulingRequests());
        this.dispatchRequestsForPlacement(appAttemptId, schedulingRequests);
        this.reDispatchRetryableRequests(appAttemptId);
        this.schedulePlacedRequests(appAttemptId);
        request.setSchedulingRequests(Collections.emptyList());
        this.nextAMSProcessor.allocate(appAttemptId, request, response);
        this.handleRejectedRequests(appAttemptId, response);
    }

    private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId, List<SchedulingRequest> schedulingRequests) {
        if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
            schedulingRequests.forEach(req -> {
                Resource reqResource = req.getResourceSizing().getResources();
                req.getResourceSizing().setResources(this.scheduler.getNormalizedResource(reqResource));
            });
            this.placementDispatcher.dispatch(new BatchedRequests(this.iteratorType, appAttemptId.getApplicationId(), schedulingRequests, 1));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) {
        List<BatchedRequests> reqsToRetry = this.requestsToRetry.get(appAttId.getApplicationId());
        if (reqsToRetry != null && !reqsToRetry.isEmpty()) {
            List<BatchedRequests> list = reqsToRetry;
            synchronized (list) {
                for (BatchedRequests bReq : reqsToRetry) {
                    this.placementDispatcher.dispatch(bReq);
                }
                reqsToRetry.clear();
            }
        }
    }

    private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
        ApplicationId applicationId = appAttemptId.getApplicationId();
        List<PlacedSchedulingRequest> placedSchedulingRequests = this.placementDispatcher.pullPlacedRequests(applicationId);
        for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) {
            SchedulingRequest sReq = placedReq.getSchedulingRequest();
            for (SchedulerNode node : placedReq.getNodes()) {
                SchedulingRequest sReqClone = SchedulingRequest.newInstance((long)sReq.getAllocationRequestId(), (Priority)sReq.getPriority(), (ExecutionTypeRequest)sReq.getExecutionType(), (Set)sReq.getAllocationTags(), (ResourceSizing)ResourceSizing.newInstance((Resource)sReq.getResourceSizing().getResources()), (PlacementConstraint)sReq.getPlacementConstraint());
                Object applicationAttempt = this.scheduler.getApplicationAttempt(appAttemptId);
                Runnable task = () -> {
                    boolean success = this.scheduler.attemptAllocationOnNode((SchedulerApplicationAttempt)applicationAttempt, sReqClone, node);
                    if (!success) {
                        LOG.warn("Unsuccessful allocation attempt [{}] for [{}]", (Object)placedReq.getPlacementAttempt(), (Object)sReqClone);
                    }
                    this.handleSchedulingResponse(new Response(success, applicationId, sReqClone, placedReq.getPlacementAttempt(), node));
                };
                this.schedulingThreadPool.submit(task);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleRejectedRequests(ApplicationAttemptId appAttemptId, AllocateResponse response) {
        List<SchedulingRequest> rejectedRequests;
        List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests = this.placementDispatcher.pullRejectedRequests(appAttemptId.getApplicationId());
        if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) {
            LOG.warn("Following requests of [{}] were rejected by the PlacementAlgorithmOutput Algorithm: {}", (Object)appAttemptId.getApplicationId(), rejectedAlgoRequests);
            rejectedAlgoRequests.stream().filter(req -> req.getPlacementAttempt() < this.retryAttempts).forEach(req -> this.handleSchedulingResponse(new Response(false, appAttemptId.getApplicationId(), req.getSchedulingRequest(), req.getPlacementAttempt(), null)));
            ApplicationMasterServiceUtils.addToRejectedSchedulingRequests((AllocateResponse)response, rejectedAlgoRequests.stream().filter(req -> req.getPlacementAttempt() >= this.retryAttempts).map(sr -> RejectedSchedulingRequest.newInstance((RejectionReason)RejectionReason.COULD_NOT_PLACE_ON_NODE, (SchedulingRequest)sr.getSchedulingRequest())).collect(Collectors.toList()));
        }
        if ((rejectedRequests = this.requestsToReject.get(appAttemptId.getApplicationId())) != null && !rejectedRequests.isEmpty()) {
            List<SchedulingRequest> list = rejectedRequests;
            synchronized (list) {
                LOG.warn("Following requests of [{}] exhausted all retry attempts trying to schedule on placed node: {}", (Object)appAttemptId.getApplicationId(), rejectedRequests);
                ApplicationMasterServiceUtils.addToRejectedSchedulingRequests((AllocateResponse)response, rejectedRequests.stream().map(sr -> RejectedSchedulingRequest.newInstance((RejectionReason)RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, (SchedulingRequest)sr)).collect(Collectors.toList()));
                rejectedRequests.clear();
            }
        }
    }

    @Override
    public void finishApplicationMaster(ApplicationAttemptId appAttemptId, FinishApplicationMasterRequest request, FinishApplicationMasterResponse response) {
        this.placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
        this.requestsToReject.remove(appAttemptId.getApplicationId());
        this.requestsToRetry.remove(appAttemptId.getApplicationId());
        super.finishApplicationMaster(appAttemptId, request, response);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
        int placementAttempt = ((Response)schedulerResponse).placementAttempt;
        if (!schedulerResponse.isSuccess() && placementAttempt < this.retryAttempts) {
            List reqsToRetry;
            List list = reqsToRetry = this.requestsToRetry.computeIfAbsent(schedulerResponse.getApplicationId(), k -> new ArrayList());
            synchronized (list) {
                this.addToRetryList(schedulerResponse, placementAttempt, reqsToRetry);
            }
            LOG.warn("Going to retry request for application [{}] after [{}] attempts: [{}]", new Object[]{schedulerResponse.getApplicationId(), placementAttempt, schedulerResponse.getSchedulingRequest()});
        } else if (!schedulerResponse.isSuccess()) {
            List reqsToReject;
            LOG.warn("Not retrying request for application [{}] after [{}] attempts: [{}]", new Object[]{schedulerResponse.getApplicationId(), placementAttempt, schedulerResponse.getSchedulingRequest()});
            List list = reqsToReject = this.requestsToReject.computeIfAbsent(schedulerResponse.getApplicationId(), k -> new ArrayList());
            synchronized (list) {
                reqsToReject.add(schedulerResponse.getSchedulingRequest());
            }
        }
    }

    private void addToRetryList(SchedulingResponse schedulerResponse, int placementAttempt, List<BatchedRequests> reqsToRetry) {
        boolean isAdded = false;
        for (BatchedRequests br : reqsToRetry) {
            if (br.getPlacementAttempt() != placementAttempt + 1) continue;
            br.addToBatch(schedulerResponse.getSchedulingRequest());
            br.addToBlacklist(schedulerResponse.getSchedulingRequest().getAllocationTags(), ((Response)schedulerResponse).attemptedNode);
            isAdded = true;
            break;
        }
        if (!isAdded) {
            BatchedRequests br = new BatchedRequests(this.iteratorType, schedulerResponse.getApplicationId(), Lists.newArrayList((Object[])new SchedulingRequest[]{schedulerResponse.getSchedulingRequest()}), placementAttempt + 1);
            reqsToRetry.add(br);
            br.addToBlacklist(schedulerResponse.getSchedulingRequest().getAllocationTags(), ((Response)schedulerResponse).attemptedNode);
        }
    }

    static final class Response
    extends SchedulingResponse {
        private final int placementAttempt;
        private final SchedulerNode attemptedNode;

        private Response(boolean isSuccess, ApplicationId applicationId, SchedulingRequest schedulingRequest, int placementAttempt, SchedulerNode attemptedNode) {
            super(isSuccess, applicationId, schedulingRequest);
            this.placementAttempt = placementAttempt;
            this.attemptedNode = attemptedNode;
        }
    }
}

