/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.server.queue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import org.jppf.execute.ExecutorStatus;
import org.jppf.management.JPPFManagementInfo;
import org.jppf.node.policy.Equal;
import org.jppf.node.policy.ExecutionPolicy;
import org.jppf.node.protocol.JobSLA;
import org.jppf.queue.JPPFQueue;
import org.jppf.queue.QueueEvent;
import org.jppf.server.nio.nodeserver.BaseNodeContext;
import org.jppf.server.nio.nodeserver.async.AsyncJobScheduler;
import org.jppf.server.protocol.ServerJob;
import org.jppf.server.protocol.ServerJobBroadcast;
import org.jppf.server.protocol.ServerTaskBundleClient;
import org.jppf.server.queue.JPPFPriorityQueue;
import org.jppf.server.queue.RemoveBundleAction;
import org.jppf.server.submission.SubmissionStatus;
import org.jppf.utils.JPPFUuid;
import org.jppf.utils.LoggingUtils;
import org.jppf.utils.PropertiesCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles broadcast jobs on behalf of a {@link JPPFPriorityQueue}.
 * <p>
 * A broadcast job received from a client is first registered as <em>pending</em>
 * (see {@link #processBroadcastJob(ServerTaskBundleClient)}). When
 * {@link #processPendingBroadcasts()} runs, each pending broadcast is expanded into one
 * job per distinct node connection whose status is {@code ACTIVE} or {@code EXECUTING}
 * and which satisfies the job's execution policy; those per-node jobs are then added
 * to the queue.
 */
public class BroadcastManager {
    /*
     * NOTE(review): the logger is registered under JPPFPriorityQueue rather than this class.
     * It is kept as-is so that existing logging configuration keeps working; confirm whether
     * this is intentional.
     */
    private static final Logger log = LoggerFactory.getLogger(JPPFPriorityQueue.class);
    /** Whether debug logging is enabled, determined once at class initialization. */
    private static final boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
    /** Broadcast jobs that have been received but not yet expanded, keyed by job uuid. */
    private final Map<String, ServerJobBroadcast> pendingBroadcasts = new ConcurrentHashMap<>();
    /** The queue this manager works for. */
    private final JPPFPriorityQueue queue;
    /** The lock obtained from the queue. */
    private final Lock lock;
    /** Number of connections currently in the {@code ACTIVE} or {@code EXECUTING} state. */
    private final AtomicInteger nbWorkingConnections = new AtomicInteger(0);
    /** Supplies the list of all node connections; never {@code null}. */
    private Callable<List<BaseNodeContext>> callableAllConnections = () -> Collections.emptyList();
    /** The job map obtained from the queue, keyed by job uuid. */
    private final Map<String, ServerJob> jobMap;

    /**
     * Creates a broadcast manager bound to the given queue, sharing its lock and job map.
     *
     * @param queue the queue for which broadcasts are managed.
     */
    BroadcastManager(JPPFPriorityQueue queue) {
        this.queue = queue;
        this.lock = queue.getLock();
        this.jobMap = queue.getJobMap();
    }

    /**
     * Sets the supplier of node connections used when expanding pending broadcasts.
     *
     * @param callableAllConnections the supplier to use; {@code null} installs a supplier
     *        that always returns an empty list.
     */
    void setCallableAllConnections(Callable<List<BaseNodeContext>> callableAllConnections) {
        if (callableAllConnections == null) {
            this.callableAllConnections = () -> Collections.emptyList();
        } else {
            this.callableAllConnections = callableAllConnections;
        }
    }

    /**
     * Adjusts the count of working connections after a connection status change.
     * The count goes up when a connection enters the working state ({@code ACTIVE} or
     * {@code EXECUTING}) and down when it leaves it; other transitions leave it unchanged.
     *
     * @param oldStatus the status before the change.
     * @param newStatus the status after the change.
     */
    void updateWorkingConnections(ExecutorStatus oldStatus, ExecutorStatus newStatus) {
        boolean wasWorking = isWorking(oldStatus);
        boolean nowWorking = isWorking(newStatus);
        if (nowWorking && !wasWorking) {
            this.nbWorkingConnections.incrementAndGet();
        } else if (!nowWorking && wasWorking) {
            this.nbWorkingConnections.decrementAndGet();
        }
    }

    /**
     * Tells whether a status counts as "working" for broadcast purposes.
     *
     * @param status the status to test, may be {@code null}.
     * @return {@code true} if the status is {@code ACTIVE} or {@code EXECUTING}.
     */
    private static boolean isWorking(ExecutorStatus status) {
        return status == ExecutorStatus.ACTIVE || status == ExecutorStatus.EXECUTING;
    }

    /**
     * Handles a client bundle that belongs to a broadcast job.
     * <p>
     * If the job map has no job with the bundle's uuid, a new {@link ServerJobBroadcast} is
     * created, registered in the job map, scheduled, announced to the job manager and recorded
     * as a pending broadcast. Otherwise the bundle is simply added to the existing job.
     * <p>
     * This method does not acquire the queue lock itself.
     * NOTE(review): presumably the caller already holds it - confirm.
     *
     * @param clientBundle the bundle received from the client.
     */
    void processBroadcastJob(ServerTaskBundleClient clientBundle) {
        String jobUuid = clientBundle.getUuid();
        ServerJob existingJob = this.jobMap.get(jobUuid);
        if (existingJob != null) {
            existingJob.addBundle(clientBundle);
            return;
        }
        ServerJobBroadcast broadcastJob =
            new ServerJobBroadcast(this.lock, this.queue.jobManager, clientBundle.getJob(), clientBundle.getDataProvider());
        this.initQueueEntry(broadcastJob);
        this.jobMap.put(jobUuid, broadcastJob);
        broadcastJob.addBundle(clientBundle);
        this.queue.scheduleManager.handleStartJobSchedule(broadcastJob);
        this.queue.scheduleManager.handleExpirationJobSchedule(this.queue.driver, broadcastJob);
        this.queue.jobManager.jobQueued(broadcastJob);
        this.pendingBroadcasts.put(jobUuid, broadcastJob);
    }

    /**
     * Cancels every job in the job map whose broadcast uuid equals the given connection uuid.
     * <p>
     * The matching job ids are collected while holding the lock; the cancellations themselves
     * are issued after the lock has been released.
     *
     * @param connectionUUID the uuid of the connection; nothing happens if it is {@code null} or empty.
     */
    public void cancelBroadcastJobs(String connectionUUID) {
        if (connectionUUID == null || connectionUUID.isEmpty()) {
            return;
        }
        Set<String> jobIDs = new HashSet<>();
        this.lock.lock();
        try {
            for (Map.Entry<String, ServerJob> entry : this.jobMap.entrySet()) {
                if (connectionUUID.equals(entry.getValue().getBroadcastUUID())) {
                    jobIDs.add(entry.getKey());
                }
            }
        } finally {
            this.lock.unlock();
        }
        for (String jobID : jobIDs) {
            this.queue.cancelJob(jobID);
        }
    }

    /**
     * Expands all pending broadcast jobs into per-node jobs.
     * <p>
     * Nothing is done when no connection is counted as working, or when the connection
     * supplier yields no connection (including when it fails or returns {@code null}).
     */
    public void processPendingBroadcasts() {
        if (this.nbWorkingConnections.get() <= 0) {
            return;
        }
        List<BaseNodeContext> connections;
        try {
            connections = this.callableAllConnections.call();
        } catch (Throwable e) {
            // Best effort: a failing supplier is treated as "no connection available",
            // but the failure is no longer silently discarded.
            if (debugEnabled) {
                log.debug("could not obtain the list of node connections", e);
            }
            connections = null;
        }
        if (connections == null || connections.isEmpty()) {
            return;
        }
        // processPendingBroadcast() removes entries from the concurrent map being iterated.
        for (ServerJobBroadcast broadcastJob : this.pendingBroadcasts.values()) {
            if (debugEnabled) {
                log.debug("queuing job {}", broadcastJob.getJob());
            }
            this.processPendingBroadcast(connections, broadcastJob);
        }
    }

    /**
     * Expands a single pending broadcast into one job per eligible node connection.
     * <p>
     * The job is first removed from the pending broadcasts; if it was no longer there,
     * nothing else happens. A connection is eligible when its status is {@code ACTIVE} or
     * {@code EXECUTING}, its uuid is non-empty and not already seen, and the job's execution
     * policy (if any) accepts the node's system information. When no connection is eligible
     * the broadcast job is ended; otherwise the per-node jobs are queued under the lock.
     *
     * @param connections the node connections to consider.
     * @param broadcastJob the pending broadcast job to expand.
     * @throws IllegalArgumentException if {@code broadcastJob} is {@code null}.
     */
    private void processPendingBroadcast(List<BaseNodeContext> connections, ServerJobBroadcast broadcastJob) {
        if (broadcastJob == null) {
            throw new IllegalArgumentException("broadcastJob is null");
        }
        if (this.pendingBroadcasts.remove(broadcastJob.getUuid()) == null) {
            return;
        }
        JobSLA sla = broadcastJob.getSLA();
        List<ServerJobBroadcast> jobList = new ArrayList<>(connections.size());
        Set<String> uuidSet = new HashSet<>();
        for (BaseNodeContext connection : connections) {
            if (!isWorking(connection.getExecutionStatus())) {
                continue;
            }
            String uuid = connection.getUuid();
            // Skip connections without a uuid and nodes that were already handled.
            if (uuid == null || uuid.isEmpty() || !uuidSet.add(uuid)) {
                continue;
            }
            // NOTE(review): info is dereferenced below without a null check; confirm that
            // getManagementInfo() cannot return null for a working connection.
            JPPFManagementInfo info = connection.getManagementInfo();
            ExecutionPolicy policy = sla.getExecutionPolicy();
            AsyncJobScheduler.preparePolicy(policy, broadcastJob, this.queue.driver.getStatistics(), 0);
            if (policy != null && !policy.evaluate(info.getSystemInfo())) {
                if (debugEnabled) {
                    log.debug("node uuid={} refused for broadcast {}", uuid, broadcastJob);
                }
                continue;
            }
            // Pin the per-node job to this node, combined with the job's own policy if any.
            ExecutionPolicy broadcastPolicy = new Equal("jppf.uuid", true, uuid);
            if (policy != null) {
                broadcastPolicy = broadcastPolicy.and(policy);
            }
            ServerJobBroadcast newBundle = broadcastJob.createBroadcastJob(uuid);
            newBundle.setSLA(sla.copy());
            newBundle.setMetadata(broadcastJob.getMetadata());
            newBundle.getSLA().setExecutionPolicy(broadcastPolicy);
            newBundle.setName(broadcastJob.getName() + " [node: " + info.toDisplayString() + ']');
            newBundle.setUuid(JPPFUuid.normalUUID());
            jobList.add(newBundle);
            if (debugEnabled) {
                log.debug("node uuid={} accepted for broadcast: {}, execution policy =\n{}",
                    uuid, newBundle, newBundle.getSLA().getExecutionPolicy());
            }
        }
        if (jobList.isEmpty()) {
            broadcastJob.jobEnded();
            return;
        }
        this.lock.lock();
        try {
            this.queue.fireBundleAdded(new QueueEvent<>(this.queue, broadcastJob, false));
            for (ServerJobBroadcast job : jobList) {
                this.addBroadcastJob(job);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Adds a per-node broadcast job to the queue: registers it in the priority map and the
     * job map, schedules it, notifies the job manager and queue listeners, and adds its task
     * count to the {@code task.queue.count} statistic.
     *
     * @param broadcastJob the per-node job to add.
     * @throws IllegalArgumentException if {@code broadcastJob} is {@code null}.
     */
    private void addBroadcastJob(ServerJobBroadcast broadcastJob) {
        if (broadcastJob == null) {
            throw new IllegalArgumentException("broadcastJob is null");
        }
        String jobUuid = broadcastJob.getUuid();
        this.initQueueEntry(broadcastJob);
        this.queue.getPriorityMap().putValue(broadcastJob.getSLA().getPriority(), broadcastJob);
        if (debugEnabled) {
            log.debug("adding bundle with {}", broadcastJob);
        }
        this.queue.scheduleManager.handleStartJobSchedule(broadcastJob);
        this.queue.scheduleManager.handleExpirationJobSchedule(this.queue.driver, broadcastJob);
        this.jobMap.put(jobUuid, broadcastJob);
        this.queue.updateLatestMaxSize();
        this.queue.jobManager.jobQueued(broadcastJob);
        this.queue.fireBundleAdded(new QueueEvent<>(this.queue, broadcastJob, false));
        this.queue.driver.getStatistics().addValue("task.queue.count", broadcastJob.getTaskCount());
    }

    /**
     * Applies the initialization shared by every job entering the queue through this manager:
     * pending submission status, queue entry and reception times set to the current time,
     * and a completion action that removes the job from the queue.
     *
     * @param broadcastJob the job to initialize.
     */
    private void initQueueEntry(ServerJobBroadcast broadcastJob) {
        broadcastJob.setSubmissionStatus(SubmissionStatus.PENDING);
        broadcastJob.setQueueEntryTime(System.currentTimeMillis());
        broadcastJob.setJobReceivedTime(broadcastJob.getQueueEntryTime());
        broadcastJob.addOnDone(new RemoveBundleAction(this.queue, broadcastJob));
    }
}

