/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.server.nio.nodeserver.async;

import java.io.EOFException;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import org.jppf.execute.ExecutorChannelStatusListener;
import org.jppf.execute.ExecutorStatus;
import org.jppf.io.DataLocation;
import org.jppf.io.MultipleBuffersLocation;
import org.jppf.load.balancer.JPPFContext;
import org.jppf.load.balancer.persistence.LoadBalancerPersistenceManager;
import org.jppf.load.balancer.spi.JPPFBundlerFactory;
import org.jppf.management.JPPFManagementInfo;
import org.jppf.nio.NioChannelHandler;
import org.jppf.nio.NioConstants;
import org.jppf.nio.NioHelper;
import org.jppf.nio.SSLHandler;
import org.jppf.nio.SSLHandlerImpl;
import org.jppf.nio.StatelessNioServer;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.JPPFTaskBundle;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.persistence.JPPFDatasourceFactory;
import org.jppf.queue.QueueEvent;
import org.jppf.queue.QueueListener;
import org.jppf.queue.QueueListenerAdapter;
import org.jppf.scheduling.JPPFScheduleHandler;
import org.jppf.serialization.SerializationUtils;
import org.jppf.server.JPPFDriver;
import org.jppf.server.event.NodeConnectionEventHandler;
import org.jppf.server.nio.nodeserver.BaseNodeContext;
import org.jppf.server.nio.nodeserver.NodeConnectionCompletionListener;
import org.jppf.server.nio.nodeserver.NodeReservationHandler;
import org.jppf.server.nio.nodeserver.OfflineNodeHandler;
import org.jppf.server.nio.nodeserver.PeerAttributesHandler;
import org.jppf.server.nio.nodeserver.async.AsyncJobScheduler;
import org.jppf.server.nio.nodeserver.async.AsyncNodeContext;
import org.jppf.server.nio.nodeserver.async.AsyncNodeMessageHandler;
import org.jppf.server.nio.nodeserver.async.AsyncNodeMessageReader;
import org.jppf.server.nio.nodeserver.async.AsyncNodeMessageWriter;
import org.jppf.server.protocol.ServerJob;
import org.jppf.server.protocol.ServerTaskBundleClient;
import org.jppf.server.protocol.ServerTaskBundleNode;
import org.jppf.server.queue.JPPFPriorityQueue;
import org.jppf.ssl.SSLHelper;
import org.jppf.utils.ExceptionUtils;
import org.jppf.utils.JPPFBuffer;
import org.jppf.utils.SerializationHelperImpl;
import org.jppf.utils.TypedProperties;
import org.jppf.utils.concurrent.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AsyncNodeNioServer
extends StatelessNioServer<AsyncNodeContext>
implements NodeConnectionCompletionListener {
    private static final Logger log = LoggerFactory.getLogger(AsyncNodeNioServer.class);
    private static final boolean debugEnabled = log.isDebugEnabled();
    private final AsyncNodeMessageHandler messageHandler;
    private final JPPFDriver driver;
    private final ServerJob initialServerJob;
    private final JPPFPriorityQueue queue;
    private final JPPFBundlerFactory bundlerFactory;
    private final AsyncJobScheduler jobScheduler;
    private final Map<String, AsyncNodeContext> allConnections = new ConcurrentHashMap<String, AsyncNodeContext>();
    final NodeConnectionEventHandler nodeConnectionHandler;
    private final ExecutorChannelStatusListener statusListener = event -> {
        if (event.getSource() instanceof AsyncNodeContext) {
            this.updateConnectionStatus((AsyncNodeContext)((Object)((Object)event.getSource())), event.getOldValue(), event.getNewValue());
        }
    };
    private final OfflineNodeHandler offlineNodeHandler = new OfflineNodeHandler();
    private final JPPFScheduleHandler dispatchExpirationHandler = new JPPFScheduleHandler("DispatchExpiration");
    private final PeerAttributesHandler peerHandler;
    private final NodeReservationHandler nodeReservationHandler;
    private final LoadBalancerPersistenceManager bundlerHandler;

    public AsyncNodeNioServer(JPPFDriver driver, int identifier, boolean useSSL) throws Exception {
        super(identifier, useSSL, driver.getConfiguration());
        this.driver = driver;
        this.selectTimeout = 1000L;
        this.messageHandler = new AsyncNodeMessageHandler(driver);
        this.queue = driver.getQueue();
        Callable<List<BaseNodeContext>> callable = () -> this.getAllChannels();
        this.queue.setCallableAllConnections(callable);
        this.peerHandler = new PeerAttributesHandler(driver, Math.max(1, driver.getConfiguration().getInt("jppf.peer.handler.threads", 1)));
        this.nodeConnectionHandler = driver.getInitializer().getNodeConnectionEventHandler();
        this.bundlerFactory = new JPPFBundlerFactory(driver.getConfiguration());
        this.bundlerHandler = new LoadBalancerPersistenceManager(this.bundlerFactory);
        this.selectTimeout = NioConstants.DEFAULT_SELECT_TIMEOUT;
        this.jobScheduler = new AsyncJobScheduler(this, this.queue, driver.getStatistics(), this.bundlerFactory);
        this.queue.addQueueListener((QueueListener)new QueueListenerAdapter<ServerJob, ServerTaskBundleClient, ServerTaskBundleNode>(){

            public void bundleAdded(QueueEvent<ServerJob, ServerTaskBundleClient, ServerTaskBundleNode> event) {
                if (debugEnabled) {
                    log.debug("received bundle added queue event {}", event);
                }
                AsyncNodeNioServer.this.jobScheduler.wakeUp();
            }

            public void bundleRemoved(QueueEvent<ServerJob, ServerTaskBundleClient, ServerTaskBundleNode> event) {
                if (debugEnabled) {
                    log.debug("received bundle removed queue event {}", event);
                }
                AsyncNodeNioServer.this.jobScheduler.wakeUp();
            }
        });
        this.initialServerJob = AsyncNodeNioServer.createInitialServerJob(driver);
        this.nodeReservationHandler = new NodeReservationHandler(driver);
        ThreadUtils.startDaemonThread((Runnable)this.jobScheduler, (String)"JobScheduler");
    }

    protected void initReaderAndWriter() {
        this.messageReader = new AsyncNodeMessageReader(this);
        this.messageWriter = new AsyncNodeMessageWriter(this);
    }

    protected void handleSelectionException(SelectionKey key, Exception e) {
        AsyncNodeContext context = (AsyncNodeContext)((Object)key.attachment());
        if (e instanceof CancelledKeyException) {
            if (context != null && !context.isClosed()) {
                log.error("error on {} : ", (Object)context, (Object)e);
                this.closeConnection(context);
            }
        } else if (e instanceof EOFException) {
            if (debugEnabled) {
                log.debug("error on {} : ", (Object)context, (Object)e);
            } else {
                log.warn("error [{}] on {}", (Object)e.toString(), (Object)context);
            }
            context.handleException(e);
        } else {
            log.error("error on {}", (Object)context, (Object)e);
            if (context != null) {
                context.handleException(e);
            }
        }
    }

    public void accept(ServerSocketChannel serverSocketChannel, SocketChannel channel, SSLHandler sslHandler, boolean ssl, boolean peer, Object ... params) {
        try {
            if (debugEnabled) {
                log.debug("accepting socketChannel = {}", (Object)channel);
            }
            AsyncNodeContext context = this.createContext(channel, ssl);
            context.setPeer(peer);
            this.registerChannel((NioChannelHandler)context, channel);
            this.messageHandler.sendHandshakeBundle(context, this.getHandshakeBundle());
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
        }
        this.driver.getStatistics().addValue("nodes", 1.0);
    }

    private AsyncNodeContext createContext(SocketChannel channel, boolean ssl) throws Exception {
        AsyncNodeContext context = this.createNioContext(channel);
        if (debugEnabled) {
            log.debug("creating context for channel={}, ssl={}: {}", new Object[]{channel, ssl, context});
        }
        context.setSsl(ssl);
        if (ssl) {
            if (debugEnabled) {
                log.debug("creating SSLEngine for {}", (Object)context);
            }
            AsyncNodeNioServer.configureSSL(context);
        }
        return context;
    }

    private static void configureSSL(AsyncNodeContext context) throws Exception {
        if (debugEnabled) {
            log.debug("configuring SSL for {}", (Object)context);
        }
        SocketChannel channel = context.getSocketChannel();
        SSLContext sslContext = SSLHelper.getSSLContext((int)65532);
        InetSocketAddress addr = (InetSocketAddress)channel.getRemoteAddress();
        SSLEngine engine = sslContext.createSSLEngine(addr.getHostString(), addr.getPort());
        SSLParameters params = SSLHelper.getSSLParameters();
        engine.setUseClientMode(false);
        engine.setSSLParameters(params);
        if (debugEnabled) {
            log.debug("created SSLEngine: useClientMode = {}, parameters = {}", (Object)engine.getUseClientMode(), (Object)engine.getSSLParameters());
        }
        SSLHandlerImpl sslHandler = new SSLHandlerImpl(channel, engine);
        context.setSSLHandler((SSLHandler)sslHandler);
    }

    public AsyncNodeContext createNioContext(Object ... params) {
        return new AsyncNodeContext(this, (SocketChannel)params[0], false);
    }

    public void closeConnection(AsyncNodeContext context) {
        if (debugEnabled) {
            log.debug("closing {}", (Object)context);
        }
        try {
            if (context.terminate()) {
                SelectionKey key = context.getSelectionKey();
                if (key != null) {
                    key.cancel();
                    key.channel().close();
                }
                this.peerHandler.onCloseNode(context);
                JPPFManagementInfo info = context.getManagementInfo();
                if (info == null) {
                    info = new JPPFManagementInfo("unknown host", "unknown host", -1, context.getUuid(), context.isPeer() ? 2 : 1, context.isSecure());
                }
                if (debugEnabled) {
                    log.debug("firing nodeDisconnected() for {}", (Object)info);
                }
                this.nodeConnectionHandler.fireNodeDisconnected(info);
                this.removeConnection(context);
            }
        }
        catch (Exception e) {
            log.error("error closing channel {}: {}", (Object)context, (Object)ExceptionUtils.getStackTrace((Throwable)e));
        }
    }

    public void removeAllConnections() {
        if (!this.isStopped()) {
            return;
        }
        super.removeAllConnections();
    }

    public AsyncNodeMessageHandler getMessageHandler() {
        return this.messageHandler;
    }

    protected void initNioHandlers() {
        super.initNioHandlers();
        this.acceptHandler = null;
    }

    public JPPFDriver getDriver() {
        return this.driver;
    }

    public JPPFContext getJPPFContext() {
        return this.jobScheduler.getJPPFContext();
    }

    public OfflineNodeHandler getOfflineNodeHandler() {
        return this.offlineNodeHandler;
    }

    public JPPFScheduleHandler getDispatchExpirationHandler() {
        return this.dispatchExpirationHandler;
    }

    public PeerAttributesHandler getPeerHandler() {
        return this.peerHandler;
    }

    public NodeReservationHandler getNodeReservationHandler() {
        return this.nodeReservationHandler;
    }

    public AsyncJobScheduler getJobScheduler() {
        return this.jobScheduler;
    }

    public LoadBalancerPersistenceManager getBundlerHandler() {
        return this.bundlerHandler;
    }

    public JPPFBundlerFactory getBundlerFactory() {
        return this.bundlerFactory;
    }

    public List<BaseNodeContext> getAllChannels() {
        return new ArrayList<BaseNodeContext>(this.allConnections.values());
    }

    public Set<BaseNodeContext> getAllChannelsAsSet() {
        return new HashSet<BaseNodeContext>(this.allConnections.values());
    }

    public void connectionFailed(BaseNodeContext context) {
        if (context != null) {
            if (debugEnabled) {
                log.debug("about to close channel = {} with uuid = {}", (Object)context, (Object)context.getUuid());
            }
            this.removeConnection(context.getUuid());
            context.handleException(null);
        }
    }

    void putConnection(AsyncNodeContext nodeContext) {
        if (debugEnabled) {
            log.debug("putting connection {}", (Object)nodeContext);
        }
        this.allConnections.put(nodeContext.getUuid(), nodeContext);
    }

    public AsyncNodeContext getConnection(String uuid) {
        return uuid == null ? null : this.allConnections.get(uuid);
    }

    private void addConnection(AsyncNodeContext nodeContext) {
        block7: {
            try {
                if (nodeContext == null) {
                    throw new IllegalArgumentException("nodeContext is null");
                }
                if (debugEnabled) {
                    log.debug("adding connection {}", (Object)nodeContext);
                }
                if (!nodeContext.isClosed()) {
                    nodeContext.addExecutionStatusListener(this.statusListener);
                    if (!nodeContext.isClosed()) {
                        nodeContext.setExecutionStatus(ExecutorStatus.ACTIVE);
                    }
                }
                if (nodeContext.isClosed()) {
                    nodeContext.handleException(null);
                }
            }
            catch (Exception e) {
                if (!debugEnabled) break block7;
                log.debug("error adding connection {} : {}", (Object)nodeContext, (Object)e);
            }
        }
    }

    private AsyncNodeContext removeConnection(String uuid) {
        if (uuid == null) {
            return null;
        }
        AsyncNodeContext nodeContext = this.getConnection(uuid);
        if (nodeContext != null) {
            this.removeConnection(nodeContext);
        }
        return nodeContext;
    }

    private void removeConnection(AsyncNodeContext nodeContext) {
        if (nodeContext == null) {
            throw new IllegalArgumentException("wrapper is null");
        }
        if (debugEnabled) {
            log.debug("removing connection {}", (Object)nodeContext);
        }
        try {
            this.jobScheduler.removeIdleChannelAsync(nodeContext);
            this.updateConnectionStatus(nodeContext, nodeContext.getExecutionStatus(), ExecutorStatus.DISABLED);
        }
        catch (Exception e) {
            if (debugEnabled) {
                log.debug("error removing connection {} : {}", (Object)nodeContext, (Object)e);
            }
        }
        finally {
            block16: {
                try {
                    String uuid = nodeContext.getUuid();
                    if (uuid != null) {
                        this.allConnections.remove(uuid);
                    }
                    nodeContext.removeExecutionStatusListener(this.statusListener);
                }
                catch (Throwable e) {
                    if (!debugEnabled) break block16;
                    log.debug("error removing connection {} : {}", (Object)nodeContext, (Object)e);
                }
            }
        }
    }

    private void updateConnectionStatus(AsyncNodeContext nodeContext, ExecutorStatus oldStatus, ExecutorStatus newStatus) {
        if (oldStatus == null) {
            throw new IllegalArgumentException("oldStatus is null");
        }
        if (newStatus == null) {
            throw new IllegalArgumentException("newStatus is null");
        }
        if (nodeContext == null || oldStatus == newStatus) {
            return;
        }
        if (debugEnabled) {
            log.debug("updating channel status from {} to {}: {}", new Object[]{oldStatus, newStatus, nodeContext});
        }
        if (newStatus == ExecutorStatus.ACTIVE) {
            this.jobScheduler.addIdleChannel(nodeContext);
        } else {
            this.jobScheduler.removeIdleChannelAsync(nodeContext);
            if (newStatus == ExecutorStatus.FAILED || newStatus == ExecutorStatus.DISABLED) {
                NioHelper.getGlobalexecutor().execute(() -> this.queue.getBroadcastManager().cancelBroadcastJobs(nodeContext.getUuid()));
            }
        }
        this.queue.updateWorkingConnections(oldStatus, newStatus);
    }

    @Override
    public void nodeConnected(BaseNodeContext context) {
        if (debugEnabled) {
            log.debug("node connected: {}", (Object)context);
        }
        JPPFManagementInfo info = context.getManagementInfo();
        if (!context.isClosed()) {
            this.peerHandler.onNodeConnected(context);
            this.addConnection((AsyncNodeContext)context);
            if (!context.isClosed() && info != null) {
                this.nodeConnectionHandler.fireNodeConnected(info);
            }
        }
        if (context.isClosed()) {
            context.handleException(null);
        }
    }

    public ServerTaskBundleNode getHandshakeBundle() {
        return this.initialServerJob.createNodeDispatch(0);
    }

    public AsyncNodeContext activateNode(String uuid, boolean activate) {
        AsyncNodeContext nodeContext = this.getConnection(uuid);
        if (nodeContext == null) {
            return null;
        }
        if (activate != nodeContext.isActive()) {
            nodeContext.setActive(activate);
        }
        return nodeContext;
    }

    private static ServerJob createInitialServerJob(JPPFDriver driver) {
        try {
            SerializationHelperImpl helper = new SerializationHelperImpl();
            JPPFBuffer buf = helper.getSerializer().serialize(null);
            byte[] lengthBytes = SerializationUtils.writeInt((int)buf.getLength());
            JPPFTaskBundle bundle = new JPPFTaskBundle();
            bundle.setName("server handshake");
            bundle.setUuid(driver.getUuid());
            bundle.getUuidPath().add((Object)driver.getUuid());
            bundle.setTaskCount(0);
            bundle.setHandshake(true);
            JPPFDatasourceFactory factory = JPPFDatasourceFactory.getInstance();
            TypedProperties config = driver.getConfiguration();
            HashMap defMap = new HashMap();
            defMap.putAll(JPPFDatasourceFactory.extractDefinitions((TypedProperties)config, (JPPFDatasourceFactory.Scope)JPPFDatasourceFactory.Scope.REMOTE));
            bundle.setParameter((Object)BundleParameter.DATASOURCE_DEFINITIONS, defMap);
            return new ServerJob(new ReentrantLock(), null, (TaskBundle)bundle, (DataLocation)new MultipleBuffersLocation(new JPPFBuffer[]{new JPPFBuffer(lengthBytes), buf}));
        }
        catch (Exception e) {
            log.error(e.getMessage(), (Throwable)e);
            return null;
        }
    }
}

