/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.server.node.remote;

import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.jppf.JPPFSuspendedNodeException;
import org.jppf.comm.socket.SocketWrapper;
import org.jppf.io.DataLocation;
import org.jppf.io.IOHelper;
import org.jppf.io.OutputDestination;
import org.jppf.io.SocketWrapperOutputDestination;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.JPPFDistributedJob;
import org.jppf.node.protocol.Task;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.node.protocol.TaskThreadLocals;
import org.jppf.node.protocol.graph.TaskGraphInfo;
import org.jppf.serialization.ObjectSerializer;
import org.jppf.server.node.AbstractCommonNode;
import org.jppf.server.node.AbstractNodeIO;
import org.jppf.server.node.JPPFContainer;
import org.jppf.server.node.ObjectSerializationTask;
import org.jppf.server.node.remote.AbstractRemoteNode;
import org.jppf.server.node.remote.JPPFRemoteContainer;
import org.jppf.server.node.remote.RemoteNodeConnection;
import org.jppf.utils.LoggingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteNodeIO
extends AbstractNodeIO<AbstractRemoteNode> {
    private static Logger log = LoggerFactory.getLogger(RemoteNodeIO.class);
    private static boolean debugEnabled = LoggingUtils.isDebugEnabled((Logger)log);
    private static boolean traceEnabled = log.isTraceEnabled();

    public RemoteNodeIO(AbstractRemoteNode node) {
        super((AbstractCommonNode)node);
    }

    protected Object[] deserializeObjects() throws Exception {
        ObjectSerializer ser = ((AbstractRemoteNode)this.node).getHelper().getSerializer();
        if (debugEnabled) {
            log.debug("waiting for next request. Serializer = " + ser + " (class loader = " + ser.getClass().getClassLoader() + ")");
        }
        Object[] result = null;
        TaskBundle bundle = (TaskBundle)IOHelper.unwrappedData((SocketWrapper)this.getSocketWrapper(), (ObjectSerializer)ser);
        ((AbstractRemoteNode)this.node).getExecutionManager().addPendingJobEntry(bundle);
        ((AbstractRemoteNode)this.node).setExecuting(true);
        if (debugEnabled) {
            log.debug("got bundle " + bundle);
        }
        result = this.deserializeObjects(bundle);
        if (((AbstractRemoteNode)this.node).isOffline() && !bundle.isHandshake()) {
            if (debugEnabled) {
                log.debug("waiting for channel closed");
            }
            this.waitChannelClosed(this.getSocketWrapper());
            if (debugEnabled) {
                log.debug("channel closed");
            }
        }
        return result;
    }

    protected Object[] deserializeObjects(TaskBundle bundle) throws Exception {
        TaskGraphInfo graphInfo = (TaskGraphInfo)bundle.getParameter((Object)BundleParameter.JOB_TASK_GRAPH_INFO, null);
        int dependencyCount = graphInfo == null ? 0 : graphInfo.getNbDependencies();
        int count = bundle.getTaskCount() + dependencyCount;
        Object[] list = new Object[count + 2];
        list[0] = bundle;
        try {
            this.initializeBundleData(bundle);
            if (debugEnabled) {
                log.debug("bundle task count = {}, dependencies = {}, handshake = {}", new Object[]{bundle.getTaskCount(), dependencyCount, bundle.isHandshake()});
            }
            if (!bundle.isHandshake()) {
                TaskThreadLocals.setRequestUuid((String)bundle.getUuid());
                boolean clientAccess = (Boolean)bundle.getParameter((Object)BundleParameter.FROM_PERSISTENCE, (Object)false) == false;
                JPPFRemoteContainer cont = (JPPFRemoteContainer)((AbstractRemoteNode)this.node).getClassLoaderManager().getContainer(bundle.getUuidPath().getList(), clientAccess, (Object[])null);
                cont.setNodeConnection((RemoteNodeConnection)((AbstractRemoteNode)this.node).getNodeConnection());
                if (!((AbstractRemoteNode)this.node).isOffline() && !bundle.getSLA().isRemoteClassLoadingEnabled()) {
                    cont.getClassLoader().setRemoteClassLoadingDisabled(true);
                }
                ((AbstractRemoteNode)this.node).getLifeCycleEventHandler().fireJobHeaderLoaded((JPPFDistributedJob)bundle, cont.getClassLoader());
                cont.deserializeObjects(list, 1 + count, ((AbstractRemoteNode)this.node).getSerializationExecutor());
            } else {
                this.getSocketWrapper().receiveBytes(0);
            }
            if (debugEnabled) {
                log.debug("got all data");
            }
        }
        catch (Throwable t) {
            log.error("Exception occurred while deserializing the tasks", t);
            bundle.setTaskCount(0);
            bundle.setParameter((Object)BundleParameter.NODE_EXCEPTION_PARAM, (Object)t);
        }
        return list;
    }

    protected void handleReload() throws Exception {
        ((AbstractRemoteNode)this.node).setClassLoader(null);
        ((AbstractRemoteNode)this.node).initHelper();
        this.getSocketWrapper().setSerializer(((AbstractRemoteNode)this.node).getHelper().getSerializer());
    }

    protected void sendResults(TaskBundle bundle, List<Task<?>> tasks) throws Exception {
        SocketWrapper socketWrapper;
        if (debugEnabled) {
            log.debug("writing results for " + bundle);
        }
        if ((socketWrapper = this.getSocketWrapper()) == null) {
            throw new SocketException("no connection to the server");
        }
        ExecutorService executor = ((AbstractRemoteNode)this.node).getSerializationExecutor();
        this.finalizeBundleData(bundle, tasks);
        ArrayList futureList = new ArrayList(tasks == null ? 1 : tasks.size() + 1);
        JPPFContainer cont = ((AbstractRemoteNode)this.node).getContainer(bundle.isNotification() ? ((AbstractRemoteNode)this.node).getHandshakeUuidPath() : bundle.getUuidPath().getList());
        int submitCount = 0;
        futureList.add(executor.submit(new ObjectSerializationTask((Object)bundle, cont, bundle, submitCount++)));
        if (tasks != null) {
            for (Task<?> task : tasks) {
                futureList.add(executor.submit(new ObjectSerializationTask(task, cont, bundle, submitCount++)));
            }
        }
        SocketWrapperOutputDestination dest = new SocketWrapperOutputDestination(socketWrapper);
        int count = 0;
        for (Future future : futureList) {
            DataLocation dl = (DataLocation)future.get();
            if (traceEnabled) {
                log.trace("writing " + (count == 0 ? "header" : "task[" + count + ']') + " with size = " + dl.getSize());
            }
            IOHelper.writeData((DataLocation)dl, (OutputDestination)dest);
            ++count;
        }
        socketWrapper.flush();
        if (debugEnabled) {
            log.debug("wrote full results");
        }
    }

    private void waitChannelClosed(SocketWrapper socketWrapper) {
        block9: {
            block8: {
                try {
                    socketWrapper.readInt();
                }
                catch (Exception exception) {
                }
                catch (Error e) {
                    if (!debugEnabled) break block8;
                    log.debug("error closing socket: ", (Throwable)e);
                }
            }
            if (traceEnabled) {
                log.trace("server closed the connection");
            }
            try {
                ((AbstractRemoteNode)this.node).closeDataChannel();
            }
            catch (Exception e) {
            }
            catch (Error e) {
                if (!debugEnabled) break block9;
                log.debug("error closing data channel: ", (Throwable)e);
            }
        }
        if (debugEnabled) {
            log.debug("closed the data channel");
        }
    }

    private SocketWrapper getSocketWrapper() {
        SocketWrapper socketWrapper = (SocketWrapper)((RemoteNodeConnection)((AbstractRemoteNode)this.node).getNodeConnection()).getChannel();
        if (socketWrapper == null && ((AbstractRemoteNode)this.node).isSuspended()) {
            throw new JPPFSuspendedNodeException("node connection was closed by another thread");
        }
        return socketWrapper;
    }
}

