/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.server.nio.nodeserver.async;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.List;
import org.jppf.execute.ExecutorStatus;
import org.jppf.job.JobReturnReason;
import org.jppf.load.balancer.Bundler;
import org.jppf.load.balancer.BundlerEx;
import org.jppf.load.balancer.BundlerHelper;
import org.jppf.load.balancer.ChannelAwareness;
import org.jppf.load.balancer.spi.JPPFBundlerFactory;
import org.jppf.management.JMXServer;
import org.jppf.management.JPPFManagementInfo;
import org.jppf.management.JPPFSystemInformation;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.JobSLA;
import org.jppf.node.protocol.NotificationBundle;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.scheduling.JPPFSchedule;
import org.jppf.server.DriverInitializer;
import org.jppf.server.JPPFDriver;
import org.jppf.server.nio.AbstractTaskBundleMessage;
import org.jppf.server.nio.nodeserver.BaseNodeContext;
import org.jppf.server.nio.nodeserver.NodeBundleResults;
import org.jppf.server.nio.nodeserver.NodeDispatchTimeoutAction;
import org.jppf.server.nio.nodeserver.async.AsyncNodeContext;
import org.jppf.server.nio.nodeserver.async.AsyncNodeNioServer;
import org.jppf.server.protocol.ServerJob;
import org.jppf.server.protocol.ServerTask;
import org.jppf.server.protocol.ServerTaskBundleNode;
import org.jppf.utils.CryptoUtils;
import org.jppf.utils.ExceptionUtils;
import org.jppf.utils.HostIP;
import org.jppf.utils.NetworkUtils;
import org.jppf.utils.Pair;
import org.jppf.utils.StringUtils;
import org.jppf.utils.TypedProperties;
import org.jppf.utils.configuration.JPPFProperties;
import org.jppf.utils.stats.JPPFStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncNodeMessageHandler {
    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(AsyncNodeMessageHandler.class);
    /** Whether debug-level logging is enabled; evaluated once when the class is initialized. */
    private static final boolean debugEnabled = log.isDebugEnabled();
    /** The driver whose configuration, initializer and statistics are used by this handler. */
    private final JPPFDriver driver;
    /** Value of the {@code JPPFProperties.RESOLVE_ADDRESSES} configuration property, read once in the constructor. */
    protected final boolean resolveIPs;

    public AsyncNodeMessageHandler(JPPFDriver driver) {
        this.driver = driver;
        this.resolveIPs = (Boolean)driver.getConfiguration().get(JPPFProperties.RESOLVE_ADDRESSES);
    }

    /**
     * Serialize the handshake bundle and queue the resulting message for sending on the given channel.
     *
     * @param context the node channel on which to send.
     * @param bundle the handshake bundle to send.
     * @throws Exception if serialization or queuing fails.
     */
    public void sendHandshakeBundle(AsyncNodeContext context, ServerTaskBundleNode bundle) throws Exception {
        final AbstractTaskBundleMessage serialized = context.serializeBundle(bundle);
        context.offerMessageToSend(bundle, serialized);
    }

    /**
     * Called before a bundle is sent to a node. If the job SLA defines a dispatch expiration schedule,
     * a {@link NodeDispatchTimeoutAction} is registered with the server's dispatch expiration handler,
     * keyed by {@code ServerTaskBundleNode.makeKey(nodeBundle)}.
     *
     * @param context the node channel the bundle is about to be sent to.
     * @param nodeBundle the bundle about to be sent; when {@code null} a warning is logged and nothing is scheduled.
     * @throws Exception if any error occurs.
     */
    void beforeSendingBundle(AsyncNodeContext context, ServerTaskBundleNode nodeBundle) throws Exception {
        if (nodeBundle == null) {
            log.warn("null node bundle, a dispatch timeout schedule may be lost for context = {}", (Object)context);
            return;
        }
        JobSLA sla = nodeBundle.getJob().getSLA();
        JPPFSchedule schedule = sla.getDispatchExpirationSchedule();
        if (schedule == null) {
            // no dispatch expiration configured for this job
            return;
        }
        AsyncNodeNioServer server = context.getServer();
        // for an offline node the action is created without a channel reference
        NodeDispatchTimeoutAction action = new NodeDispatchTimeoutAction(server.getOfflineNodeHandler(), nodeBundle, context.isOffline() ? null : context);
        server.getDispatchExpirationHandler().scheduleAction((Object)ServerTaskBundleNode.makeKey(nodeBundle), schedule, (Runnable)action);
    }

    /**
     * Called once a bundle has been sent to a node. Only offline nodes need further processing.
     *
     * @param context the node channel the bundle was sent to.
     * @param nodeBundle the bundle that was sent.
     * @throws Exception if any error occurs.
     */
    void bundleSent(AsyncNodeContext context, ServerTaskBundleNode nodeBundle) throws Exception {
        if (!context.isOffline()) {
            return;
        }
        AsyncNodeMessageHandler.processOfflineRequest(context, nodeBundle);
    }

    /**
     * Process the handshake response received from a node.
     * <p>The method reads the node's attributes from the handshake bundle (offline flag, max jobs, uuid,
     * system information, peer flag, management host and port), stores them in the channel context,
     * builds the node's {@link JPPFManagementInfo} when applicable, registers the connection with the
     * server and, if the node asks for it, processes an offline reopen request.
     *
     * @param context the channel from which the handshake response was received.
     * @param message the serialized handshake response.
     * @throws IllegalStateException if the node is not offline and the received bundle is not a handshake bundle.
     * @throws Exception if any other error occurs.
     */
    public void handshakeReceived(AsyncNodeContext context, AbstractTaskBundleMessage message) throws Exception {
        NodeBundleResults received = context.deserializeBundle(message);
        if (debugEnabled) {
            log.debug("received handshake response for channel {} : {}", (Object)context, (Object)received);
        }
        TaskBundle bundle = received.bundle();
        boolean offline = ((Boolean)bundle.getParameter((Object)BundleParameter.NODE_OFFLINE, (Object)false)).booleanValue();
        if (offline) {
            // an offline node is limited to a single job at a time
            context.setOffline(true);
            context.setMaxJobs(1);
        } else {
            this.updateMaxJobs(context, bundle);
            Boolean acceptsNewJobs = (Boolean)bundle.getParameter((Object)BundleParameter.NODE_ACCEPTS_NEW_JOBS);
            if (acceptsNewJobs != null) {
                context.setAcceptingNewJobs(acceptsNewJobs);
            }
            if (!bundle.isHandshake()) {
                throw new IllegalStateException("handshake bundle expected.");
            }
        }
        if (debugEnabled) {
            log.debug("read bundle for {}, bundle={}", (Object)context, (Object)bundle);
        }
        String uuid = (String)bundle.getParameter((Object)BundleParameter.NODE_UUID_PARAM);
        context.setUuid(uuid);
        JPPFSystemInformation systemInfo = (JPPFSystemInformation)bundle.getParameter((Object)BundleParameter.SYSTEM_INFO_PARAM);
        context.setNodeIdentifier(AsyncNodeMessageHandler.getNodeIdentifier(context.getServer().getBundlerFactory(), context, systemInfo));
        if (debugEnabled) {
            log.debug("nodeID = {} for node = {}", context.getNodeIdentifier(), (Object)context);
        }
        boolean isPeer = (Boolean)bundle.getParameter((Object)BundleParameter.IS_PEER, (Object)false);
        context.setPeer(isPeer);
        if (systemInfo != null) {
            systemInfo.getJppf().setBoolean("jppf.peer.driver", isPeer);
            systemInfo.getJppf().set(JPPFProperties.NODE_IDLE, (Object)true);
            context.setNodeInfo(systemInfo, false);
            if (log.isTraceEnabled()) {
                log.trace("node network info:\nipv4: {}\nipv6: {}", (Object)systemInfo.getNetwork().getString("ipv4.addresses"), (Object)systemInfo.getNetwork().getString("ipv6.addresses"));
            }
        } else if (debugEnabled) {
            log.debug("no system info received for node {}", (Object)context);
        }
        int port = (Integer)bundle.getParameter((Object)BundleParameter.NODE_MANAGEMENT_PORT_PARAM, (Object)-1);
        if (debugEnabled) {
            log.debug("management port = {} for node = {}", (Object)port, (Object)context);
        }
        // the management host is taken from the bundle when provided, otherwise derived from the channel
        String host = (String)bundle.getParameter((Object)BundleParameter.NODE_MANAGEMENT_HOST_PARAM, null);
        HostIP hostIP;
        if (host == null) {
            host = AsyncNodeMessageHandler.getChannelHost(context);
            hostIP = context.isLocal() ? new HostIP(host, host) : this.resolveHost(context);
        } else {
            hostIP = NetworkUtils.getHostIP((String)host);
        }
        boolean sslEnabled = !context.isLocal() && context.getSSLHandler() != null;
        boolean hasJmx = (Boolean)this.driver.getConfiguration().get(JPPFProperties.MANAGEMENT_ENABLED);
        String masterUuid = (String)bundle.getParameter((Object)BundleParameter.NODE_PROVISIONING_MASTER_UUID);
        // node type bit flags passed to JPPFManagementInfo; the numeric values presumably mirror
        // constants declared in JPPFManagementInfo (peer/node, local, master, slave, .Net, Android) - TODO confirm
        int type = isPeer ? 2 : 1;
        if (hasJmx && uuid != null && !offline && port >= 0) {
            if (context.isLocal()) {
                type |= 0x40000;
                DriverInitializer initializer = this.driver.getInitializer();
                JMXServer jmxServer = initializer.getJmxServer(sslEnabled);
                if (jmxServer != null) {
                    // NOTE(review): 'host' is not read after this assignment (the management info below is built from
                    // hostIP), so this store has no visible effect here - confirm whether hostIP was meant to be updated
                    host = jmxServer.getManagementHost();
                }
            }
            if (((Boolean)bundle.getParameter((Object)BundleParameter.NODE_PROVISIONING_MASTER, (Object)false)).booleanValue()) {
                type |= 0x10000;
            } else if (((Boolean)bundle.getParameter((Object)BundleParameter.NODE_PROVISIONING_SLAVE, (Object)false)).booleanValue()) {
                type |= 0x20000;
            }
            if (((Boolean)bundle.getParameter((Object)BundleParameter.NODE_DOTNET_CAPABLE, (Object)false)).booleanValue()) {
                type |= 0x80000;
            }
            if (systemInfo != null && ((Boolean)systemInfo.getJppf().get(JPPFProperties.NODE_ANDROID)).booleanValue()) {
                type |= 0x100000;
            }
            JPPFManagementInfo info = new JPPFManagementInfo(hostIP.hostName(), hostIP.ipAddress(), port, uuid, type, sslEnabled, masterUuid);
            if (debugEnabled) {
                log.debug("configuring management for hostIP={} : {}", (Object)hostIP, (Object)info);
            }
            if (systemInfo != null) {
                info.setSystemInfo(systemInfo);
                if (debugEnabled) {
                    log.debug("node has following configuration:\n{}", (Object)systemInfo.getJppf());
                }
            }
            context.setManagementInfo(info);
        } else {
            if (offline || port < 0) {
                // no usable management port: register management info with port -1
                JPPFManagementInfo info = new JPPFManagementInfo(hostIP.hostName(), hostIP.ipAddress(), -1, context.getUuid(), type, sslEnabled, masterUuid);
                if (systemInfo != null) {
                    info.setSystemInfo(systemInfo);
                }
                context.setManagementInfo(info);
            }
            context.getServer().nodeConnected(context);
        }
        context.getServer().putConnection(context);
        if (((Boolean)bundle.getParameter((Object)BundleParameter.NODE_OFFLINE_OPEN_REQUEST, (Object)false)).booleanValue()) {
            this.processOfflineReopen(received, context);
        }
    }

    /**
     * Handle a message carrying execution results from a node: the message is deserialized
     * and the resulting bundle is processed.
     *
     * @param context the channel from which the results were received.
     * @param message the serialized results.
     * @throws Exception if any error occurs.
     */
    public void resultsReceived(AsyncNodeContext context, AbstractTaskBundleMessage message) throws Exception {
        if (debugEnabled) {
            log.debug("node {} received {}", (Object)context, (Object)message);
        }
        final NodeBundleResults results = context.deserializeBundle(message);
        this.process(results, context);
    }

    /**
     * Resolve the host name and IP address of a remote node, trying in order:
     * <ol>
     * <li>a DNS lookup of the channel's remote host, unless the configuration property
     * {@code jppf.resolve.node.host.from.sysinfo} is {@code true};</li>
     * <li>the IPv4, then IPv6, addresses found in the node's system information;</li>
     * <li>{@code NetworkUtils.getHostIP()} when {@link #resolveIPs} is set, otherwise the unresolved host.</li>
     * </ol>
     *
     * @param context the node channel whose host is resolved.
     * @return the resolved host name / IP address pair.
     * @throws Exception if the channel's remote host cannot be obtained.
     */
    private HostIP resolveHost(AsyncNodeContext context) throws Exception {
        String host = AsyncNodeMessageHandler.getChannelHost(context);
        String ip = host;
        boolean resolveFromSysInfo = this.driver.getConfiguration().getBoolean("jppf.resolve.node.host.from.sysinfo", false);
        if (!resolveFromSysInfo) {
            try {
                InetAddress addr = InetAddress.getByName(host);
                ip = addr.getHostAddress();
                if (host.equals(ip)) {
                    // the channel only gave us an IP address: attempt to obtain a host name for it
                    host = addr.getHostName();
                }
                if (!host.equals(ip)) {
                    if (log.isTraceEnabled()) {
                        log.trace("resolved host from reverse DNS lookup: host={}, ip={}", (Object)host, (Object)ip);
                    }
                    return new HostIP(host, ip);
                }
            }
            catch (UnknownHostException e) {
                // best effort: the lookup failure is not fatal, the fallbacks below are tried instead
                if (log.isTraceEnabled()) {
                    log.trace("DNS lookup failed for host {} : {}", (Object)host, (Object)e.getMessage());
                }
            }
        }
        JPPFSystemInformation info = context.getSystemInformation();
        if (info != null) {
            for (HostIP candidate : info.parseIPV4Addresses()) {
                if (!AsyncNodeMessageHandler.matchesChannelHost(candidate, host, resolveFromSysInfo)) continue;
                if (log.isTraceEnabled()) {
                    log.trace("resolved host from system info: {}", (Object)candidate);
                }
                return candidate;
            }
            for (HostIP candidate : info.parseIPV6Addresses()) {
                if (!AsyncNodeMessageHandler.matchesChannelHost(candidate, host, resolveFromSysInfo)) continue;
                if (log.isTraceEnabled()) {
                    log.trace("resolved host from system info: {}", (Object)candidate);
                }
                return candidate;
            }
        }
        HostIP hostIP = this.resolveIPs ? NetworkUtils.getHostIP((String)host) : new HostIP(host, host);
        if (log.isTraceEnabled()) {
            log.trace("{}: {}", (Object)(this.resolveIPs ? "resolved host from NetworkUtils.getHostIP()" : "unresolved host"), (Object)hostIP);
        }
        return hostIP;
    }

    /**
     * Determine whether an address taken from the node's system information can be used for the channel host.
     *
     * @param candidate the address from the system information.
     * @param host the host obtained from the channel.
     * @param acceptAny when {@code true} any candidate is accepted, otherwise its host name or IP address must equal {@code host}.
     * @return {@code true} if the candidate is accepted.
     */
    private static boolean matchesChannelHost(HostIP candidate, String host, boolean acceptAny) {
        return acceptAny || host.equals(candidate.hostName()) || host.equals(candidate.ipAddress());
    }

    /**
     * Handle a bundle that was just sent to an offline node: the bundle is handed over to the server's
     * offline node handler, then the channel is cleaned up and its connection closed.
     *
     * @param context the offline node channel.
     * @param nodeBundle the bundle that was sent to the node.
     * @throws Exception if any error occurs.
     */
    private static void processOfflineRequest(AsyncNodeContext context, ServerTaskBundleNode nodeBundle) throws Exception {
        if (debugEnabled) {
            log.debug("processing offline request, nodeBundle={} for node={}", (Object)nodeBundle, (Object)context);
        }
        // keep track of the bundle before the connection goes away
        context.getServer().getOfflineNodeHandler().addNodeBundle(nodeBundle);

        context.cleanup();
        context.getServer().closeConnection(context);
    }

    /**
     * Handle the reconnection of an offline node returning results. The bundle previously registered with
     * the offline node handler is looked up by job uuid and bundle id; if none is found nothing is done.
     * Otherwise the bundle is re-attached to the channel, the results are processed and, if the node sent
     * the {@code CLOSE_COMMAND} parameter, the channel is cleaned up and closed.
     *
     * @param received the results received from the node.
     * @param context the node channel.
     * @throws Exception if any error occurs.
     */
    private void processOfflineReopen(NodeBundleResults received, AsyncNodeContext context) throws Exception {
        final TaskBundle header = received.bundle();
        final String jobUuid = (String)header.getParameter((Object)BundleParameter.JOB_UUID);
        final long id = header.getBundleId();
        final ServerTaskBundleNode pending = context.getServer().getOfflineNodeHandler().removeNodeBundle(jobUuid, id);
        if (pending != null) {
            if (debugEnabled) {
                log.debug(StringUtils.build((Object[])new Object[]{"processing offline reopen with jobUuid=", jobUuid, ", id=", id, ", nodeBundle=", pending, ", node=", context}));
            }
            context.addJobEntry(pending);
            this.process(received, context);
            final boolean closeRequested = ((Boolean)header.getParameter((Object)BundleParameter.CLOSE_COMMAND, (Object)false)).booleanValue();
            if (closeRequested) {
                context.cleanup();
                context.getServer().closeConnection(context);
            }
        }
    }

    /**
     * Process the results of a bundle returned by a node.
     * <p>The corresponding job entry is removed from the channel and, under the job's lock, its tasks are
     * marked as returned from the node unless the job is cancelled. The pending dispatch expiration action
     * is cancelled, the results are processed (unless the job was cancelled), and the channel's max jobs
     * and execution status are updated.
     *
     * @param received the results received from the node.
     * @param context the node channel.
     * @throws Exception if any error occurs.
     */
    private void process(NodeBundleResults received, AsyncNodeContext context) throws Exception {
        TaskBundle header = (TaskBundle)received.first();
        ServerTaskBundleNode dispatched = context.removeJobEntry(header.getUuid(), header.getBundleId());
        ServerJob serverJob = dispatched.getServerJob();
        boolean jobCancelled;
        serverJob.getLock().lock();
        try {
            jobCancelled = serverJob.isCancelled();
            if (!jobCancelled) {
                for (ServerTask returned : dispatched.getTaskList()) {
                    returned.setReturnedFromNode(true);
                }
            }
        }
        finally {
            serverJob.getLock().unlock();
        }
        // the dispatch came back: its expiration action is no longer needed
        context.getServer().getDispatchExpirationHandler().cancelAction((Object)ServerTaskBundleNode.makeKey(dispatched), false);
        if (!jobCancelled) {
            boolean mustRequeue = false;
            try {
                TaskBundle resultHeader = received.bundle();
                if (debugEnabled) {
                    log.debug("read bundle " + resultHeader + " from node " + (Object)((Object)context));
                }
                mustRequeue = this.processResults(context, received, dispatched);
            }
            catch (Throwable error) {
                // any failure while handling the results is reported back on the bundle
                log.error(error.getMessage(), error);
                dispatched.setJobReturnReason(JobReturnReason.DRIVER_PROCESSING_ERROR);
                dispatched.resultsReceived(error);
            }
            if (mustRequeue) {
                dispatched.resubmit();
            }
        }
        if (!context.isOffline()) {
            this.updateMaxJobs(context, header);
        }
        boolean hasCapacity = context.getCurrentNbJobs() < context.getMaxJobs();
        if (hasCapacity) {
            if (debugEnabled) {
                log.debug("updating execution status to ACTIVE for {}", (Object)context);
            }
            context.setExecutionStatus(ExecutorStatus.ACTIVE);
        }
    }

    /**
     * Process the results received from a node for a dispatched bundle.
     * <p>Three cases are handled: the node reported an exception in the bundle header, the job was
     * cancelled in the meantime, or results were received normally. In the latter case the tasks flagged
     * by the node for resubmission are resubmitted (unless the bundle has expired), and the load-balancer
     * and statistics are updated. Finally, updated system information sent by the node, if any, is stored
     * in the channel and passed to the bundler when it implements {@link ChannelAwareness}.
     *
     * @param context the node channel.
     * @param received the results received from the node.
     * @param nodeBundle the bundle that was dispatched to the node.
     * @return the value of {@code isRequeue()} on the received bundle header.
     * @throws Exception if any error occurs.
     */
    private boolean processResults(AsyncNodeContext context, NodeBundleResults received, ServerTaskBundleNode nodeBundle) throws Exception {
        TaskBundle header = received.bundle();
        Bundler<?> bundler = context.getBundler();
        ServerJob clientJob = nodeBundle.getClientJob();
        Throwable nodeError = (Throwable)header.getParameter((Object)BundleParameter.NODE_EXCEPTION_PARAM);
        if (nodeError != null) {
            if (debugEnabled) {
                log.debug("node {} returned exception parameter in the header for bundle {} : {}", new Object[]{context, header, ExceptionUtils.getMessage((Throwable)nodeError)});
            }
            nodeBundle.setJobReturnReason(JobReturnReason.NODE_PROCESSING_ERROR);
            nodeBundle.resultsReceived(nodeError);
        } else if (clientJob.isCancelled()) {
            if (debugEnabled) {
                log.debug("received bundle with {} tasks for already cancelled job: {}", (Object)((List)received.second()).size(), (Object)received.bundle());
            }
            if (!nodeBundle.isCancelled()) {
                if (debugEnabled) {
                    log.debug("node bundle was not cancelled: {}", (Object)nodeBundle);
                }
                clientJob.cancelDispatch(nodeBundle);
            }
        } else {
            if (debugEnabled) {
                log.debug("received bundle with {} tasks, taskCount={}: {}", new Object[]{((List)received.second()).size(), header.getTaskCount(), received.bundle()});
            }
            if (nodeBundle.getJobReturnReason() == null) {
                nodeBundle.setJobReturnReason(JobReturnReason.RESULTS_RECEIVED);
            }
            if (nodeBundle.isExpired()) {
                if (debugEnabled) {
                    log.debug("bundle has expired: {}", (Object)nodeBundle);
                }
            } else {
                int[] positions = (int[])header.getParameter((Object)BundleParameter.RESUBMIT_TASK_POSITIONS, null);
                if (debugEnabled) {
                    log.debug("resubmitPositions = {} for {}", (Object)positions, (Object)header);
                }
                HashSet<Integer> flaggedPositions = null;
                if (positions != null) {
                    flaggedPositions = new HashSet<Integer>();
                    for (int i = 0; i < positions.length; i++) {
                        flaggedPositions.add(positions[i]);
                    }
                    if (debugEnabled) {
                        log.debug("resubmitSet = {} for {}", flaggedPositions, (Object)header);
                    }
                }
                int resubmitted = 0;
                for (ServerTask task : nodeBundle.getTaskList()) {
                    boolean flagged = flaggedPositions != null && flaggedPositions.contains(task.getPosition());
                    // the resubmit counter is only incremented for tasks the node flagged
                    if (flagged && task.incResubmitCount() <= task.getMaxResubmits()) {
                        task.resubmit();
                        resubmitted++;
                    }
                }
                if (resubmitted > 0) {
                    context.updateStatsUponTaskResubmit(resubmitted);
                }
            }
            if (debugEnabled) {
                log.debug("nodeBundle={}", (Object)nodeBundle);
            }
            bundler = this.updateBundlerAndStats(context, bundler, nodeBundle, header);
            nodeBundle.resultsReceived(received.data());
            if (debugEnabled) {
                log.debug("updated stats for {}", (Object)context);
            }
        }
        JPPFSystemInformation updatedInfo = (JPPFSystemInformation)header.getParameter((Object)BundleParameter.SYSTEM_INFO_PARAM);
        if (updatedInfo != null) {
            context.setNodeInfo(updatedInfo, true);
            if (bundler instanceof ChannelAwareness) {
                ((ChannelAwareness)bundler).setChannelConfiguration(updatedInfo);
            }
        }
        return header.isRequeue();
    }

    /**
     * Handle a notification sent by a node. Only {@code THROTTLING} notifications are acted upon:
     * they update whether the node accepts new jobs. Other notification types are ignored.
     *
     * @param context the channel of the node that sent the notification.
     * @param notification the notification received.
     * @throws Exception if any error occurs.
     */
    void notificationReceived(AsyncNodeContext context, NotificationBundle notification) throws Exception {
        switch (notification.getNotificationType()) {
            case THROTTLING: {
                Boolean acceptsJobs = (Boolean)notification.getParameter((Object)BundleParameter.NODE_ACCEPTS_NEW_JOBS, (Object)true);
                if (debugEnabled) {
                    String verb = acceptsJobs ? "accepts" : "does not accept";
                    log.debug("received notification that the node {} new jobs, node {}", (Object)verb, (Object)context);
                }
                context.setAcceptingNewJobs(acceptsJobs);
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Feed the load-balancer with the performance data of a returned bundle, persist the bundler state
     * and update the driver statistics.
     *
     * @param context the node channel.
     * @param currentBundler the bundler currently associated with the channel; when {@code null}
     * one is obtained from {@code context.checkBundler()}.
     * @param nodeBundle the bundle that was dispatched to the node; its job's execution start time is used
     * to compute the elapsed time.
     * @param newBundle the bundle header returned by the node.
     * @return the bundler that was updated.
     */
    private Bundler<?> updateBundlerAndStats(AsyncNodeContext context, Bundler<?> currentBundler, ServerTaskBundleNode nodeBundle, TaskBundle newBundle) {
        // round-trip time in nanoseconds, measured from the job's execution start time
        long elapsed = System.nanoTime() - nodeBundle.getJob().getExecutionStartTime();
        Bundler<?> bundler = currentBundler == null ? context.checkBundler(context.getServer().getBundlerFactory(), context.getServer().getJPPFContext()) : currentBundler;
        if (bundler instanceof BundlerEx) {
            long accumulatedTime = (Long)newBundle.getParameter((Object)BundleParameter.NODE_BUNDLE_ELAPSED_PARAM, (Object)-1L);
            // the last argument is the part of the elapsed time not spent executing in the node
            BundlerHelper.updateBundler((BundlerEx)((BundlerEx)bundler), (int)newBundle.getTaskCount(), (double)elapsed, (double)accumulatedTime, (double)(elapsed - newBundle.getNodeExecutionTime()));
        } else {
            BundlerHelper.updateBundler(bundler, (int)newBundle.getTaskCount(), (double)elapsed);
        }
        if (debugEnabled) {
            log.debug("updated bundler for {}", (Object)context);
        }
        context.getServer().getBundlerHandler().storeBundler(context.getNodeIdentifier(), bundler, context.getBundlerAlgorithm());
        // statistics are recorded in milliseconds
        this.updateStats(newBundle.getTaskCount(), elapsed / 1000000L, newBundle.getNodeExecutionTime() / 1000000L);
        return bundler;
    }

    /**
     * Record the execution of a set of tasks in the driver statistics.
     *
     * @param nbTasks the number of tasks that were executed.
     * @param elapsed the total round-trip time for the tasks.
     * @param elapsedInNode the time spent executing in the node; the difference with {@code elapsed}
     * is recorded as transport time.
     */
    private void updateStats(int nbTasks, long elapsed, long elapsedInNode) {
        final JPPFStatistics statistics = this.driver.getStatistics();
        final long transport = elapsed - elapsedInNode;
        statistics.addValue("task.dispatch", (double)nbTasks);
        statistics.addValues("execution", (double)elapsed, (long)nbTasks);
        statistics.addValues("node.execution", (double)elapsedInNode, (long)nbTasks);
        statistics.addValues("transport.time", (double)transport, (long)nbTasks);
    }

    /**
     * Update the maximum number of concurrent jobs of a node channel. The value is taken from the
     * bundle's {@code NODE_MAX_JOBS} parameter when present, otherwise from the driver configuration.
     * The channel is only updated when the resulting value is strictly positive.
     *
     * @param context the node channel to update.
     * @param bundle the bundle header that may carry the new value.
     */
    private void updateMaxJobs(AsyncNodeContext context, TaskBundle bundle) {
        int n = context.getMaxJobs();
        Integer newMaxJobs = (Integer)bundle.getParameter((Object)BundleParameter.NODE_MAX_JOBS);
        int maxJobs = newMaxJobs == null ? ((Integer)this.driver.getConfiguration().get(JPPFProperties.NODE_MAX_JOBS)).intValue() : newMaxJobs.intValue();
        if (debugEnabled) {
            log.debug("n={}, newMaxJobs={}, computed maxJobs={}, context={}", new Object[]{n, newMaxJobs, maxJobs, context});
        }
        if (maxJobs > 0) {
            context.setMaxJobs(maxJobs);
        }
    }

    /**
     * Compute the identifier of a node, used as key for the load-balancer state.
     * <p>The identifier is built from the driver's non-local address, the channel's local port and
     * remote address (or a local channel marker), and, for provisioning master or slave nodes, the slave
     * path prefix and slave id. The string format is left untouched because its hash is returned
     * alongside it.
     *
     * @param factory the bundler factory, providing the persistence and hash algorithm.
     * @param channel the node channel.
     * @param info the node's system information; may be {@code null}, in which case the provisioning
     * attributes are omitted from the identifier.
     * @return a pair made of the identifier and its hash, or {@code null} if the factory has no persistence.
     * @throws Exception if any error occurs.
     */
    private static Pair<String, String> getNodeIdentifier(JPPFBundlerFactory factory, BaseNodeContext channel, JPPFSystemInformation info) throws Exception {
        if (factory.getPersistence() == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        String ip = NetworkUtils.getNonLocalHostAddress();
        sb.append('[').append(ip == null ? "localhost" : ip);
        SocketChannel ch = channel.getSocketChannel();
        if (ch != null) {
            sb.append(':').append(ch.socket().getLocalPort()).append(']');
            InetSocketAddress isa = (InetSocketAddress)ch.getRemoteAddress();
            sb.append(isa.getAddress().getHostAddress());
        } else if (channel.isLocal()) {
            sb.append("local_channel").append(']');
        }
        // the handshake may not carry system information: avoid a NullPointerException in that case
        if (info != null) {
            TypedProperties jppf = info.getJppf();
            boolean master = (Boolean)jppf.get(JPPFProperties.PROVISIONING_MASTER);
            boolean slave = (Boolean)jppf.get(JPPFProperties.PROVISIONING_SLAVE);
            if (master || slave) {
                sb.append(master ? "master" : "slave");
                sb.append((String)jppf.get(JPPFProperties.PROVISIONING_SLAVE_PATH_PREFIX));
                if (slave) {
                    sb.append(jppf.get(JPPFProperties.PROVISIONING_SLAVE_ID));
                }
            }
        }
        String s = sb.toString();
        return new Pair<String, String>(s, CryptoUtils.computeHash((String)s, (String)factory.getHashAlgorithm()));
    }

    /**
     * Get the host of the remote end of a node channel.
     *
     * @param context the node channel.
     * @return {@code "localhost"} for a local channel, otherwise the host string of the channel's remote address.
     * @throws Exception if the remote address cannot be obtained.
     */
    private static String getChannelHost(BaseNodeContext context) throws Exception {
        if (context.isLocal()) {
            return "localhost";
        }
        SocketChannel channel = context.getSocketChannel();
        InetSocketAddress remote = (InetSocketAddress)channel.getRemoteAddress();
        return remote.getHostString();
    }
}

