/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.server.nio.client;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.jppf.io.DataLocation;
import org.jppf.io.IOHelper;
import org.jppf.management.JMXServer;
import org.jppf.management.JPPFSystemInformation;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.JPPFTaskBundle;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.server.JPPFDriver;
import org.jppf.server.nio.classloader.client.AsyncClientClassContext;
import org.jppf.server.nio.classloader.client.AsyncClientClassNioServer;
import org.jppf.server.nio.client.AsyncClientContext;
import org.jppf.server.nio.client.ClientMessage;
import org.jppf.server.nio.client.CompletionListener;
import org.jppf.server.nio.client.JobEntry;
import org.jppf.server.nio.nodeserver.PeerAttributesHandler;
import org.jppf.server.protocol.ServerTaskBundleClient;
import org.jppf.utils.ExceptionUtils;
import org.jppf.utils.JPPFUuid;
import org.jppf.utils.TraversalList;
import org.jppf.utils.TypedProperties;
import org.jppf.utils.configuration.JPPFProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncClientMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(AsyncClientMessageHandler.class);
    private static final boolean debugEnabled = log.isDebugEnabled();
    private final JPPFDriver driver;
    private final boolean jppfDebugEnabled;

    public AsyncClientMessageHandler(JPPFDriver driver) {
        this.driver = driver;
        this.jppfDebugEnabled = driver.isJppfDebugEnabled();
    }

    void jobReceived(AsyncClientContext context, ClientMessage message) throws Exception {
        ServerTaskBundleClient clientBundle;
        TaskBundle header;
        boolean closeCommand;
        if (debugEnabled) {
            log.debug("received job {} for {}", (Object)message, (Object)context);
        }
        if (closeCommand = ((Boolean)(header = (clientBundle = context.deserializeBundle(message)).getJob()).getParameter((Object)BundleParameter.CLOSE_COMMAND, (Object)false)).booleanValue()) {
            if (debugEnabled) {
                log.debug("processing CLOSE_COMMAND for {}", (Object)context);
            }
            context.handleException(null);
            return;
        }
        int count = header.getTaskCount();
        if (debugEnabled) {
            log.debug("read bundle {} from client {} done: received {} tasks", new Object[]{clientBundle, context, count});
        }
        if (clientBundle.getJobReceivedTime() == 0L) {
            clientBundle.setJobReceivedTime(System.currentTimeMillis());
        }
        header.getUuidPath().incPosition();
        header.getUuidPath().add((Object)this.driver.getUuid());
        if (debugEnabled) {
            log.debug("uuid path=" + header.getUuidPath());
        }
        clientBundle.addCompletionListener(new CompletionListener(context));
        clientBundle.handleNullTasks();
        if (clientBundle.isDone()) {
            if (debugEnabled) {
                log.debug("client bundle done: {}", (Object)clientBundle);
            }
            clientBundle.bundleEnded();
        } else {
            context.addEntry(clientBundle);
            this.driver.getQueue().addBundle(clientBundle);
        }
    }

    void sendJobResults(AsyncClientContext context, ServerTaskBundleClient bundle) throws Exception {
        if (log.isTraceEnabled()) {
            log.trace("sending job results with originalId={}, bundle={} for {}\ntasks positions = {}", new Object[]{bundle.getOriginalBundleId(), bundle, context, Arrays.toString(bundle.getTasksPositions())});
        } else if (debugEnabled) {
            log.debug("sending job results with originalId={}, bundle={} for {}", new Object[]{bundle.getOriginalBundleId(), bundle, context});
        }
        ClientMessage message = context.serializeBundle(bundle);
        context.offerMessageToSend(bundle, message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void jobResultsSent(AsyncClientContext context, ServerTaskBundleClient bundle) throws Exception {
        JobEntry entry;
        long bundleId = bundle.getOriginalBundleId();
        if (debugEnabled) {
            log.debug("job results sent bundleId={}, bundle={} for {}", new Object[]{bundleId, bundle, context});
        }
        if ((entry = context.getJobEntry(bundle.getUuid(), bundleId)) != null) {
            boolean jobEnded = false;
            JobEntry jobEntry = entry;
            synchronized (jobEntry) {
                entry.nbTasksToSend -= bundle.getTaskCount();
                if (debugEnabled) {
                    log.debug("job entry = {}", (Object)entry);
                }
                if (entry.nbTasksToSend <= 0) {
                    if (debugEnabled) {
                        log.debug("*** client bundle ended {}", (Object)entry);
                    }
                    jobEnded = true;
                    context.removeJobEntry(bundle.getUuid(), bundleId);
                }
            }
            if (jobEnded) {
                entry.jobEnded();
            }
        } else if (log.isTraceEnabled()) {
            log.trace("job entry not found for uuid={}, bundleId={}, call stack:\n{}", new Object[]{bundle.getUuid(), bundleId, ExceptionUtils.getCallStack()});
        } else if (this.jppfDebugEnabled) {
            AsyncClientContext.getEntrystats().notFound();
            log.warn("job entry not found for [uuid={}, bundleId={}] in {}\ncall stack:\n{}", new Object[]{bundle.getUuid(), bundleId, this, ExceptionUtils.getCallStack()});
        } else {
            log.warn("job entry not found for uuid={}, bundleId={}", (Object)bundle.getUuid(), (Object)bundleId);
        }
    }

    void handshakeReceived(AsyncClientContext context, ClientMessage message) throws Exception {
        ServerTaskBundleClient bundle = context.deserializeBundle(message);
        TaskBundle header = bundle.getJob();
        if (debugEnabled) {
            log.debug("read handshake bundle {} from client {}", (Object)header, (Object)context);
        }
        context.setConnectionUuid((String)header.getParameter((Object)BundleParameter.CONNECTION_UUID));
        header.getUuidPath().incPosition();
        String uuid = (String)header.getUuidPath().getCurrentElement();
        context.setUuid(uuid);
        this.awaitClassProvider(uuid);
        header.getUuidPath().add((Object)this.driver.getUuid());
        if (debugEnabled) {
            log.debug("uuid path=" + header.getUuidPath());
        }
        header.clear();
        header.setParameter((Object)BundleParameter.SYSTEM_INFO_PARAM, (Object)this.driver.getSystemInformation());
        header.setParameter((Object)BundleParameter.DRIVER_UUID_PARAM, (Object)this.driver.getUuid());
        JMXServer jmxServer = this.driver.getInitializer().getJmxServer(false);
        header.setParameter((Object)BundleParameter.DRIVER_MANAGEMENT_PORT, (Object)(jmxServer != null ? jmxServer.getManagementPort() : -1));
        jmxServer = this.driver.getInitializer().getJmxServer(true);
        header.setParameter((Object)BundleParameter.DRIVER_MANAGEMENT_PORT_SSL, (Object)(jmxServer != null ? jmxServer.getManagementPort() : -1));
        ClientMessage response = context.serializeBundle(bundle);
        context.offerMessageToSend(bundle, response);
    }

    public void sendPeerHandshake(AsyncClientContext context) throws Exception {
        JPPFTaskBundle header = new JPPFTaskBundle();
        TraversalList uuidPath = new TraversalList();
        uuidPath.add((Object)this.driver.getUuid());
        header.setUuidPath(uuidPath);
        if (debugEnabled) {
            log.debug("sending handshake job, uuidPath={}", (Object)uuidPath);
        }
        header.setUuid(JPPFUuid.normalUUID());
        header.setName("handshake job");
        header.setHandshake(true);
        header.setUuid(header.getName());
        header.setParameter((Object)BundleParameter.CONNECTION_UUID, (Object)context.getConnectionUuid());
        header.setParameter((Object)BundleParameter.IS_PEER, (Object)true);
        header.setParameter((Object)BundleParameter.NODE_UUID_PARAM, (Object)this.driver.getUuid());
        TypedProperties config = this.driver.getConfiguration();
        if (config.containsProperty(JPPFProperties.NODE_MAX_JOBS)) {
            header.setParameter((Object)BundleParameter.NODE_MAX_JOBS, config.get(JPPFProperties.NODE_MAX_JOBS));
        }
        JMXServer jmxServer = this.driver.getInitializer().getJmxServer(context.isSecure());
        header.setParameter((Object)BundleParameter.NODE_MANAGEMENT_PORT_PARAM, (Object)jmxServer.getManagementPort());
        PeerAttributesHandler peerHandler = this.driver.getAsyncNodeNioServer().getPeerHandler();
        JPPFSystemInformation systemInformation = this.driver.getSystemInformation();
        systemInformation.getJppf().setInt(PeerAttributesHandler.PEER_TOTAL_THREADS, peerHandler.getTotalThreads());
        systemInformation.getJppf().setInt("jppf.peer.total.nodes", peerHandler.getTotalNodes());
        header.setParameter((Object)BundleParameter.SYSTEM_INFO_PARAM, (Object)systemInformation);
        header.setSLA(null);
        header.setMetadata(null);
        DataLocation dataProvider = IOHelper.serializeData(null);
        ServerTaskBundleClient bundle = new ServerTaskBundleClient((TaskBundle)header, dataProvider, Collections.emptyList(), true);
        ClientMessage request = context.serializeBundle(bundle);
        context.offerMessageToSend(bundle, request);
    }

    void peerHandshakeResponseReceived(AsyncClientContext context, ClientMessage message) throws Exception {
        ServerTaskBundleClient bundleWrapper = context.deserializeBundle(message);
        TaskBundle header = bundleWrapper.getJob();
        if (debugEnabled) {
            log.debug("read peer handshake bundle {} from client {}", (Object)header, (Object)context);
        }
        header.getUuidPath().incPosition();
        String uuid = (String)header.getUuidPath().getCurrentElement();
        context.setUuid(uuid);
        this.awaitClassProvider(uuid);
        String driverUUID = this.driver.getUuid();
        header.getUuidPath().add((Object)driverUUID);
        if (debugEnabled) {
            log.debug("uuid path=" + header.getUuidPath());
        }
        header.clear();
    }

    private void awaitClassProvider(String uuid) throws Exception {
        AsyncClientClassNioServer classServer = this.driver.getAsyncClientClassServer();
        List<AsyncClientClassContext> list = classServer.getProviderConnections(uuid);
        while (list == null || list.isEmpty()) {
            Thread.sleep(1L);
            list = classServer.getProviderConnections(uuid);
        }
    }
}

