/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.server.node.local;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.jppf.JPPFUnsupportedOperationException;
import org.jppf.io.DataLocation;
import org.jppf.io.IOHelper;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.JPPFDistributedJob;
import org.jppf.node.protocol.Task;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.node.protocol.TaskThreadLocals;
import org.jppf.node.protocol.graph.TaskGraphInfo;
import org.jppf.serialization.ObjectSerializer;
import org.jppf.server.nio.nodeserver.LocalNodeMessage;
import org.jppf.server.nio.nodeserver.async.AsyncNodeContext;
import org.jppf.server.nio.nodeserver.async.AsyncNodeMessageHandler;
import org.jppf.server.node.AbstractCommonNode;
import org.jppf.server.node.AbstractNodeIO;
import org.jppf.server.node.JPPFContainer;
import org.jppf.server.node.ObjectSerializationTask;
import org.jppf.server.node.local.JPPFLocalContainer;
import org.jppf.server.node.local.JPPFLocalNode;
import org.jppf.utils.LoggingUtils;
import org.jppf.utils.concurrent.ThreadSynchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncLocalNodeIO
extends AbstractNodeIO<JPPFLocalNode> {
    private static Logger log = LoggerFactory.getLogger(AsyncLocalNodeIO.class);
    private static boolean debugEnabled = LoggingUtils.isDebugEnabled((Logger)log);
    private AsyncNodeContext channel;

    public AsyncLocalNodeIO(JPPFLocalNode node) {
        super((AbstractCommonNode)node);
        this.channel = (AsyncNodeContext)((Object)node.getNodeConnection().getChannel());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Object[] deserializeObjects() throws Exception {
        Object[] result = null;
        LocalNodeMessage currentMessage = null;
        ThreadSynchronization threadSynchronization = this.channel.getLocalNodeReadLock();
        synchronized (threadSynchronization) {
            if (debugEnabled) {
                log.debug("waiting for next request");
            }
            currentMessage = (LocalNodeMessage)this.channel.takeNextMessageToSend();
            if (debugEnabled) {
                log.debug("got request");
            }
        }
        DataLocation location = (DataLocation)currentMessage.getLocations().get(0);
        TaskBundle bundle = (TaskBundle)IOHelper.unwrappedData((DataLocation)location, (ObjectSerializer)((JPPFLocalNode)this.node).getHelper().getSerializer());
        ((JPPFLocalNode)this.node).getExecutionManager().addPendingJobEntry(bundle);
        if (debugEnabled) {
            log.debug("got bundle " + bundle);
        }
        result = this.deserializeObjects(bundle, currentMessage);
        if (debugEnabled) {
            log.debug("got all data");
        }
        return result;
    }

    protected void handleReload() throws Exception {
        ((JPPFLocalNode)this.node).setClassLoader(null);
        ((JPPFLocalNode)this.node).initHelper();
    }

    protected Object[] deserializeObjects(TaskBundle bundle) throws Exception {
        throw new JPPFUnsupportedOperationException("method " + ((Object)((Object)this)).getClass().getName() + ".deserializeObjects(TaskBundle) should never be called for a local node");
    }

    protected Object[] deserializeObjects(TaskBundle bundle, LocalNodeMessage currentMessage) throws Exception {
        TaskGraphInfo graphInfo = (TaskGraphInfo)bundle.getParameter((Object)BundleParameter.JOB_TASK_GRAPH_INFO, null);
        int dependencyCount = graphInfo == null ? 0 : graphInfo.getNbDependencies();
        int count = bundle.getTaskCount() + dependencyCount;
        Object[] list = new Object[count + 2];
        list[0] = bundle;
        try {
            this.initializeBundleData(bundle);
            if (debugEnabled) {
                log.debug("bundle task count = {}, dependencies = {}, handshake = {}", new Object[]{bundle.getTaskCount(), dependencyCount, bundle.isHandshake()});
            }
            if (!bundle.isHandshake()) {
                TaskThreadLocals.setRequestUuid((String)bundle.getUuid());
                boolean clientAccess = (Boolean)bundle.getParameter((Object)BundleParameter.FROM_PERSISTENCE, (Object)false) == false;
                JPPFLocalContainer cont = (JPPFLocalContainer)((JPPFLocalNode)this.node).getClassLoaderManager().getContainer(bundle.getUuidPath().getList(), clientAccess, (Object[])null);
                if (!((JPPFLocalNode)this.node).isOffline() && !bundle.getSLA().isRemoteClassLoadingEnabled()) {
                    cont.getClassLoader().setRemoteClassLoadingDisabled(true);
                }
                ((JPPFLocalNode)this.node).getLifeCycleEventHandler().fireJobHeaderLoaded((JPPFDistributedJob)bundle, cont.getClassLoader());
                cont.deserializeObjects(list, 1 + count, currentMessage, ((JPPFLocalNode)this.node).getSerializationExecutor());
            }
            if (debugEnabled) {
                log.debug("got all data");
            }
        }
        catch (Throwable t) {
            log.error("Exception occurred while deserializing the tasks", t);
            bundle.setTaskCount(0);
            bundle.setParameter((Object)BundleParameter.NODE_EXCEPTION_PARAM, (Object)t);
        }
        return list;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendResults(TaskBundle bundle, List<Task<?>> tasks) throws Exception {
        if (debugEnabled) {
            log.debug("writing {} results for {}", (Object)tasks.size(), (Object)bundle);
        }
        ExecutorService executor = ((JPPFLocalNode)this.node).getSerializationExecutor();
        this.finalizeBundleData(bundle, tasks);
        ArrayList futureList = new ArrayList(tasks == null ? 1 : tasks.size() + 1);
        JPPFContainer cont = ((JPPFLocalNode)this.node).getContainer(bundle.isNotification() ? ((JPPFLocalNode)this.node).getHandshakeUuidPath() : bundle.getUuidPath().getList());
        int submitCount = 0;
        futureList.add(executor.submit(new ObjectSerializationTask((Object)bundle, cont, bundle, submitCount++)));
        if (!bundle.isNotification()) {
            for (Task<?> task : tasks) {
                futureList.add(executor.submit(new ObjectSerializationTask(task, cont, bundle, submitCount++)));
            }
        }
        LocalNodeMessage message = (LocalNodeMessage)this.channel.newMessage();
        for (Future future : futureList) {
            DataLocation location = (DataLocation)future.get();
            message.addLocation(location);
        }
        message.setBundle(bundle);
        ThreadSynchronization threadSynchronization = this.channel.getLocalNodeWriteLock();
        synchronized (threadSynchronization) {
            if (debugEnabled) {
                log.debug("wrote full results");
            }
            AsyncNodeMessageHandler asyncNodeMessageHandler = this.channel.getServer().getMessageHandler();
            if (bundle.isHandshake()) {
                asyncNodeMessageHandler.handshakeReceived(this.channel, message);
            } else {
                asyncNodeMessageHandler.resultsReceived(this.channel, message);
            }
        }
    }
}

