/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.server.nio.nodeserver.async;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import org.jppf.execute.ExecutorStatus;
import org.jppf.load.balancer.Bundler;
import org.jppf.load.balancer.JobAwareness;
import org.jppf.load.balancer.spi.JPPFBundlerFactory;
import org.jppf.management.JPPFManagementInfo;
import org.jppf.management.JPPFSystemInformation;
import org.jppf.node.policy.ExecutionPolicy;
import org.jppf.node.protocol.JPPFDistributedJob;
import org.jppf.node.protocol.JPPFNodeConfigSpec;
import org.jppf.node.protocol.JobDependencySpec;
import org.jppf.node.protocol.JobSLA;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.server.nio.nodeserver.BaseNodeContext;
import org.jppf.server.nio.nodeserver.async.AbstractAsyncJobScheduler;
import org.jppf.server.nio.nodeserver.async.AsyncNodeContext;
import org.jppf.server.nio.nodeserver.async.AsyncNodeNioServer;
import org.jppf.server.protocol.ServerJob;
import org.jppf.server.protocol.ServerTask;
import org.jppf.server.protocol.ServerTaskBundleNode;
import org.jppf.server.queue.JPPFPriorityQueue;
import org.jppf.utils.ExceptionUtils;
import org.jppf.utils.LoggingUtils;
import org.jppf.utils.PropertiesCollection;
import org.jppf.utils.TypedProperties;
import org.jppf.utils.TypedPropertiesSimilarityEvaluator;
import org.jppf.utils.collections.SetSortedMap;
import org.jppf.utils.configuration.JPPFProperties;
import org.jppf.utils.stats.JPPFStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncJobScheduler
extends AbstractAsyncJobScheduler {
    private static final Logger log = LoggerFactory.getLogger(AsyncJobScheduler.class);
    private static final boolean debugEnabled = LoggingUtils.isDebugEnabled((Logger)log);

    AsyncJobScheduler(AsyncNodeNioServer server, JPPFPriorityQueue queue, JPPFStatistics stats, JPPFBundlerFactory bundlerFactory) {
        super(server, queue, stats, bundlerFactory);
    }

    @Override
    public void run() {
        if (debugEnabled) {
            log.debug("starting {}", (Object)this.getClass().getSimpleName());
        }
        this.reservationHandler = this.server.getNodeReservationHandler();
        try {
            while (!this.isStopped()) {
                if (this.dispatch()) continue;
                this.goToSleep(1000L);
            }
        }
        catch (Throwable t) {
            log.error("error in driver dispatch loop", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean dispatch() {
        try {
            this.queue.getBroadcastManager().processPendingBroadcasts();
            if (this.queue.isEmpty()) {
                return false;
            }
            BaseNodeContext channel = null;
            ServerTaskBundleNode nodeBundle = null;
            Set set = this.idleChannels;
            synchronized (set) {
                if (this.idleChannels.isEmpty()) {
                    return false;
                }
                List<ServerJob> allJobs = this.queue.getAllJobsFromPriorityMap();
                if (debugEnabled) {
                    log.debug("there are {} idle channels and {} jobs in the queue", (Object)this.idleChannels.size(), (Object)allJobs.size());
                }
                try {
                    Iterator<ServerJob> jobIterator = allJobs.iterator();
                    while (channel == null && jobIterator.hasNext() && !this.idleChannels.isEmpty()) {
                        ServerJob job = jobIterator.next();
                        if (debugEnabled) {
                            log.debug("checking {}", (Object)job);
                        }
                        if (!this.performJobChecks(job) || (channel = this.findIdleChannelIndex(job)) == null) continue;
                        Object object = channel.getMonitor();
                        synchronized (object) {
                            if (job.getSLA().getDesiredNodeConfiguration() != null) {
                                String readyJobUUID = this.reservationHandler.getReadyJobUUID(channel);
                                String pendingJobUUID = this.reservationHandler.getPendingJobUUID(channel);
                                if (pendingJobUUID == null && readyJobUUID == null) {
                                    if (debugEnabled) {
                                        log.debug("reserving {} with {}", (Object)job, (Object)channel);
                                    }
                                    this.reservationHandler.doReservation(job, channel);
                                    channel = null;
                                    continue;
                                }
                            }
                            if (channel.getCurrentNbJobs() >= channel.getMaxJobs()) {
                                this.removeIdleChannel(channel);
                            }
                            if (!channel.isEnabled()) {
                                if (debugEnabled) {
                                    log.debug("channel is disabled [}", (Object)channel);
                                }
                                channel = null;
                                continue;
                            }
                            nodeBundle = this.prepareJobDispatch(channel, job);
                            if (debugEnabled) {
                                log.debug("prepareJobDispatch() returned {}", (Object)nodeBundle);
                            }
                            if (nodeBundle != null) {
                                try {
                                    AsyncJobScheduler.dispatchJobToChannel(channel, nodeBundle);
                                    return true;
                                }
                                catch (Exception e) {
                                    log.error("{}\nchannel={}\njob={}\nstack trace: {}", new Object[]{ExceptionUtils.getMessage((Throwable)e), channel, nodeBundle, ExceptionUtils.getStackTrace((Throwable)e)});
                                    channel.setClosed(false);
                                    channel.handleException(e);
                                }
                            }
                        }
                    }
                    if (debugEnabled) {
                        log.debug(channel == null ? "no channel found for bundle " : "channel found for bundle " + channel);
                    }
                }
                catch (Exception e) {
                    log.error("An error occurred while attempting to dispatch task bundles. This is most likely due to an error in the load balancer implementation.", (Throwable)e);
                }
            }
        }
        catch (Exception e) {
            log.error("An error occurred while preparing for bundle creation and dispatching.", (Throwable)e);
        }
        return false;
    }

    private ServerTaskBundleNode prepareJobDispatch(BaseNodeContext channel, ServerJob selectedJob) {
        if (debugEnabled) {
            log.debug("dispatching jobUuid=" + selectedJob.getUuid() + " to node " + (Object)((Object)channel) + ", nodeUuid=" + channel.getConnectionUuid());
        }
        int size = 1;
        try {
            this.updateBundler(selectedJob.getJob(), channel);
            size = channel.getBundler().getBundleSize();
            if (selectedJob.getSLA().getMaxDispatchSize() < size) {
                size = selectedJob.getSLA().getMaxDispatchSize();
            }
        }
        catch (Exception e) {
            log.error("Error in load balancer implementation, switching to 'manual' with a bundle size of 1", (Throwable)e);
            size = this.bundlerFactory.getFallbackBundler().getBundleSize();
        }
        return this.queue.nextBundle(selectedJob, size, channel);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void dispatchJobToChannel(BaseNodeContext channel, ServerTaskBundleNode nodeBundle) throws Exception {
        if (debugEnabled) {
            log.debug("dispatching {} tasks of job '{}' to node {}", new Object[]{nodeBundle.getTaskCount(), nodeBundle.getJob().getName(), channel.getUuid()});
        }
        if (log.isTraceEnabled()) {
            TreeSet<Long> set = new TreeSet<Long>();
            for (ServerTask task : nodeBundle.getTaskList()) {
                long id = task.getBundle().getId();
                if (set.contains(id)) continue;
                set.add(id);
            }
            StringBuilder sb = new StringBuilder();
            int count = 0;
            Iterator<Object> iterator = set.iterator();
            while (iterator.hasNext()) {
                long id = (Long)iterator.next();
                if (count > 0) {
                    sb.append(", ");
                }
                sb.append("ServerTaskBundleClient[id=").append(id).append(']');
                ++count;
            }
            log.trace("client bundles in dispatch: {}", (Object)sb);
            sb = new StringBuilder();
            count = 0;
            for (ServerTask task : nodeBundle.getTaskList()) {
                if (count > 0) {
                    sb.append(", ");
                }
                sb.append(task.getPosition());
                ++count;
            }
            log.trace("tasks positions in dispatch: {}", (Object)sb);
        }
        Object object = channel.getMonitor();
        synchronized (object) {
            Future future = channel.submit(nodeBundle);
            nodeBundle.jobDispatched(channel, future);
        }
        if (debugEnabled) {
            log.debug("dispatched {} tasks of job '{}' to node {}", new Object[]{nodeBundle.getTaskCount(), nodeBundle.getJob().getName(), channel.getUuid()});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private BaseNodeContext findIdleChannelIndex(ServerJob job) {
        JobSLA sla;
        JPPFNodeConfigSpec spec;
        if (debugEnabled) {
            log.debug("checking {}", (Object)job);
        }
        TypedProperties desiredConfiguration = (spec = (sla = job.getSLA()).getDesiredNodeConfiguration()) == null ? null : spec.getConfiguration();
        List<BaseNodeContext> acceptableChannels = new ArrayList<BaseNodeContext>(this.idleChannels.size());
        LinkedList<AsyncNodeContext> toRemove = new LinkedList<AsyncNodeContext>();
        Iterator<Object> nodeIterator = null;
        if (sla.getPreferencePolicy() != null) {
            Set<BaseNodeContext> preferedChannels = this.filterPreferredNodes(job);
            if (preferedChannels.isEmpty()) {
                return null;
            }
            nodeIterator = preferedChannels.iterator();
        } else {
            nodeIterator = this.idleChannels.iterator();
        }
        while (nodeIterator.hasNext()) {
            AsyncNodeContext channel = (AsyncNodeContext)((Object)nodeIterator.next());
            Object object = channel.getMonitor();
            synchronized (object) {
                Collection<String> readyNodes;
                if (channel.getExecutionStatus() != ExecutorStatus.ACTIVE || channel.isClosed() || !channel.isEnabled()) {
                    if (debugEnabled) {
                        log.debug("channel is not opened: {}", (Object)channel);
                    }
                    toRemove.add(channel);
                    continue;
                }
                if (!channel.isActive() || !channel.isAcceptingNewJobs()) {
                    if (debugEnabled) {
                        log.debug("node not accepting jobs: {}", (Object)channel);
                    }
                    continue;
                }
                if (channel.isPeer() && this.server.nodeConnectionHandler.getConnectedRealNodes() >= this.peerLoadBalanceThreshold) {
                    if (debugEnabled) {
                        log.debug("this driver has {} nodes and the threshold is {}", (Object)this.server.nodeConnectionHandler.getConnectedNodes(), (Object)this.peerLoadBalanceThreshold);
                    }
                    continue;
                }
                if (channel.getCurrentNbJobs() >= channel.getMaxJobs()) {
                    if (debugEnabled) {
                        log.debug("[currentNbJobs = {}] >= maxJobs = {}] for {}", new Object[]{channel.getCurrentNbJobs(), channel.getMaxJobs(), channel});
                    }
                    continue;
                }
                if (!this.checkJobAgainstChannel(channel, job)) {
                    continue;
                }
                if (job.getBroadcastUUID() != null && !job.getBroadcastUUID().equals(channel.getUuid())) {
                    continue;
                }
                JPPFSystemInformation info = channel.getSystemInformation();
                if (channel.isPeer() && !this.disptachtoPeersWithoutNode && info != null && info.getJppf().getInt("jppf.peer.total.nodes", 0) <= 0) {
                    if (debugEnabled) {
                        log.debug("peer has no attached node: {}", (Object)channel.getUuid());
                    }
                    continue;
                }
                if (!this.checkExecutionPolicy(channel, job, sla.getExecutionPolicy(), info, job.getNbChannels())) {
                    continue;
                }
                if (!AsyncJobScheduler.checkMaxNodeGroups(channel, job)) {
                    continue;
                }
                Collection<String> collection = readyNodes = spec == null ? null : this.reservationHandler.getReadyNodes(job.getUuid());
                if (debugEnabled) {
                    log.debug("jobUuid={}, readyNodes={}", (Object)job.getUuid(), readyNodes);
                }
                if (!this.checkDesiredConfiguration(desiredConfiguration, channel, job, readyNodes, this.reservationHandler.getNbReservedNodes(job.getUuid()))) {
                    continue;
                }
                if (channel.isLocal() && this.localNodeBiasEnabled) {
                    if (desiredConfiguration != null) {
                        continue;
                    }
                    return channel;
                }
                acceptableChannels.add(channel);
            }
        }
        if (!toRemove.isEmpty()) {
            for (BaseNodeContext baseNodeContext : toRemove) {
                this.removeIdleChannel(baseNodeContext);
            }
        }
        if (!AsyncJobScheduler.checkJobNotCancelled(job)) {
            return null;
        }
        if (!acceptableChannels.isEmpty() && desiredConfiguration != null) {
            acceptableChannels = this.filterLowestDistances(job, acceptableChannels);
        }
        return this.selectChannel(acceptableChannels);
    }

    private BaseNodeContext selectChannel(List<BaseNodeContext> acceptableChannels) {
        if (acceptableChannels.isEmpty()) {
            return null;
        }
        int size = acceptableChannels.size();
        if (debugEnabled) {
            log.debug("found {} acceptable channels", (Object)size);
        }
        BaseNodeContext channel = size > 0 ? acceptableChannels.get(size > 1 ? this.random.nextInt(size) : 0) : null;
        return channel;
    }

    private boolean performJobChecks(ServerJob job) {
        JobDependencySpec dependencySpec = job.getSLA().getDependencySpec();
        if (dependencySpec.getId() != null && !job.isJobGraphAlreadyHandled() && dependencySpec.hasDependency() && this.dependencyHandler.hasPendingDependencyOrCancelled(dependencySpec.getId())) {
            if (debugEnabled) {
                log.debug("job dependency check false for {}", (Object)job);
            }
            return false;
        }
        JPPFNodeConfigSpec spec = job.getSLA().getDesiredNodeConfiguration();
        if (spec != null && this.reservationHandler.getNbReservedNodes(job.getUuid()) >= job.getSLA().getMaxNodes() && !this.reservationHandler.hasReadyNode(job.getUuid())) {
            if (debugEnabled) {
                log.debug("node config check false for {}", (Object)job);
            }
            return false;
        }
        if (job.getTaskGraph() != null && !job.hasAvailableGraphNode()) {
            if (debugEnabled) {
                log.debug("tasks graph check false for {}", (Object)job);
            }
            return false;
        }
        if (!this.checkGridPolicy(job)) {
            return false;
        }
        return AsyncJobScheduler.checkJobState(job);
    }

    private boolean checkJobAgainstChannel(AsyncNodeContext channel, ServerJob job) {
        int nbDispatches;
        String driverUuid;
        int index;
        List uuidPath = job.getJob().getUuidPath().getList();
        if (debugEnabled) {
            log.debug("uuid path={}, node uuid={}", (Object)uuidPath, (Object)channel.getUuid());
        }
        if ((index = uuidPath.indexOf(driverUuid = this.server.getDriver().getUuid())) >= 0 && index != uuidPath.size() - 1) {
            log.warn("uuid path contains this driver's uuid {}: uuidPath={}", (Object)driverUuid, (Object)uuidPath);
        }
        if (uuidPath.contains(channel.getUuid())) {
            if (debugEnabled) {
                log.debug("bundle uuid path already contains node {} : uuidPath={}, nodeUuid={}", new Object[]{channel, uuidPath, channel.getUuid()});
            }
            return false;
        }
        if (channel.isPeer() && uuidPath.size() - 1 >= job.getSLA().getMaxDriverDepth()) {
            if (debugEnabled) {
                log.debug("job [name={}, uuid={}, uuidPath={}] reached max driver depth of {}", new Object[]{job.getName(), job.getUuid(), uuidPath, job.getSLA().getMaxDriverDepth()});
            }
            return false;
        }
        return job.getSLA().isAllowMultipleDispatchesToSameChannel() || (nbDispatches = channel.getNbBundlesForJob(job.getUuid())) <= 0;
    }

    private boolean checkDesiredConfiguration(TypedProperties config, BaseNodeContext channel, ServerJob job, Collection<String> readyNodes, int nbReservedNodes) {
        if (config != null) {
            if (this.reservationHandler.getPendingJobUUID(channel) != null) {
                return false;
            }
            String readyJobUuid = this.reservationHandler.getReadyJobUUID(channel);
            boolean b = true;
            if (readyNodes != null) {
                b = readyNodes.contains(channel.getUuid());
            }
            if (debugEnabled) {
                log.debug("nodeUuid={}, readyJobUuid={}, jobUuid={}, b={}", new Object[]{channel.getUuid(), readyJobUuid, job.getUuid(), b});
            }
            if (!b && nbReservedNodes >= job.getSLA().getMaxNodes()) {
                return false;
            }
        }
        return true;
    }

    private boolean checkExecutionPolicy(BaseNodeContext channel, ServerJob job, ExecutionPolicy policy, JPPFSystemInformation info, int nbJobChannels) {
        if (policy == null) {
            return true;
        }
        if (debugEnabled) {
            log.debug("job has an execution policy: {}\n{}", (Object)job, (Object)policy.toString().trim());
        }
        boolean b = false;
        try {
            AsyncJobScheduler.preparePolicy(policy, job, this.stats, nbJobChannels);
            b = policy.evaluate((PropertiesCollection)info);
        }
        catch (Exception ex) {
            log.error("An error occurred while running the execution policy to determine node participation.", (Throwable)ex);
        }
        if (debugEnabled) {
            log.debug("rule execution is *{}* for job [name={}, uuid={}] on channel {}", new Object[]{b, job.getName(), job.getUuid(), channel});
        }
        return b;
    }

    private static boolean checkJobState(ServerJob job) {
        if (job.isCancelled()) {
            if (debugEnabled) {
                log.debug("job is cancelled: {}", (Object)job);
            }
            return false;
        }
        JobSLA sla = job.getSLA();
        if (debugEnabled) {
            log.debug("job '{}', suspended={}, pending={}, expired={}", new Object[]{job.getName(), sla.isSuspended(), job.isPending(), job.isJobExpired()});
        }
        if (sla.isSuspended() || job.isPending() || job.isJobExpired()) {
            return false;
        }
        if (debugEnabled) {
            log.debug("current nodes = " + job.getNbChannels() + ", maxNodes = " + sla.getMaxNodes());
        }
        return job.getNbChannels() < sla.getMaxNodes();
    }

    private static boolean checkJobNotCancelled(ServerJob job) {
        Lock lock = job.getLock();
        lock.lock();
        try {
            boolean bl = !job.isCancelled();
            return bl;
        }
        finally {
            lock.unlock();
        }
    }

    private static boolean checkMaxNodeGroups(BaseNodeContext currentNode, ServerJob job) {
        JPPFManagementInfo currentInfo = currentNode.getManagementInfo();
        if (currentInfo == null) {
            return true;
        }
        String currentMasterUuid = AsyncJobScheduler.getMasterUuid(currentInfo);
        if (currentMasterUuid == null) {
            return true;
        }
        int maxNodeGroups = job.getSLA().getMaxNodeProvisioningGroupss();
        if (maxNodeGroups == Integer.MAX_VALUE || maxNodeGroups <= 0) {
            return true;
        }
        Set<ServerTaskBundleNode> nodes = job.getDispatchSet();
        HashSet<String> masterUuids = new HashSet<String>();
        masterUuids.add(currentMasterUuid);
        for (ServerTaskBundleNode node : nodes) {
            AsyncNodeContext ctx = (AsyncNodeContext)node.getChannel();
            JPPFManagementInfo info = ctx.getManagementInfo();
            String uuid = AsyncJobScheduler.getMasterUuid(info);
            if (uuid == null) continue;
            if (!masterUuids.contains(uuid)) {
                masterUuids.add(uuid);
            }
            if (masterUuids.size() <= maxNodeGroups) continue;
            if (log.isTraceEnabled()) {
                log.trace("[masterUuids.size() = {}] > [maxNodeGroups = {}] for {}", new Object[]{masterUuids.size(), maxNodeGroups, currentNode});
            }
            return false;
        }
        return true;
    }

    private static String getMasterUuid(JPPFManagementInfo info) {
        JPPFSystemInformation systemInfo;
        if (info.isMasterNode()) {
            return info.getUuid();
        }
        if (info.isSlaveNode() && (systemInfo = info.getSystemInfo()) != null) {
            return (String)systemInfo.getJppf().get(JPPFProperties.PROVISIONING_MASTER_UUID);
        }
        return null;
    }

    private void updateBundler(TaskBundle taskBundle, BaseNodeContext context) {
        context.checkBundler(this.bundlerFactory, this.jppfContext);
        Bundler<?> ctxBundler = context.getBundler();
        if (ctxBundler instanceof JobAwareness) {
            ((JobAwareness)ctxBundler).setJob((JPPFDistributedJob)taskBundle);
        }
    }

    private boolean checkGridPolicy(ServerJob job) {
        ExecutionPolicy policy = job.getSLA().getGridExecutionPolicy();
        boolean result = true;
        if (policy != null) {
            AsyncJobScheduler.preparePolicy(policy, job, this.stats, job.getNbChannels());
            result = policy.evaluate((PropertiesCollection)this.driverInfo);
            if (!result && debugEnabled) {
                log.debug("grid policy check false for {}", (Object)job);
            }
        }
        return result;
    }

    private List<BaseNodeContext> filterLowestDistances(ServerJob job, List<BaseNodeContext> channels) {
        JPPFNodeConfigSpec spec = job.getSLA().getDesiredNodeConfiguration();
        TypedProperties desiredConfiguration = spec == null ? null : spec.getConfiguration();
        SetSortedMap scoreMap = new SetSortedMap();
        if (debugEnabled) {
            log.debug("computing scores for job '{}', uuid={}", (Object)job.getName(), (Object)job.getUuid());
        }
        for (BaseNodeContext channel : channels) {
            String reservedJobUuid;
            if (channel.isLocal() || channel.isOffline() || channel.isPeer() || (reservedJobUuid = this.server.getNodeReservationHandler().getPendingJobUUID(channel)) != null && reservedJobUuid.equals(job.getUuid())) continue;
            TypedProperties props = channel.getSystemInformation().getJppf();
            int score = TypedPropertiesSimilarityEvaluator.computeDistance((TypedProperties)desiredConfiguration, (TypedProperties)props);
            channel.setReservationScore(score);
            scoreMap.putValue((Object)score, (Object)channel);
        }
        if (debugEnabled) {
            SetSortedMap map = new SetSortedMap();
            for (Map.Entry entry : scoreMap.entrySet()) {
                for (BaseNodeContext c : (Collection)entry.getValue()) {
                    map.putValue(entry.getKey(), (Object)c.getUuid());
                }
            }
            log.debug("computed scores: {}", (Object)map);
        }
        int n = (Integer)scoreMap.firstKey();
        return scoreMap.isEmpty() ? Collections.emptyList() : new ArrayList(scoreMap.getValues((Object)n));
    }
}

