/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.kernal.ggfs.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.gridgain.grid.Grid;
import org.gridgain.grid.GridException;
import org.gridgain.grid.ggfs.GridGgfsBlockLocation;
import org.gridgain.grid.ggfs.GridGgfsFile;
import org.gridgain.grid.ggfs.GridGgfsPath;
import org.gridgain.grid.ggfs.GridGgfsPathSummary;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoop;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopCommunicationException;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopEndpoint;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopEx;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopInProc;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopOutProc;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopStreamDelegate;
import org.gridgain.grid.kernal.ggfs.hadoop.GridGgfsHadoopUtils;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsEx;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsHandshakeResponse;
import org.gridgain.grid.kernal.processors.ggfs.GridGgfsStatus;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.G;
import org.gridgain.grid.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

public class GridGgfsHadoopWrapper
implements GridGgfsHadoop {
    private final AtomicReference<Delegate> delegateRef = new AtomicReference();
    private final String authority;
    private final GridGgfsHadoopEndpoint endpoint;
    private final String logDir;
    private final Configuration conf;
    private final Log log;

    public GridGgfsHadoopWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
        try {
            this.authority = authority;
            this.endpoint = new GridGgfsHadoopEndpoint(authority);
            this.logDir = logDir;
            this.conf = conf;
            this.log = log;
        }
        catch (GridException e) {
            throw new IOException("Failed to parse endpoint: " + authority, e);
        }
    }

    @Override
    public GridGgfsHandshakeResponse handshake(String logDir) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsHandshakeResponse>(){

            @Override
            public GridGgfsHandshakeResponse apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hndResp;
            }
        });
    }

    @Override
    public void close(boolean force) {
        Delegate delegate = this.delegateRef.get();
        if (delegate != null && this.delegateRef.compareAndSet(delegate, null)) {
            delegate.close(force);
        }
    }

    @Override
    public GridGgfsFile info(final GridGgfsPath path) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsFile>(){

            @Override
            public GridGgfsFile apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.info(path);
            }
        }, path);
    }

    @Override
    public GridGgfsFile update(final GridGgfsPath path, final Map<String, String> props) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsFile>(){

            @Override
            public GridGgfsFile apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.update(path, props);
            }
        }, path);
    }

    @Override
    public Boolean setTimes(final GridGgfsPath path, final long accessTime, final long modificationTime) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<Boolean>(){

            @Override
            public Boolean apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.setTimes(path, accessTime, modificationTime);
            }
        }, path);
    }

    @Override
    public Boolean rename(final GridGgfsPath src, final GridGgfsPath dest) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<Boolean>(){

            @Override
            public Boolean apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.rename(src, dest);
            }
        }, src);
    }

    @Override
    public Boolean delete(final GridGgfsPath path, final boolean recursive) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<Boolean>(){

            @Override
            public Boolean apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.delete(path, recursive);
            }
        }, path);
    }

    @Override
    public Collection<GridGgfsBlockLocation> affinity(final GridGgfsPath path, final long start, final long len) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<Collection<GridGgfsBlockLocation>>(){

            @Override
            public Collection<GridGgfsBlockLocation> apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.affinity(path, start, len);
            }
        }, path);
    }

    @Override
    public GridGgfsPathSummary contentSummary(final GridGgfsPath path) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsPathSummary>(){

            @Override
            public GridGgfsPathSummary apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.contentSummary(path);
            }
        }, path);
    }

    @Override
    public Boolean mkdirs(final GridGgfsPath path, final Map<String, String> props) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<Boolean>(){

            @Override
            public Boolean apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.mkdirs(path, props);
            }
        }, path);
    }

    @Override
    public Collection<GridGgfsFile> listFiles(final GridGgfsPath path) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<Collection<GridGgfsFile>>(){

            @Override
            public Collection<GridGgfsFile> apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.listFiles(path);
            }
        }, path);
    }

    @Override
    public Collection<GridGgfsPath> listPaths(final GridGgfsPath path) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<Collection<GridGgfsPath>>(){

            @Override
            public Collection<GridGgfsPath> apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.listPaths(path);
            }
        }, path);
    }

    @Override
    public GridGgfsStatus fsStatus() throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsStatus>(){

            @Override
            public GridGgfsStatus apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.fsStatus();
            }
        });
    }

    @Override
    public GridGgfsHadoopStreamDelegate open(final GridGgfsPath path) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsHadoopStreamDelegate>(){

            @Override
            public GridGgfsHadoopStreamDelegate apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.open(path);
            }
        }, path);
    }

    @Override
    public GridGgfsHadoopStreamDelegate open(final GridGgfsPath path, final int seqReadsBeforePrefetch) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsHadoopStreamDelegate>(){

            @Override
            public GridGgfsHadoopStreamDelegate apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.open(path, seqReadsBeforePrefetch);
            }
        }, path);
    }

    @Override
    public GridGgfsHadoopStreamDelegate create(final GridGgfsPath path, final boolean overwrite, final boolean colocate, final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsHadoopStreamDelegate>(){

            @Override
            public GridGgfsHadoopStreamDelegate apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
            }
        }, path);
    }

    @Override
    public GridGgfsHadoopStreamDelegate append(final GridGgfsPath path, final boolean create, final @Nullable Map<String, String> props) throws IOException {
        return this.withReconnectHandling(new FileSystemClosure<GridGgfsHadoopStreamDelegate>(){

            @Override
            public GridGgfsHadoopStreamDelegate apply(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) throws GridException, IOException {
                return hadoop.append(path, create, props);
            }
        }, path);
    }

    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
        return this.withReconnectHandling(clo, null);
    }

    private <T> T withReconnectHandling(FileSystemClosure<T> clo, @Nullable GridGgfsPath path) throws IOException {
        GridGgfsHadoopCommunicationException err = null;
        for (int i = 0; i < 2; ++i) {
            Delegate curDelegate = null;
            boolean close = false;
            boolean force = false;
            try {
                curDelegate = this.delegate();
                assert (curDelegate != null);
                close = curDelegate.doomed;
                T t = clo.apply(curDelegate.hadoop, curDelegate.hndResp);
                return t;
            }
            catch (GridGgfsHadoopCommunicationException e) {
                if (curDelegate != null && !curDelegate.doomed) {
                    this.delegateRef.compareAndSet(curDelegate, null);
                    close = true;
                    force = true;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug((Object)("Failed to send message to a server: " + (Object)((Object)e)));
                }
                err = e;
                continue;
            }
            catch (GridException e) {
                throw GridGgfsHadoopUtils.cast(e, path != null ? path.toString() : null);
            }
            finally {
                if (close) {
                    assert (curDelegate != null);
                    curDelegate.close(force);
                }
            }
        }
        throw new IOException("Failed to communicate with GGFS.", (Throwable)((Object)err));
    }

    private Delegate delegate() throws GridGgfsHadoopCommunicationException {
        boolean skipLocalTcp;
        GridGgfsHadoopEx hadoop;
        Throwable err = null;
        Delegate curDelegate = this.delegateRef.get();
        if (curDelegate != null) {
            return curDelegate;
        }
        if (!GridGgfsHadoopUtils.parameter(this.conf, "fs.ggfs.%s.endpoint.no_embed", this.authority, false)) {
            GridGgfsEx ggfs = null;
            if (this.endpoint.grid() == null) {
                try {
                    Grid grid = G.grid();
                    ggfs = (GridGgfsEx)grid.ggfs(this.endpoint.ggfs());
                }
                catch (Exception e) {
                    err = e;
                }
            } else {
                for (Grid grid : G.allGrids()) {
                    try {
                        ggfs = (GridGgfsEx)grid.ggfs(this.endpoint.ggfs());
                        break;
                    }
                    catch (Exception e) {
                        err = e;
                    }
                }
            }
            if (ggfs != null) {
                hadoop = null;
                try {
                    hadoop = new GridGgfsHadoopInProc(ggfs, this.log);
                    curDelegate = new Delegate(hadoop, hadoop.handshake(this.logDir));
                }
                catch (IOException | GridException e) {
                    if (e instanceof GridGgfsHadoopCommunicationException) {
                        hadoop.close(true);
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug((Object)"Failed to connect to in-proc GGFS, fallback to IPC mode.", e);
                    }
                    err = e;
                }
            }
        }
        if (!GridGgfsHadoopUtils.parameter(this.conf, "fs.ggfs.%s.endpoint.no_local_shmem", this.authority, false) && curDelegate == null && !U.isWindows()) {
            GridGgfsHadoop hadoop2 = null;
            try {
                hadoop2 = new GridGgfsHadoopOutProc(this.endpoint.port(), this.endpoint.grid(), this.endpoint.ggfs(), this.log);
                curDelegate = new Delegate((GridGgfsHadoopEx)hadoop2, hadoop2.handshake(this.logDir));
            }
            catch (IOException | GridException e) {
                if (e instanceof GridGgfsHadoopCommunicationException) {
                    hadoop2.close(true);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug((Object)"Failed to connect to out-proc local GGFS using shmem.", e);
                }
                err = e;
            }
        }
        if (!(skipLocalTcp = GridGgfsHadoopUtils.parameter(this.conf, "fs.ggfs.%s.endpoint.no_local_tcp", this.authority, false)) && curDelegate == null) {
            hadoop = null;
            try {
                hadoop = new GridGgfsHadoopOutProc("127.0.0.1", this.endpoint.port(), this.endpoint.grid(), this.endpoint.ggfs(), this.log);
                curDelegate = new Delegate(hadoop, hadoop.handshake(this.logDir));
            }
            catch (IOException | GridException e) {
                if (e instanceof GridGgfsHadoopCommunicationException) {
                    hadoop.close(true);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug((Object)"Failed to connect to out-proc local GGFS using TCP.", e);
                }
                err = e;
            }
        }
        if (curDelegate == null && (skipLocalTcp || !F.eq((Object)"127.0.0.1", (Object)this.endpoint.host()))) {
            hadoop = null;
            try {
                hadoop = new GridGgfsHadoopOutProc("127.0.0.1", this.endpoint.port(), this.endpoint.grid(), this.endpoint.ggfs(), this.log);
                curDelegate = new Delegate(hadoop, hadoop.handshake(this.logDir));
            }
            catch (IOException | GridException e) {
                if (e instanceof GridGgfsHadoopCommunicationException) {
                    hadoop.close(true);
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug((Object)"Failed to connect to out-proc remote GGFS using TCP.", e);
                }
                err = e;
            }
        }
        if (curDelegate != null) {
            if (!this.delegateRef.compareAndSet(null, curDelegate)) {
                curDelegate.doomed = true;
            }
            return curDelegate;
        }
        throw new GridGgfsHadoopCommunicationException("Failed to connect to GGFS: " + this.endpoint, (Exception)err);
    }

    private static class Delegate {
        private final GridGgfsHadoopEx hadoop;
        private final GridGgfsHandshakeResponse hndResp;
        private final AtomicBoolean closeGuard = new AtomicBoolean();
        private boolean doomed;

        private Delegate(GridGgfsHadoopEx hadoop, GridGgfsHandshakeResponse hndResp) {
            this.hadoop = hadoop;
            this.hndResp = hndResp;
        }

        private void close(boolean force) {
            if (this.closeGuard.compareAndSet(false, true)) {
                this.hadoop.close(force);
            }
        }
    }

    private static interface FileSystemClosure<T> {
        public T apply(GridGgfsHadoopEx var1, GridGgfsHandshakeResponse var2) throws GridException, IOException;
    }
}

