/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.ggfs.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.gridgain.grid.GridException;
import org.gridgain.grid.ggfs.GridGgfsBlockLocation;
import org.gridgain.grid.ggfs.GridGgfsFile;
import org.gridgain.grid.ggfs.GridGgfsPath;
import org.gridgain.grid.ggfs.GridGgfsPathSummary;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsControlResponse;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsHandshakeRequest;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsIpcCommand;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsMessage;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsPathControlRequest;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsStatusRequest;
import org.gridgain.grid.kernal.ggfs.common.GridGgfsStreamControlRequest;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopEx;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopIpcIo;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopIpcIoListener;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopStreamDelegate;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopStreamEventListener;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopUtils;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsHandshakeResponse;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsInputStreamDescriptor;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsStatus;
import org.gridgain.grid.util.lang.GridPlainClosure;
import org.gridgain.grid.util.lang.GridPlainFuture;
import org.gridgain.grid.util.lang.GridPlainFutureAdapter;
import org.jdk8.backport.ConcurrentHashMap8;
import org.jetbrains.annotations.Nullable;

public class GridGgfsHadoopOutProc
implements GridGgfsHadoopEx,
GridGgfsHadoopIpcIoListener {
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, Boolean> BOOL_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, Long> LONG_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, GridGgfsFile> FILE_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, GridGgfsHandshakeResponse> HANDSHAKE_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, GridGgfsStatus> STATUS_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, GridGgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, Collection<GridGgfsFile>> FILE_COL_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, Collection<GridGgfsPath>> PATH_COL_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, GridGgfsPathSummary> SUMMARY_RES = GridGgfsHadoopOutProc.createClosure();
    private static final GridPlainClosure<GridPlainFuture<GridGgfsMessage>, Collection<GridGgfsBlockLocation>> BLOCK_LOCATION_COL_RES = GridGgfsHadoopOutProc.createClosure();
    private final String grid;
    private final String ggfs;
    private final Log log;
    private final GridGgfsHadoopIpcIo io;
    private final Map<Long, GridGgfsHadoopStreamEventListener> lsnrs = new ConcurrentHashMap8();

    public GridGgfsHadoopOutProc(String host, int port, String grid, String ggfs, Log log) throws IOException {
        this(host, port, grid, ggfs, false, log);
    }

    public GridGgfsHadoopOutProc(int port, String grid, String ggfs, Log log) throws IOException {
        this(null, port, grid, ggfs, true, log);
    }

    private GridGgfsHadoopOutProc(String host, int port, String grid, String ggfs, boolean shmem, Log log) throws IOException {
        assert (host != null && !shmem || host == null && shmem) : "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
        String endpoint = host != null ? host + ":" + port : "shmem:" + port;
        this.grid = grid;
        this.ggfs = ggfs;
        this.log = log;
        this.io = GridGgfsHadoopIpcIo.get(log, endpoint);
        this.io.addEventListener(this);
    }

    @Override
    public GridGgfsHandshakeResponse handshake(String logDir) throws GridException {
        GridGgfsHandshakeRequest req = new GridGgfsHandshakeRequest();
        req.gridName(this.grid);
        req.ggfsName(this.ggfs);
        req.logDirectory(logDir);
        return (GridGgfsHandshakeResponse)this.io.send(req).chain(HANDSHAKE_RES).get();
    }

    @Override
    public void close(boolean force) {
        assert (this.io != null);
        this.io.removeEventListener(this);
        if (force) {
            this.io.forceClose();
        } else {
            this.io.release();
        }
    }

    @Override
    public GridGgfsFile info(GridGgfsPath path) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.INFO);
        msg.path(path);
        return (GridGgfsFile)this.io.send(msg).chain(FILE_RES).get();
    }

    @Override
    public GridGgfsFile update(GridGgfsPath path, Map<String, String> props) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.UPDATE);
        msg.path(path);
        msg.properties(props);
        return (GridGgfsFile)this.io.send(msg).chain(FILE_RES).get();
    }

    @Override
    public Boolean setTimes(GridGgfsPath path, long accessTime, long modificationTime) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.SET_TIMES);
        msg.path(path);
        msg.accessTime(accessTime);
        msg.modificationTime(modificationTime);
        return (Boolean)this.io.send(msg).chain(BOOL_RES).get();
    }

    @Override
    public Boolean rename(GridGgfsPath src, GridGgfsPath dest) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.RENAME);
        msg.path(src);
        msg.destinationPath(dest);
        return (Boolean)this.io.send(msg).chain(BOOL_RES).get();
    }

    @Override
    public Boolean delete(GridGgfsPath path, boolean recursive) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.DELETE);
        msg.path(path);
        msg.flag(recursive);
        return (Boolean)this.io.send(msg).chain(BOOL_RES).get();
    }

    @Override
    public Collection<GridGgfsBlockLocation> affinity(GridGgfsPath path, long start, long len) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.AFFINITY);
        msg.path(path);
        msg.start(start);
        msg.length(len);
        return (Collection)this.io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
    }

    @Override
    public GridGgfsPathSummary contentSummary(GridGgfsPath path) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.PATH_SUMMARY);
        msg.path(path);
        return (GridGgfsPathSummary)this.io.send(msg).chain(SUMMARY_RES).get();
    }

    @Override
    public Boolean mkdirs(GridGgfsPath path, Map<String, String> props) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.MAKE_DIRECTORIES);
        msg.path(path);
        msg.properties(props);
        return (Boolean)this.io.send(msg).chain(BOOL_RES).get();
    }

    @Override
    public Collection<GridGgfsFile> listFiles(GridGgfsPath path) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.LIST_FILES);
        msg.path(path);
        return (Collection)this.io.send(msg).chain(FILE_COL_RES).get();
    }

    @Override
    public Collection<GridGgfsPath> listPaths(GridGgfsPath path) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.LIST_PATHS);
        msg.path(path);
        return (Collection)this.io.send(msg).chain(PATH_COL_RES).get();
    }

    @Override
    public GridGgfsStatus fsStatus() throws GridException {
        return (GridGgfsStatus)this.io.send(new GridGgfsStatusRequest()).chain(STATUS_RES).get();
    }

    @Override
    public GridGgfsHadoopStreamDelegate open(GridGgfsPath path) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.OPEN_READ);
        msg.path(path);
        msg.flag(false);
        GridGgfsInputStreamDescriptor rmtDesc = (GridGgfsInputStreamDescriptor)this.io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
        return new GridGgfsHadoopStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
    }

    @Override
    public GridGgfsHadoopStreamDelegate open(GridGgfsPath path, int seqReadsBeforePrefetch) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.OPEN_READ);
        msg.path(path);
        msg.flag(true);
        msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
        GridGgfsInputStreamDescriptor rmtDesc = (GridGgfsInputStreamDescriptor)this.io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
        return new GridGgfsHadoopStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
    }

    @Override
    public GridGgfsHadoopStreamDelegate create(GridGgfsPath path, boolean overwrite, boolean colocate, int replication, long blockSize, @Nullable Map<String, String> props) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.OPEN_CREATE);
        msg.path(path);
        msg.flag(overwrite);
        msg.colocate(colocate);
        msg.properties(props);
        msg.replication(replication);
        msg.blockSize(blockSize);
        Long streamId = (Long)this.io.send(msg).chain(LONG_RES).get();
        return new GridGgfsHadoopStreamDelegate(this, streamId);
    }

    @Override
    public GridGgfsHadoopStreamDelegate append(GridGgfsPath path, boolean create, @Nullable Map<String, String> props) throws GridException {
        GridGgfsPathControlRequest msg = new GridGgfsPathControlRequest();
        msg.command(GridGgfsIpcCommand.OPEN_APPEND);
        msg.path(path);
        msg.flag(create);
        msg.properties(props);
        Long streamId = (Long)this.io.send(msg).chain(LONG_RES).get();
        return new GridGgfsHadoopStreamDelegate(this, streamId);
    }

    @Override
    public GridPlainFuture<byte[]> readData(GridGgfsHadoopStreamDelegate desc, long pos, int len, @Nullable byte[] outBuf, int outOff, int outLen) {
        assert (len > 0);
        GridGgfsStreamControlRequest msg = new GridGgfsStreamControlRequest();
        msg.command(GridGgfsIpcCommand.READ_BLOCK);
        msg.streamId((Long)desc.target());
        msg.position(pos);
        msg.length(len);
        try {
            return this.io.send(msg, outBuf, outOff, outLen);
        }
        catch (GridException e) {
            return new GridPlainFutureAdapter((Throwable)e);
        }
    }

    @Override
    public void writeData(GridGgfsHadoopStreamDelegate desc, byte[] data, int off, int len) throws IOException {
        GridGgfsStreamControlRequest msg = new GridGgfsStreamControlRequest();
        msg.command(GridGgfsIpcCommand.WRITE_BLOCK);
        msg.streamId((Long)desc.target());
        msg.data(data);
        msg.position(off);
        msg.length(len);
        try {
            this.io.sendPlain(msg);
        }
        catch (GridException e) {
            throw GridGgfsHadoopUtils.cast(e);
        }
    }

    @Override
    public void flush(GridGgfsHadoopStreamDelegate delegate) throws IOException {
    }

    @Override
    public void closeStream(GridGgfsHadoopStreamDelegate desc) throws IOException {
        GridGgfsStreamControlRequest msg = new GridGgfsStreamControlRequest();
        msg.command(GridGgfsIpcCommand.CLOSE);
        msg.streamId((Long)desc.target());
        try {
            this.io.send(msg).chain(BOOL_RES).get();
        }
        catch (GridException e) {
            throw GridGgfsHadoopUtils.cast(e);
        }
    }

    @Override
    public void addEventListener(GridGgfsHadoopStreamDelegate desc, GridGgfsHadoopStreamEventListener lsnr) {
        long streamId = (Long)desc.target();
        GridGgfsHadoopStreamEventListener lsnr0 = this.lsnrs.put(streamId, lsnr);
        assert (lsnr0 == null || lsnr0 == lsnr);
        if (this.log.isDebugEnabled()) {
            this.log.debug((Object)("Added stream event listener [streamId=" + streamId + ']'));
        }
    }

    @Override
    public void removeEventListener(GridGgfsHadoopStreamDelegate desc) {
        long streamId = (Long)desc.target();
        GridGgfsHadoopStreamEventListener lsnr0 = this.lsnrs.remove(streamId);
        if (lsnr0 != null && this.log.isDebugEnabled()) {
            this.log.debug((Object)("Removed stream event listener [streamId=" + streamId + ']'));
        }
    }

    @Override
    public void onClose() {
        for (GridGgfsHadoopStreamEventListener lsnr : this.lsnrs.values()) {
            try {
                lsnr.onClose();
            }
            catch (GridException e) {
                this.log.warn((Object)("Got exception from stream event listener (will ignore): " + lsnr), (Throwable)e);
            }
        }
    }

    @Override
    public void onError(long streamId, String errMsg) {
        GridGgfsHadoopStreamEventListener lsnr = this.lsnrs.get(streamId);
        if (lsnr != null) {
            lsnr.onError(errMsg);
        } else {
            this.log.warn((Object)("Received write error response for not registered output stream (will ignore) [streamId= " + streamId + ']'));
        }
    }

    private static <T> GridPlainClosure<GridPlainFuture<GridGgfsMessage>, T> createClosure() {
        return new GridPlainClosure<GridPlainFuture<GridGgfsMessage>, T>(){

            public T apply(GridPlainFuture<GridGgfsMessage> fut) throws GridException {
                GridGgfsControlResponse res = (GridGgfsControlResponse)fut.get();
                if (res.hasError()) {
                    res.throwError();
                }
                return res.response();
            }
        };
    }
}

