/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.server.nio.nodeserver.async;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.jppf.nio.NioHelper;
import org.jppf.nio.NioMessage;
import org.jppf.nio.NioMessageReader;
import org.jppf.nio.StatelessNioServer;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.NotificationBundle;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.server.nio.AbstractTaskBundleMessage;
import org.jppf.server.nio.nodeserver.async.AsyncNodeContext;
import org.jppf.server.nio.nodeserver.async.AsyncNodeMessageHandler;
import org.jppf.server.nio.nodeserver.async.AsyncNodeNioServer;
import org.jppf.utils.concurrent.JPPFThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncNodeMessageReader
extends NioMessageReader<AsyncNodeContext> {
    private static final Logger log = LoggerFactory.getLogger(AsyncNodeMessageReader.class);
    private static final boolean debugEnabled = log.isDebugEnabled();
    private final ExecutorService executor;

    public AsyncNodeMessageReader(AsyncNodeNioServer server) {
        super((StatelessNioServer)server);
        int n = server.getConfiguration().getInt("jppf.node.reader.max.threads", Runtime.getRuntime().availableProcessors());
        this.executor = n > 0 ? Executors.newFixedThreadPool(n, (ThreadFactory)new JPPFThreadFactory("NodeReader")) : NioHelper.getGlobalexecutor();
    }

    protected NioMessageReader.MessageHandler<AsyncNodeContext> createMessageHandler() {
        return AsyncNodeMessageReader::handleMessage;
    }

    private static void handleMessage(AsyncNodeContext context, NioMessage message) throws Exception {
        if (debugEnabled) {
            log.debug("read message = {} from context = {}", (Object)message, (Object)context);
        }
        AbstractTaskBundleMessage msg = (AbstractTaskBundleMessage)message;
        TaskBundle header = msg.getBundle();
        AsyncNodeMessageHandler handler = context.getServer().getMessageHandler();
        if (debugEnabled) {
            log.debug("read bundle {} from node {}", (Object)header, (Object)context);
        }
        if (header.isHandshake() || ((Boolean)header.getParameter((Object)BundleParameter.NODE_OFFLINE_OPEN_REQUEST, (Object)false)).booleanValue()) {
            handler.handshakeReceived(context, msg);
        } else if (header.isNotification()) {
            handler.notificationReceived(context, (NotificationBundle)header);
        } else {
            handler.resultsReceived(context, msg);
        }
    }

    protected ExecutorService getExecutor() {
        return this.executor;
    }
}

