/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.processors.ggfs;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.util.Map;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFuture;
import org.gridgain.grid.GridInterruptedException;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsControlResponse;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsDataInputStream;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsDataOutputStream;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsIpcCommand;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsMarshaller;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsMessage;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsClientSession;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsContext;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsIpcHandler;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsServerHandler;
import org.gridgain.grid.lang.GridInClosure;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.spi.GridPortProtocol;
import org.gridgain.grid.thread.GridThread;
import org.gridgain.grid.util.ipc.GridIpcEndpoint;
import org.gridgain.grid.util.ipc.GridIpcServerEndpoint;
import org.gridgain.grid.util.ipc.GridIpcServerEndpointDeserializer;
import org.gridgain.grid.util.ipc.loopback.GridIpcServerTcpEndpoint;
import org.gridgain.grid.util.ipc.shmem.GridIpcSharedMemoryServerEndpoint;
import org.gridgain.grid.util.typedef.CIX1;
import org.gridgain.grid.util.typedef.internal.U;
import org.gridgain.grid.util.worker.GridWorker;
import org.jdk8.backport.ConcurrentLinkedDeque8;
import org.jetbrains.annotations.Nullable;

public class GridGgfsServer {
    private final GridGgfsContext ggfsCtx;
    private final GridLogger log;
    private final GridGgfsMarshaller marsh;
    private final Map<String, String> endpointCfg;
    private GridIpcServerEndpoint srvEndpoint;
    private GridGgfsServerHandler hnd;
    private AcceptWorker acceptWorker;
    private ConcurrentLinkedDeque8<ClientWorker> clientWorkers = new ConcurrentLinkedDeque8();
    private final boolean mgmt;

    public GridGgfsServer(GridGgfsContext ggfsCtx, Map<String, String> endpointCfg, boolean mgmt) {
        assert (ggfsCtx != null);
        assert (endpointCfg != null);
        this.endpointCfg = endpointCfg;
        this.ggfsCtx = ggfsCtx;
        this.mgmt = mgmt;
        this.log = ggfsCtx.kernalContext().log(GridGgfsServer.class);
        this.marsh = new GridGgfsMarshaller();
    }

    public void start() throws GridException {
        this.srvEndpoint = GridIpcServerEndpointDeserializer.deserialize(this.endpointCfg);
        if (U.isWindows() && this.srvEndpoint instanceof GridIpcSharedMemoryServerEndpoint) {
            throw new GridException(GridIpcSharedMemoryServerEndpoint.class.getSimpleName() + " should not be configured on Windows (configure " + GridIpcServerTcpEndpoint.class.getSimpleName() + ")");
        }
        if (this.srvEndpoint instanceof GridIpcServerTcpEndpoint) {
            GridIpcServerTcpEndpoint srvEndpoint0 = (GridIpcServerTcpEndpoint)this.srvEndpoint;
            srvEndpoint0.setManagement(this.mgmt);
            if (srvEndpoint0.getHost() == null) {
                if (this.mgmt) {
                    String locHostName = this.ggfsCtx.kernalContext().config().getLocalHost();
                    try {
                        srvEndpoint0.setHost(U.resolveLocalHost((String)locHostName).getHostAddress());
                    }
                    catch (IOException e) {
                        throw new GridException("Failed to resolve local host: " + locHostName, (Throwable)e);
                    }
                } else {
                    srvEndpoint0.setHost("127.0.0.1");
                }
            }
        }
        this.ggfsCtx.kernalContext().resource().injectGeneric((Object)this.srvEndpoint);
        this.srvEndpoint.start();
        if (this.srvEndpoint.getPort() >= 0) {
            this.ggfsCtx.kernalContext().ports().registerPort(this.srvEndpoint.getPort(), GridPortProtocol.TCP, this.srvEndpoint.getClass());
        }
        this.hnd = new GridGgfsIpcHandler(this.ggfsCtx, this.mgmt);
        this.acceptWorker = new AcceptWorker();
    }

    public void onKernalStart() {
        if (this.srvEndpoint != null) {
            new GridThread((GridWorker)this.acceptWorker).start();
        }
    }

    public void stop(boolean cancel) {
        if (this.srvEndpoint == null) {
            return;
        }
        U.cancel((GridWorker)this.acceptWorker);
        U.join((GridWorker)this.acceptWorker, (GridLogger)this.log);
        try {
            this.hnd.stop();
        }
        catch (GridException e) {
            U.error((GridLogger)this.log, (Object)"Failed to stop GGFS server handler (will close client connections anyway).", (Throwable)e);
        }
        for (ClientWorker worker : this.clientWorkers) {
            U.cancel((GridWorker)worker);
        }
        U.join(this.clientWorkers, (GridLogger)this.log);
        if (this.srvEndpoint.getPort() >= 0) {
            this.ggfsCtx.kernalContext().ports().deregisterPort(this.srvEndpoint.getPort(), GridPortProtocol.TCP, this.srvEndpoint.getClass());
        }
        try {
            this.ggfsCtx.kernalContext().resource().cleanupGeneric((Object)this.srvEndpoint);
        }
        catch (GridException e) {
            U.error((GridLogger)this.log, (Object)"Failed to cleanup server endpoint.", (Throwable)e);
        }
    }

    public GridIpcServerEndpoint getIpcServerEndpoint() {
        return this.srvEndpoint;
    }

