/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.client;

import java.io.NotSerializableException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jppf.JPPFException;
import org.jppf.client.ClassServerDelegate;
import org.jppf.client.JPPFClient;
import org.jppf.client.JPPFClientConnection;
import org.jppf.client.JPPFClientConnectionStatus;
import org.jppf.client.JPPFConnectionPool;
import org.jppf.client.JPPFJob;
import org.jppf.client.TaskServerConnectionHandler;
import org.jppf.client.balancer.ClientTaskBundle;
import org.jppf.comm.socket.SocketInitializer;
import org.jppf.comm.socket.SocketWrapper;
import org.jppf.io.IOHelper;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.BundleWithTasks;
import org.jppf.node.protocol.JPPFTaskBundle;
import org.jppf.node.protocol.PositionalElement;
import org.jppf.node.protocol.Task;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.node.protocol.graph.TaskGraphInfo;
import org.jppf.serialization.ObjectSerializer;
import org.jppf.serialization.SerializationHelper;
import org.jppf.utils.ExceptionUtils;
import org.jppf.utils.JPPFUuid;
import org.jppf.utils.LoggingUtils;
import org.jppf.utils.ObjectSerializerImpl;
import org.jppf.utils.StringUtils;
import org.jppf.utils.TraversalList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class BaseJPPFClientConnection
implements JPPFClientConnection {
    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(BaseJPPFClientConnection.class);
    /** Whether debug-level logging is enabled; evaluated once when the class is initialized. */
    private static final boolean debugEnabled = LoggingUtils.isDebugEnabled((Logger)log);
    /** Whether trace-level logging is enabled; evaluated once when the class is initialized. */
    private static final boolean traceEnabled = log.isTraceEnabled();
    /**
     * Lock shared by all instances of this class. It is held while tasks are read
     * from the socket when {@link #SEQUENTIAL_DESERIALIZATION} is {@code true}.
     */
    private static final Lock lock = new ReentrantLock();
    /**
     * Value of the {@code jppf.sequential.deserialization} configuration property
     * (default {@code false}), read from the client configuration in the constructor.
     */
    private final boolean SEQUENTIAL_DESERIALIZATION;
    /** Shared counter starting at 0. Not used within this class itself; presumably maintained by subclasses (to be confirmed). */
    static AtomicInteger connectionCount = new AtomicInteger(0);
    /** Handler that provides the socket over which job data is exchanged with the driver. */
    TaskServerConnectionHandler taskServerConnection;
    /** Class server delegate. Not used within this class itself. */
    ClassServerDelegate delegate;
    /** Name of this connection, as returned by {@link #getName()}. */
    String name;
    /** Unique identifier of this connection, sent to the driver in handshake and close command jobs. */
    String connectionUuid;
    /** Current status of this connection, initially {@link JPPFClientConnectionStatus#NEW}. */
    AtomicReference<JPPFClientConnectionStatus> status = new AtomicReference<JPPFClientConnectionStatus>(JPPFClientConnectionStatus.NEW);
    /** Monitor object; not used within this class itself, presumably guards status changes in subclasses (to be confirmed). */
    final Object statusChangeLock = new Object();
    /** The connection pool this connection belongs to. */
    final JPPFConnectionPool pool;
    /** Serializer used by {@link #receiveHeader} when the caller does not supply one. */
    private final ObjectSerializer defaultSerializer;

    BaseJPPFClientConnection(JPPFConnectionPool pool) {
        this.pool = pool;
        this.SEQUENTIAL_DESERIALIZATION = pool.getClient().getConfig().getBoolean("jppf.sequential.deserialization", false);
        this.defaultSerializer = new ObjectSerializerImpl();
    }

    /**
     * Initializes this connection. The actual behavior is defined by concrete subclasses.
     */
    abstract void init();

    /**
     * Sends a job to the driver over the task server socket.
     * <p>The header is filled in from the job (uuid path, task count, name, uuid, SLA, metadata
     * and, when the job has a task graph traversed in the client, the graph information), then
     * the header, the data provider, the tasks and finally the graph dependencies (if any) are
     * written to the socket, which is flushed at the end.
     * <p>If the data provider is not serializable, the error is logged and a {@code null}
     * data provider is sent in its place.
     *
     * @param ser the serializer used for everything written to the socket.
     * @param cl not used by this method.
     * @param header the job header to populate and send.
     * @param clientBundle the client-side bundle holding the job and the tasks to send.
     * @return the tasks that could not be serialized; dependencies that fail to serialize
     *         are not included in this list.
     * @throws Exception if any error other than a {@link NotSerializableException} occurs while sending.
     */
    public List<Task<?>> sendTasks(ObjectSerializer ser, ClassLoader cl, TaskBundle header, ClientTaskBundle clientBundle) throws Exception {
        JPPFJob job = clientBundle.getClientJob().getJob();
        TraversalList uuidPath = new TraversalList();
        uuidPath.add((Object)this.pool.getClient().getUuid());
        header.setUuidPath(uuidPath);
        header.setTaskCount(clientBundle.getTasksL().size());
        header.setName(job.getName());
        header.setUuid(job.getUuid());
        header.setSLA(job.getSLA());
        header.setMetadata(job.getMetadata());
        List<? extends PositionalElement<?>> deps = null;
        TaskGraphInfo graphInfo = null;
        // the graph information is only sent when the graph is traversed on the client side
        if (clientBundle.getClientJob().getTaskGraph() != null && job.getClientSLA().isGraphTraversalInClient()) {
            graphInfo = clientBundle.getGraphInfo();
        }
        if (graphInfo != null) {
            deps = graphInfo.getDependencies();
            header.setParameter((Object)BundleParameter.JOB_TASK_GRAPH_INFO, (Object)graphInfo);
            int[] positions = graphInfo.getDependenciesPositions();
            if (debugEnabled) {
                // deps is null-checked here as everywhere else, so that enabling debug logging cannot cause an NPE
                log.debug("sending {} dependencies with positions {}", (Object)(deps == null ? 0 : deps.size()), (Object)Arrays.toString(positions));
            }
        }
        if (debugEnabled) {
            log.debug("found {} dependencies for bundle {}", (Object)(deps == null ? 0 : deps.size()), (Object)clientBundle);
        }
        List<Task<?>> tasks = this.prepareTasksToSend(header, clientBundle);
        SocketWrapper socketClient = this.taskServerConnection.getSocketClient();
        IOHelper.sendData(socketClient, (Object)header, ser);
        try {
            IOHelper.sendData(socketClient, (Object)job.getDataProvider(), ser);
        }
        catch (NotSerializableException e) {
            log.error("error serializing data provider for {} : {}\nthe job will be cancelled", (Object)job, (Object)ExceptionUtils.getStackTrace((Throwable)e));
            // a null data provider is sent instead, so that the stream layout stays as expected
            IOHelper.sendData((SocketWrapper)socketClient, null, (ObjectSerializer)ser);
        }
        List<Task<?>> notSerializableTasks = BaseJPPFClientConnection.sendTasks(job, ser, socketClient, tasks);
        if (deps != null) {
            BaseJPPFClientConnection.sendTasks(job, ser, socketClient, deps);
        }
        socketClient.flush();
        return notSerializableTasks;
    }

    /**
     * Writes each element of the given list to the socket, in iteration order.
     * <p>When an element is not serializable, the error is logged and a null data
     * placeholder is sent in its place. If that element is a {@link Task}, the exception is
     * also set as its throwable and the task is added to the returned list.
     *
     * @param job the job the elements belong to; only used in the error log message.
     * @param ser the serializer used to write the elements.
     * @param socketClient the socket to write to.
     * @param tasks the tasks or graph dependencies to send.
     * @return the tasks that could not be serialized, possibly empty.
     * @throws Exception if any error other than a {@link NotSerializableException} occurs while sending.
     */
    private static List<Task<?>> sendTasks(JPPFJob job, ObjectSerializer ser, SocketWrapper socketClient, List<? extends PositionalElement<?>> tasks) throws Exception {
        List<Task<?>> notSerializableTasks = new ArrayList<>();
        for (PositionalElement<?> element : tasks) {
            try {
                IOHelper.sendData(socketClient, element, ser);
            }
            catch (NotSerializableException e) {
                log.error("error serializing task {} for {} : {}", new Object[]{element, job, ExceptionUtils.getStackTrace((Throwable)e)});
                // the declared element type is PositionalElement, so the element is only
                // treated as a Task after checking; an unconditional cast could fail before
                // the placeholder below is written
                Task<?> failedTask = element instanceof Task ? (Task<?>)element : null;
                if (failedTask != null) {
                    failedTask.setThrowable((Throwable)e);
                }
                IOHelper.sendNullData(socketClient);
                if (failedTask != null) {
                    notSerializableTasks.add(failedTask);
                }
            }
        }
        return notSerializableTasks;
    }

    /**
     * Selects the tasks of the bundle that have no result yet and records their positions
     * and maximum resubmit counts in the header, under the {@code TASK_POSITIONS} and
     * {@code TASK_MAX_RESUBMITS} parameters.
     * <p>NOTE(review): both arrays are sized to the total number of tasks in the bundle, so
     * when tasks are skipped the trailing entries remain 0 - confirm the receiver tolerates this.
     *
     * @param header the job header that receives the two parameters.
     * @param clientBundle the bundle holding the candidate tasks and their job.
     * @return the tasks for which the job has no result yet, in their original order.
     */
    private List<Task<?>> prepareTasksToSend(TaskBundle header, ClientTaskBundle clientBundle) {
        final List<Task<?>> candidates = clientBundle.getTasksL();
        final int total = candidates.size();
        final int[] positions = new int[total];
        final int[] maxResubmits = new int[total];
        final List<Task<?>> toSend = new ArrayList<>(total);
        final JPPFJob job = clientBundle.getClientJob().getJob();
        int selected = 0;
        for (final Task<?> candidate : candidates) {
            final int position = candidate.getPosition();
            // only tasks that do not already have a result are sent
            if (!job.getResults().hasResult(position)) {
                toSend.add(candidate);
                positions[selected] = position;
                maxResubmits[selected] = candidate.getMaxResubmits();
                selected++;
            }
        }
        header.setParameter(BundleParameter.TASK_POSITIONS, positions);
        header.setParameter(BundleParameter.TASK_MAX_RESUBMITS, maxResubmits);
        if (traceEnabled) {
            log.trace(this.toDebugString() + " sending job " + header + ", positions=" + StringUtils.buildString(positions));
        }
        return toSend;
    }

    /**
     * Sends a handshake job to the driver and waits for its response.
     * <p>The handshake header carries the client uuid path and this connection's uuid,
     * has no SLA and no metadata, and uses its name ({@code "handshake job"}) as its uuid.
     * It is followed by a {@code null} data provider and no task.
     *
     * @return the header of the response received from the driver.
     * @throws Exception if any error occurs while sending the job or receiving the response.
     */
    TaskBundle sendHandshakeJob() throws Exception {
        JPPFTaskBundle header = new JPPFTaskBundle();
        ObjectSerializer ser = new ObjectSerializerImpl();
        TraversalList uuidPath = new TraversalList();
        uuidPath.add((Object)this.pool.getClient().getUuid());
        header.setUuidPath(uuidPath);
        if (debugEnabled) {
            log.debug("{} sending handshake job, uuidPath={}", (Object)this.toDebugString(), (Object)uuidPath);
        }
        header.setName("handshake job");
        header.setHandshake(true);
        // the job name doubles as its uuid; no random uuid is generated since it would be overwritten here
        header.setUuid(header.getName());
        header.setParameter((Object)BundleParameter.CONNECTION_UUID, (Object)this.connectionUuid);
        header.setSLA(null);
        header.setMetadata(null);
        SocketWrapper socketClient = this.taskServerConnection.getSocketClient();
        IOHelper.sendData(socketClient, (Object)header, ser);
        // null data provider
        IOHelper.sendData((SocketWrapper)socketClient, null, (ObjectSerializer)ser);
        socketClient.flush();
        if (debugEnabled) {
            log.debug("{} sent handshake job, receiving handshake results", (Object)this.toDebugString());
        }
        return (TaskBundle)this.receiveBundleAndResults(ser, this.getClass().getClassLoader()).first();
    }

    /**
     * Sends a "close command" job to the driver.
     * <p>Does nothing when there is no task server connection. Otherwise a header named
     * {@code "close command job"} (also used as its uuid) is built, carrying this connection's
     * uuid and the {@code CLOSE_COMMAND} parameter set to {@code true}, with no SLA and no
     * metadata. The header and a {@code null} data provider are written and flushed only if the
     * task server connection currently has a socket.
     *
     * @throws Exception if any error occurs while sending the command.
     */
    void sendCloseConnectionCommand() throws Exception {
        if (taskServerConnection == null) {
            return;
        }
        final JPPFTaskBundle closeHeader = new JPPFTaskBundle();
        final ObjectSerializer serializer = new ObjectSerializerImpl();
        final TraversalList path = new TraversalList();
        path.add(pool.getClient().getUuid());
        closeHeader.setUuidPath(path);
        if (debugEnabled) {
            log.debug("{} sending close command job, uuidPath={}", (Object)toDebugString(), (Object)path);
        }
        final String commandName = "close command job";
        closeHeader.setName(commandName);
        closeHeader.setUuid(commandName);
        closeHeader.setParameter(BundleParameter.CONNECTION_UUID, connectionUuid);
        closeHeader.setParameter(BundleParameter.CLOSE_COMMAND, Boolean.TRUE);
        closeHeader.setSLA(null);
        closeHeader.setMetadata(null);
        final SocketWrapper socket = taskServerConnection.getSocketClient();
        if (socket != null) {
            IOHelper.sendData(socket, (Object)closeHeader, serializer);
            // null data provider
            IOHelper.sendData((SocketWrapper)socket, null, (ObjectSerializer)serializer);
            socket.flush();
        }
        if (debugEnabled) {
            log.debug("{} sent close command job", (Object)toDebugString());
        }
    }

    /**
     * Reads a job header from the driver, followed by the tasks it announces.
     *
     * @param ser the serializer used to read the data.
     * @param cl the class loader to use while reading.
     * @return the received header paired with the received tasks.
     * @throws Exception if any error occurs while receiving.
     */
    public BundleWithTasks receiveBundleAndResults(ObjectSerializer ser, ClassLoader cl) throws Exception {
        final TaskBundle header = receiveHeader(ser, cl);
        return new BundleWithTasks(header, receiveTasks(header, ser, cl));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads a job header from the task server socket.
     * <p>While reading, the thread context class loader is set to {@code cl}, or to the class
     * loader of this object's class when {@code cl} is {@code null}; the previous context class
     * loader is always restored afterwards.
     *
     * @param ser the serializer used to read the header; when {@code null}, the default serializer of this connection is used.
     * @param cl the class loader to install as context class loader while reading, may be {@code null}.
     * @return the header read from the socket.
     * @throws Exception if any error occurs while reading.
     */
    public TaskBundle receiveHeader(ObjectSerializer ser, ClassLoader cl) throws Exception {
        ObjectSerializer effectiveSerializer = ser;
        if (effectiveSerializer == null) {
            effectiveSerializer = this.defaultSerializer;
        }
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(cl != null ? cl : getClass().getClassLoader());
            final SocketWrapper socket = taskServerConnection.getSocketClient();
            return (TaskBundle)IOHelper.unwrappedData(socket, effectiveSerializer);
        }
        finally {
            currentThread.setContextClassLoader(previousLoader);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads from the task server socket the tasks announced by the given header.
     * <p>{@code bundle.getTaskCount()} objects are read. Objects read as {@code null} are
     * skipped; every other task gets its position from the {@code TASK_POSITIONS} header
     * parameter when an entry exists at its index. When the client is configured for sequential
     * deserialization, the reads are performed while holding a lock shared by all connections.
     * <p>If the header carries a {@code NODE_EXCEPTION_PARAM} throwable, it is set on every
     * received task, wrapped in a {@link JPPFException} when it is not an {@link Exception}.
     * <p>While reading, the thread context class loader is set to {@code cl}, or to the class
     * loader of this object's class when {@code cl} is {@code null}; the previous context class
     * loader is always restored afterwards.
     *
     * @param bundle the header describing the tasks to read.
     * @param ser the serializer used to read the tasks; when {@code null}, the default serializer
     *            of this connection is used, consistently with {@link #receiveHeader}.
     * @param cl the class loader to install as context class loader while reading, may be {@code null}.
     * @return the non-null tasks read from the socket, in the order they were received.
     * @throws Exception if any error occurs while reading.
     */
    public List<Task<?>> receiveTasks(TaskBundle bundle, ObjectSerializer ser, ClassLoader cl) throws Exception {
        // same fallback as receiveHeader(), so that a null serializer works for the whole response
        ObjectSerializer actualSerializer = ser == null ? this.defaultSerializer : ser;
        ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
        try {
            ClassLoader loader = cl == null ? this.getClass().getClassLoader() : cl;
            Thread.currentThread().setContextClassLoader(loader);
            SocketWrapper socketClient = this.taskServerConnection.getSocketClient();
            int count = bundle.getTaskCount();
            int[] positions = (int[])bundle.getParameter((Object)BundleParameter.TASK_POSITIONS);
            List<Task<?>> taskList = new ArrayList<>(count);
            if (debugEnabled) {
                log.debug("{} : received bundle {},  positions={}", new Object[]{this.toDebugString(), bundle, StringUtils.buildString((int[])positions)});
            }
            if (this.SEQUENTIAL_DESERIALIZATION) {
                lock.lock();
            }
            try {
                for (int i = 0; i < count; ++i) {
                    Task<?> task = (Task<?>)IOHelper.unwrappedData(socketClient, actualSerializer);
                    if (task == null) {
                        continue;
                    }
                    if (positions != null && i < positions.length) {
                        task.setPosition(positions[i]);
                    }
                    taskList.add(task);
                }
            }
            finally {
                if (this.SEQUENTIAL_DESERIALIZATION) {
                    lock.unlock();
                }
            }
            Throwable t = (Throwable)bundle.getParameter((Object)BundleParameter.NODE_EXCEPTION_PARAM);
            if (t != null) {
                if (debugEnabled) {
                    log.debug(this.toDebugString() + " : server returned exception parameter in the header for job '" + bundle.getName() + "' : " + t);
                }
                Exception e = t instanceof Exception ? (Exception)t : new JPPFException(t);
                for (Task<?> task : taskList) {
                    task.setThrowable((Throwable)e);
                }
            }
            return taskList;
        }
        finally {
            Thread.currentThread().setContextClassLoader(ctxCl);
        }
    }

    /**
     * Creates a serialization helper whose class name is the one configured on the client.
     *
     * @param classLoader the first class loader to try when loading the helper class.
     * @return a new instance of the helper class.
     * @throws Exception if the helper cannot be loaded or instantiated.
     */
    public SerializationHelper makeHelper(ClassLoader classLoader) throws Exception {
        final String helperClassName = pool.getClient().getSerializationHelperClassName();
        return makeHelper(classLoader, helperClassName);
    }

    /**
     * Creates a serialization helper of the given class.
     * <p>The class is looked up, in order, with the supplied class loader, the thread context
     * class loader and the class loader of this object's class. {@code null} loaders are
     * skipped, and a failure with one loader no longer prevents the next ones from being tried.
     *
     * @param classLoader the first class loader to try, may be {@code null}.
     * @param helperClassName the fully qualified name of the helper class to instantiate.
     * @return a new instance of the helper class, created with its no-argument constructor.
     * @throws IllegalStateException if none of the class loaders could load the class; the last
     *         loading failure, if any, is attached as the cause.
     * @throws Exception if the class cannot be instantiated.
     */
    public SerializationHelper makeHelper(ClassLoader classLoader, String helperClassName) throws Exception {
        ClassLoader[] clArray = new ClassLoader[]{classLoader, Thread.currentThread().getContextClassLoader(), this.getClass().getClassLoader()};
        Class<?> clazz = null;
        Exception lastFailure = null;
        for (ClassLoader cl : clArray) {
            if (cl == null) {
                continue;
            }
            try {
                clazz = Class.forName(helperClassName, true, cl);
                break;
            }
            catch (Exception e) {
                // remember the failure and fall through to the next class loader
                lastFailure = e;
                if (debugEnabled) {
                    log.debug(e.getMessage(), (Throwable)e);
                }
            }
        }
        // only give up once every candidate class loader has been tried
        if (clazz == null) {
            throw new IllegalStateException("could not load class " + helperClassName + " from any of these class loaders: " + Arrays.asList(clArray), lastFailure);
        }
        return (SerializationHelper)clazz.newInstance();
    }

    /**
     * Returns the name of this connection.
     *
     * @return the value of the {@code name} field.
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Creates a socket initializer. The actual implementation is provided by concrete subclasses.
     *
     * @return a {@link SocketInitializer} instance.
     */
    abstract SocketInitializer createSocketInitializer();

    /**
     * Returns the handler for the connection to the task server.
     *
     * @return the value of the {@code taskServerConnection} field, possibly {@code null}.
     */
    public TaskServerConnectionHandler getTaskServerConnection() {
        return taskServerConnection;
    }

    /**
     * Returns the client that owns the connection pool of this connection.
     *
     * @return the client obtained from the pool.
     */
    public JPPFClient getClient() {
        final JPPFClient client = pool.getClient();
        return client;
    }

    /**
     * Returns the uuid of the driver, as reported by the connection pool.
     *
     * @return the driver uuid obtained from the pool.
     */
    @Override
    public String getDriverUuid() {
        final String driverUuid = pool.getDriverUuid();
        return driverUuid;
    }

    /**
     * Returns the unique identifier of this connection.
     *
     * @return the value of the {@code connectionUuid} field.
     */
    @Override
    public String getConnectionUuid() {
        return connectionUuid;
    }

    /**
     * Returns the driver host, as reported by the connection pool.
     *
     * @return the driver host obtained from the pool.
     */
    @Override
    public String getHost() {
        final String driverHost = pool.getDriverHost();
        return driverHost;
    }

    /**
     * Returns the driver port, as reported by the connection pool.
     *
     * @return the driver port obtained from the pool.
     */
    @Override
    public int getPort() {
        final int driverPort = pool.getDriverPort();
        return driverPort;
    }

    /**
     * Builds a short description of this connection for log messages, in the form
     * {@code SimpleClassName[connectionUuid=..., status=...]}.
     *
     * @return the debug representation of this connection.
     */
    protected String toDebugString() {
        final String simpleName = getClass().getSimpleName();
        return simpleName + '[' + "connectionUuid=" + connectionUuid + ", status=" + status + ']';
    }
}

