/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.processors.ggfs;

import java.io.Closeable;
import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFuture;
import org.gridgain.grid.GridUuid;
import org.gridgain.grid.ggfs.GridGgfsOutOfSpaceException;
import org.gridgain.grid.ggfs.GridGgfsOutputStream;
import org.gridgain.grid.kernal.GridKernalContext;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsControlResponse;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsHandshakeRequest;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsIpcCommand;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsMessage;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsPathControlRequest;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsStreamControlRequest;
import org.gridgain.grid.kernal.processors.closure.GridClosurePolicy;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsClientSession;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsContext;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsEx;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsFileInfo;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsHandshakeResponse;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsInputStreamAdapter;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsInputStreamDescriptor;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsServerHandler;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsStatus;
import org.gridgain.grid.kernal.processors.license.GridLicenseUseRegistry;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.product.GridProductEdition;
import org.gridgain.grid.util.future.GridFinishedFuture;
import org.gridgain.grid.util.lang.GridPlainCallable;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.X;
import org.gridgain.grid.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

class GridGgfsIpcHandler
implements GridGgfsServerHandler {
    private static boolean errWrite;
    private final GridKernalContext ctx;
    private GridLogger log;
    private final int bufSize;
    private GridGgfsEx ggfs;
    private AtomicLong rsrcIdGen = new AtomicLong();
    private volatile boolean stopping;
    private final boolean mgmt;

    GridGgfsIpcHandler(GridGgfsContext ggfsCtx, boolean mgmt) {
        assert (ggfsCtx != null);
        this.mgmt = mgmt;
        this.ctx = ggfsCtx.kernalContext();
        this.ggfs = ggfsCtx.ggfs();
        this.bufSize = ggfsCtx.configuration().getBlockSize() * 2;
        this.log = this.ctx.log(GridGgfsIpcHandler.class);
    }

    @Override
    public void stop() throws GridException {
        this.stopping = true;
    }

    @Override
    public void onClosed(GridGgfsClientSession ses) {
        Iterator<Closeable> it = ses.registeredResources();
        while (it.hasNext()) {
            Closeable stream = it.next();
            try {
                stream.close();
            }
            catch (IOException e) {
                U.warn((GridLogger)this.log, (Object)("Failed to close opened stream on client close event (will continue) [ses=" + ses + ", stream=" + stream + ']'), (Object)e);
            }
        }
    }

    @Override
    public GridFuture<GridGgfsMessage> handleAsync(final GridGgfsClientSession ses, final GridGgfsMessage msg, DataInput in) {
        if (!this.mgmt) {
            GridLicenseUseRegistry.onUsage((GridProductEdition)GridProductEdition.HADOOP, this.getClass());
        }
        try {
            GridFinishedFuture fut;
            if (this.stopping) {
                return null;
            }
            final GridGgfsIpcCommand cmd = msg.command();
            switch (cmd) {
                case WRITE_BLOCK: 
                case MAKE_DIRECTORIES: 
                case LIST_FILES: 
                case LIST_PATHS: {
                    GridGgfsMessage res = this.execute(ses, cmd, msg, in);
                    fut = res == null ? null : new GridFinishedFuture(this.ctx, (Object)res);
                    break;
                }
                default: {
                    fut = this.ctx.closure().callLocalSafe((Callable)new GridPlainCallable<GridGgfsMessage>(){

                        public GridGgfsMessage call() throws Exception {
                            return GridGgfsIpcHandler.this.execute(ses, cmd, msg, null);
                        }
                    }, GridClosurePolicy.GGFS_POOL);
                }
            }
            return fut;
        }
        catch (Exception e) {
            return new GridFinishedFuture(this.ctx, (Throwable)e);
        }
    }

    private GridGgfsMessage execute(GridGgfsClientSession ses, GridGgfsIpcCommand cmd, GridGgfsMessage msg, @Nullable DataInput in) throws Exception {
        switch (cmd) {
            case HANDSHAKE: {
                return this.processHandshakeRequest((GridGgfsHandshakeRequest)msg);
            }
            case STATUS: {
                return this.processStatusRequest();
            }
            case MAKE_DIRECTORIES: 
            case LIST_FILES: 
            case LIST_PATHS: 
            case EXISTS: 
            case INFO: 
            case PATH_SUMMARY: 
            case UPDATE: 
            case RENAME: 
            case DELETE: 
            case SET_TIMES: 
            case AFFINITY: 
            case OPEN_READ: 
            case OPEN_CREATE: 
            case OPEN_APPEND: {
                return this.processPathControlRequest(ses, cmd, msg);
            }
            case WRITE_BLOCK: 
            case CLOSE: 
            case READ_BLOCK: {
                return this.processStreamControlRequest(ses, cmd, msg, in);
            }
        }
        throw new GridException("Unsupported IPC command: " + (Object)((Object)cmd));
    }

    private GridGgfsMessage processHandshakeRequest(GridGgfsHandshakeRequest req) throws GridException {
        if (!F.eq((Object)this.ctx.gridName(), (Object)req.gridName())) {
            throw new GridException("Failed to perform handshake because actual Grid name differs from expected [expected=" + req.gridName() + ", actual=" + this.ctx.gridName() + ']');
        }
        if (!F.eq((Object)this.ggfs.name(), (Object)req.ggfsName())) {
            throw new GridException("Failed to perform handshake because actual GGFS name differs from expected [expected=" + req.ggfsName() + ", actual=" + this.ggfs.name() + ']');
        }
        GridGgfsControlResponse res = new GridGgfsControlResponse();
        this.ggfs.clientLogDirectory(req.logDirectory());
        GridGgfsHandshakeResponse handshake = new GridGgfsHandshakeResponse(this.ggfs.name(), this.ggfs.proxyPaths(), this.ggfs.groupBlockSize(), this.ggfs.globalSampling());
        res.handshake(handshake);
        return res;
    }

    private GridGgfsMessage processStatusRequest() throws GridException {
        GridGgfsStatus status = this.ggfs.globalSpace();
        GridGgfsControlResponse res = new GridGgfsControlResponse();
        res.status(status);
        return res;
    }

    private GridGgfsMessage processPathControlRequest(GridGgfsClientSession ses, GridGgfsIpcCommand cmd, GridGgfsMessage msg) throws GridException {
        GridGgfsPathControlRequest req = (GridGgfsPathControlRequest)msg;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Processing path control request [ggfsName=" + this.ggfs.name() + ", req=" + req + ']');
        }
        GridGgfsControlResponse res = new GridGgfsControlResponse();
        switch (cmd) {
            case EXISTS: {
                res.response(this.ggfs.exists(req.path()));
                break;
            }
            case INFO: {
                res.response(this.ggfs.info(req.path()));
                break;
            }
            case PATH_SUMMARY: {
                res.response(this.ggfs.summary(req.path()));
                break;
            }
            case UPDATE: {
                res.response(this.ggfs.update(req.path(), req.properties()));
                break;
            }
            case RENAME: {
                this.ggfs.rename(req.path(), req.destinationPath());
                res.response(true);
                break;
            }
            case DELETE: {
                res.response(this.ggfs.delete(req.path(), req.flag()));
                break;
            }
            case MAKE_DIRECTORIES: {
                this.ggfs.mkdirs(req.path(), req.properties());
                res.response(true);
                break;
            }
            case LIST_PATHS: {
                res.paths(this.ggfs.listPaths(req.path()));
                break;
            }
            case LIST_FILES: {
                res.files(this.ggfs.listFiles(req.path()));
                break;
            }
            case SET_TIMES: {
                this.ggfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
                res.response(true);
                break;
            }
            case AFFINITY: {
                res.locations(this.ggfs.affinity(req.path(), req.start(), req.length()));
                break;
            }
            case OPEN_READ: {
                GridGgfsInputStreamAdapter ggfsIn = !req.flag() ? this.ggfs.open(req.path(), this.bufSize) : this.ggfs.open(req.path(), this.bufSize, req.sequentialReadsBeforePrefetch());
                long streamId = this.registerResource(ses, (Closeable)((Object)ggfsIn));
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Opened GGFS input stream for file read [ggfsName=" + this.ggfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
                }
                GridGgfsFileInfo info = new GridGgfsFileInfo(ggfsIn.fileInfo(), null, ggfsIn.fileInfo().modificationTime());
                res.response(new GridGgfsInputStreamDescriptor(streamId, info.length()));
                break;
            }
            case OPEN_CREATE: {
                long streamId = this.registerResource(ses, (Closeable)this.ggfs.create(req.path(), this.bufSize, req.flag(), this.affinityKey(req), req.replication(), req.blockSize(), req.properties()));
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Opened GGFS output stream for file create [ggfsName=" + this.ggfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
                }
                res.response(streamId);
                break;
            }
            case OPEN_APPEND: {
                long streamId = this.registerResource(ses, (Closeable)this.ggfs.append(req.path(), this.bufSize, req.flag(), req.properties()));
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Opened GGFS output stream for file append [ggfsName=" + this.ggfs.name() + ", path=" + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
                }
                res.response(streamId);
                break;
            }
            default: {
                assert (false) : "Unhandled path control request command: " + (Object)((Object)cmd);
                break;
            }
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Finished processing path control request [ggfsName=" + this.ggfs.name() + ", req=" + req + ", res=" + res + ']');
        }
        return res;
    }

    private GridGgfsMessage processStreamControlRequest(GridGgfsClientSession ses, GridGgfsIpcCommand cmd, GridGgfsMessage msg, DataInput in) throws GridException, IOException {
        GridGgfsStreamControlRequest req = (GridGgfsStreamControlRequest)msg;
        Long rsrcId = req.streamId();
        GridGgfsControlResponse resp = new GridGgfsControlResponse();
        switch (cmd) {
            case CLOSE: {
                Closeable res = this.resource(ses, rsrcId);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Requested to close resource [ggfsName=" + this.ggfs.name() + ", rsrcId=" + rsrcId + ", res=" + res + ']');
                }
                if (res == null) {
                    throw new GridException("Resource to close not found: " + rsrcId);
                }
                try {
                    res.close();
                }
                catch (IOException e) {
                    GridGgfsOutOfSpaceException space = (GridGgfsOutOfSpaceException)X.cause((Throwable)e, GridGgfsOutOfSpaceException.class);
                    if (space != null) {
                        throw space;
                    }
                    throw e;
                }
                boolean success = ses.unregisterResource(rsrcId, res);
                assert (success) : "Failed to unregister resource [ggfsName=" + this.ggfs.name() + ", rsrcId=" + rsrcId + ", res=" + res + ']';
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Closed GGFS stream [ggfsName=" + this.ggfs.name() + ", streamId=" + rsrcId + ", ses=" + ses + ']');
                }
                resp.response(true);
                break;
            }
            case READ_BLOCK: {
                long pos = req.position();
                int size = req.length();
                GridGgfsInputStreamAdapter ggfsIn = (GridGgfsInputStreamAdapter)((Object)this.resource(ses, rsrcId));
                if (ggfsIn == null) {
                    throw new GridException("Input stream not found (already closed?): " + rsrcId);
                }
                byte[][] chunks = ggfsIn.readChunks(pos, size);
                resp.response(chunks);
                int len = 0;
                if (chunks.length > 0) {
                    len += chunks[0].length;
                }
                if (chunks.length > 1) {
                    len += chunks[chunks.length - 1].length;
                }
                if (chunks.length > 2) {
                    len += chunks[1].length * (chunks.length - 2);
                }
                resp.length(len);
                break;
            }
            case WRITE_BLOCK: {
                assert (rsrcId != null) : "Missing stream ID";
                GridGgfsOutputStream out = (GridGgfsOutputStream)this.resource(ses, rsrcId);
                if (out == null) {
                    throw new GridException("Output stream not found (already closed?): " + rsrcId);
                }
                int writeLen = req.length();
                try {
                    out.transferFrom(in, writeLen);
                    if (errWrite) {
                        throw new IOException("Failed to write data to server (test).");
                    }
                    return null;
                }
                catch (IOException e) {
                    resp.error(rsrcId, e.getMessage());
                    break;
                }
            }
            default: {
                assert (false);
                break;
            }
        }
        return resp;
    }

    @Nullable
    private GridUuid affinityKey(GridGgfsPathControlRequest req) {
        if (!req.colocate()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Will not generate affinity key for path control request [ggfsName=" + this.ggfs.name() + ", req=" + req + ']');
            }
            return null;
        }
        GridUuid key = this.ggfs.nextAffinityKey();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Generated affinity key for path control request [ggfsName=" + this.ggfs.name() + ", req=" + req + ", key=" + key + ']');
        }
        return key;
    }

    private long registerResource(GridGgfsClientSession ses, Closeable rsrc) {
        long rsrcId = this.rsrcIdGen.getAndIncrement();
        boolean registered = ses.registerResource(rsrcId, rsrc);
        assert (registered) : "Failed to register resource (duplicate id?): " + rsrcId;
        return rsrcId;
    }

    @Nullable
    private Closeable resource(GridGgfsClientSession ses, Long rsrcId) {
        return (Closeable)ses.resource(rsrcId);
    }
}