    private class AcceptWorker
    extends GridWorker {
        private int acceptCnt;

        protected AcceptWorker() {
            super(GridGgfsServer.this.ggfsCtx.kernalContext().gridName(), "ggfs-accept-worker", log);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void body() throws InterruptedException, GridInterruptedException {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    GridIpcEndpoint client = GridGgfsServer.this.srvEndpoint.accept();
                    if (log.isDebugEnabled()) {
                        log.debug("GGFS client connected [ggfsName=" + GridGgfsServer.this.ggfsCtx.kernalContext().gridName() + ", client=" + client + ']');
                    }
                    ClientWorker worker = new ClientWorker(client, this.acceptCnt++);
                    GridThread workerThread = new GridThread((GridWorker)worker);
                    ConcurrentLinkedDeque8.Node node = GridGgfsServer.this.clientWorkers.addx((Object)worker);
                    worker.node((ConcurrentLinkedDeque8.Node<ClientWorker>)node);
                    workerThread.start();
                }
            }
            catch (GridException e) {
                if (!this.isCancelled()) {
                    U.error((GridLogger)log, (Object)"Failed to accept client IPC connection (will shutdown accept thread).", (Throwable)e);
                }
            }
            finally {
                GridGgfsServer.this.srvEndpoint.close();
            }
        }

        public void cancel() {
            super.cancel();
            GridGgfsServer.this.srvEndpoint.close();
        }
    }

    private class ClientWorker
    extends GridWorker {
        private GridIpcEndpoint endpoint;
        private final GridGgfsDataOutputStream out;
        private GridGgfsClientSession ses;
        private ConcurrentLinkedDeque8.Node<ClientWorker> node;

        protected ClientWorker(GridIpcEndpoint endpoint, int idx) throws GridException {
            super(GridGgfsServer.this.ggfsCtx.kernalContext().gridName(), "ggfs-client-worker-" + idx, log);
            this.endpoint = endpoint;
            this.ses = new GridGgfsClientSession();
            this.out = new GridGgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void body() throws InterruptedException, GridInterruptedException {
            try {
                GridGgfsDataInputStream dis = new GridGgfsDataInputStream(this.endpoint.inputStream());
                byte[] hdr = new byte[24];
                boolean first = true;
                while (!Thread.currentThread().isInterrupted()) {
                    dis.readFully(hdr);
                    final long reqId = U.bytesToLong((byte[])hdr, (int)0);
                    int ordinal = U.bytesToInt((byte[])hdr, (int)8);
                    if (first) {
                        if (reqId != 0L || ordinal != GridGgfsIpcCommand.HANDSHAKE.ordinal()) {
                            U.warn((GridLogger)log, (Object)"Handshake failed.");
                            return;
                        }
                        first = false;
                    }
                    GridGgfsIpcCommand cmd = GridGgfsIpcCommand.valueOf(ordinal);
                    GridGgfsMessage msg = GridGgfsServer.this.marsh.unmarshall(cmd, hdr, dis);
                    GridFuture<GridGgfsMessage> fut = GridGgfsServer.this.hnd.handleAsync(this.ses, msg, dis);
                    if (fut == null) continue;
                    if (fut.isDone()) {
                        GridGgfsMessage res;
                        try {
                            res = (GridGgfsMessage)fut.get();
                        }
                        catch (GridException e) {
                            res = new GridGgfsControlResponse();
                            ((GridGgfsControlResponse)res).error(e);
                        }
                        try {
                            GridGgfsDataOutputStream e = this.out;
                            synchronized (e) {
                                GridGgfsMarshaller.fillHeader(hdr, reqId, res.command());
                                GridGgfsServer.this.marsh.marshall(res, hdr, this.out);
                                this.out.flush();
                                continue;
                            }
                        }
                        catch (IOException | GridException e) {
                            this.shutdown0(e);
                            continue;
                        }
                    }
                    fut.listenAsync((GridInClosure)new CIX1<GridFuture<GridGgfsMessage>>(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        public void applyx(GridFuture<GridGgfsMessage> fut) {
                            GridGgfsMessage res;
                            try {
                                res = (GridGgfsMessage)fut.get();
                            }
                            catch (GridException e) {
                                res = new GridGgfsControlResponse();
                                ((GridGgfsControlResponse)res).error(e);
                            }
                            try {
                                GridGgfsDataOutputStream e = ClientWorker.this.out;
                                synchronized (e) {
                                    byte[] hdr = GridGgfsMarshaller.createHeader(reqId, res.command());
                                    GridGgfsServer.this.marsh.marshall(res, hdr, ClientWorker.this.out);
                                    ClientWorker.this.out.flush();
                                }
                            }
                            catch (IOException | GridException e) {
                                ClientWorker.this.shutdown0(e);
                            }
                        }
                    });
                }
            }
            catch (EOFException ignored) {
            }
            catch (IOException | GridException e) {
                if (!this.isCancelled()) {
                    U.error((GridLogger)log, (Object)"Failed to read data from client (will close connection)", (Throwable)e);
                }
            }
            finally {
                this.onFinished();
            }
        }

        public void node(ConcurrentLinkedDeque8.Node<ClientWorker> node) {
            this.node = node;
        }

        public void cancel() {
            super.cancel();
            this.shutdown0(null);
        }

        private void shutdown0(@Nullable Throwable e) {
            if (!this.isCancelled() && e != null) {
                U.error((GridLogger)log, (Object)("Stopping client reader due to exception: " + this.endpoint), (Throwable)e);
            }
            U.closeQuiet((Closeable)this.out);
            this.endpoint.close();
        }

        private void onFinished() {
            U.closeQuiet((Closeable)this.out);
            this.endpoint.close();
            if (GridGgfsServer.this.clientWorkers.unlinkx(this.node)) {
                GridGgfsServer.this.hnd.onClosed(this.ses);
            }
        }
    }
}

