/*
 * Decompiled with CFR 0.152.
 */
package consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.shortcircuit;

import consulting.freiheitsgrade.patched.dependencies.com.google.common.annotations.VisibleForTesting;
import consulting.freiheitsgrade.patched.dependencies.com.google.common.base.Preconditions;
import consulting.freiheitsgrade.patched.dependencies.org.apache.commons.lang3.mutable.MutableBoolean;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.classification.InterfaceAudience;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.ExtendedBlockId;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.net.DomainPeer;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.shortcircuit.DfsClientShm;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.net.unix.DomainSocket;
import consulting.freiheitsgrade.patched.dependencies.org.apache.hadoop.net.unix.DomainSocketWatcher;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class DfsClientShmManager
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(DfsClientShmManager.class);

    /** Set to true by {@link #close()}; read and written while {@link #lock} is held. */
    private boolean closed = false;

    /** Lock taken by allocSlot, freeSlot, visit and close before they touch the state below. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Condition on {@link #lock}; signalled each time an endpoint finishes an attempt to load a new segment. */
    private final Condition finishedLoading = this.lock.newCondition();

    /** Per-datanode endpoint managers; an entry is created the first time a slot is requested for a datanode. */
    private final HashMap<DatanodeInfo, EndpointShmManager> datanodes = new HashMap<>(1);

    /** Watcher that newly created shared memory segments are added to; closed by {@link #close()}. */
    private final DomainSocketWatcher domainSocketWatcher;

    /**
     * Creates a manager together with its own {@link DomainSocketWatcher}.
     *
     * @param interruptCheckPeriodMs value passed through to the {@code DomainSocketWatcher} constructor
     * @throws IOException if constructing the watcher fails
     */
    DfsClientShmManager(int interruptCheckPeriodMs) throws IOException {
        this.domainSocketWatcher = new DomainSocketWatcher(interruptCheckPeriodMs, "client");
    }

    /**
     * Allocates a shared memory slot for a block on the given datanode.
     *
     * <p>The whole call runs under the manager lock. The {@code EndpointShmManager} for
     * {@code datanode} is created and cached on first use, and the request is delegated to it.
     *
     * @param datanode   datanode whose endpoint manager should provide the slot
     * @param peer       peer handed to the endpoint manager, which uses it if a new segment must be requested
     * @param usedPeer   handed to the endpoint manager, which sets it to true once it has registered
     *                   the peer's socket with the watcher
     * @param blockId    block the slot is allocated for
     * @param clientName client name sent along when a new segment is requested
     * @return the allocated slot, or {@code null} if this manager is closed or the endpoint manager
     *         returned no slot
     * @throws IOException propagated from the endpoint manager
     */
    public ShortCircuitShm.Slot allocSlot(DatanodeInfo datanode, DomainPeer peer, MutableBoolean usedPeer, ExtendedBlockId blockId, String clientName) throws IOException {
        this.lock.lock();
        try {
            if (this.closed) {
                // Parameterized form: the message is only built when trace logging is enabled.
                LOG.trace("{}: the DfsClientShmManager is closed.", this);
                return null;
            }
            EndpointShmManager shmManager = this.datanodes.get(datanode);
            if (shmManager == null) {
                shmManager = new EndpointShmManager(datanode);
                this.datanodes.put(datanode, shmManager);
            }
            return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Releases a slot by delegating to the endpoint manager of the segment that owns it.
     * The delegation happens while the manager lock is held.
     *
     * @param slot slot to release; its segment is cast to {@code DfsClientShm}
     */
    public void freeSlot(ShortCircuitShm.Slot slot) {
        lock.lock();
        try {
            final DfsClientShm owningShm = (DfsClientShm) slot.getShm();
            owningShm.getEndpointShmManager().freeSlot(slot);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Builds a fresh map with one {@code PerDatanodeVisitorInfo} per known datanode and passes it
     * to {@code visitor}. Both the collection and the visitor callback run under the manager lock.
     *
     * @param visitor callback that receives the per-datanode information
     * @throws IOException propagated from the visitor
     */
    @VisibleForTesting
    public void visit(Visitor visitor) throws IOException {
        lock.lock();
        try {
            final HashMap<DatanodeInfo, PerDatanodeVisitorInfo> perDatanode = new HashMap<DatanodeInfo, PerDatanodeVisitorInfo>();
            for (final Map.Entry<DatanodeInfo, EndpointShmManager> known : datanodes.entrySet()) {
                final DatanodeInfo node = known.getKey();
                final EndpointShmManager endpoint = known.getValue();
                perDatanode.put(node, endpoint.getVisitorInfo());
            }
            visitor.visit(perDatanode);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Marks this manager as closed and closes the domain socket watcher.
     *
     * <p>Only the first call does any work; later calls return right after seeing the closed flag.
     * Anything thrown while closing the watcher is logged at debug level and not rethrown.
     */
    @Override
    public void close() throws IOException {
        lock.lock();
        try {
            final boolean alreadyClosed = closed;
            closed = true;
            if (alreadyClosed) {
                return;
            }
        } finally {
            lock.unlock();
        }
        // The watcher is closed outside the lock.
        try {
            domainSocketWatcher.close();
        } catch (Throwable failure) {
            LOG.debug("Exception in closing " + domainSocketWatcher, failure);
        }
    }

    /**
     * @return {@code ShortCircuitShmManager(xxxxxxxx)}, where the hex digits are this object's
     *         identity hash code zero-padded to eight places
     */
    @Override
    public String toString() {
        final int identity = System.identityHashCode(this);
        return String.format("ShortCircuitShmManager(%08x)", identity);
    }

    /**
     * @return the domain socket watcher created in this manager's constructor
     */
    @VisibleForTesting
    public DomainSocketWatcher getDomainSocketWatcher() {
        return domainSocketWatcher;
    }

    /**
     * Callback accepted by {@link DfsClientShmManager#visit(Visitor)}.
     */
    @VisibleForTesting
    public interface Visitor {
        /**
         * Receives the per-datanode information collected by the manager.
         *
         * @param info map from datanode to the information gathered for it
         * @throws IOException at the implementation's discretion
         */
        void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) throws IOException;
    }

    /**
     * Plain holder for the state of one datanode's endpoint manager, as handed to a
     * {@link Visitor}. The maps are stored as given; no copies are made.
     */
    @VisibleForTesting
    public static class PerDatanodeVisitorInfo {
        /** Segments the endpoint manager tracks as full. */
        public final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> full;
        /** Segments the endpoint manager tracks as not full. */
        public final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> notFull;
        /** Value of the endpoint manager's disabled flag. */
        public final boolean disabled;

        PerDatanodeVisitorInfo(TreeMap<ShortCircuitShm.ShmId, DfsClientShm> full, TreeMap<ShortCircuitShm.ShmId, DfsClientShm> notFull, boolean disabled) {
            this.disabled = disabled;
            this.notFull = notFull;
            this.full = full;
        }
    }

    class EndpointShmManager {
        /** Datanode this endpoint manager belongs to; appears in log and error messages. */
        private final DatanodeInfo datanode;

        /** Segments that reported {@code isFull()} after their last slot was handed out, keyed by segment id. */
        private final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> full = new TreeMap<>();

        /** Segments slots are allocated from, keyed by segment id. */
        private final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> notFull = new TreeMap<>();

        /** Set when the datanode answers a segment request with ERROR_UNSUPPORTED; allocSlot then returns null. */
        private boolean disabled = false;

        /** True while one thread is requesting a new segment; other threads wait on finishedLoading meanwhile. */
        private boolean loading = false;

        /**
         * @param datanode datanode this endpoint manager serves
         */
        EndpointShmManager(DatanodeInfo datanode) {
            this.datanode = datanode;
        }

        /**
         * Tries to take a slot from the first segment in {@code notFull}.
         *
         * <p>If that allocation fills the segment, the segment is moved from {@code notFull} to
         * {@code full}.
         *
         * @param blockId block the slot is allocated for
         * @return the allocated slot, or {@code null} when {@code notFull} is empty
         */
        private ShortCircuitShm.Slot allocSlotFromExistingShm(ExtendedBlockId blockId) {
            final Map.Entry<ShortCircuitShm.ShmId, DfsClientShm> first = notFull.firstEntry();
            if (first == null) {
                // firstEntry() yields null exactly when the map is empty.
                return null;
            }
            final DfsClientShm segment = first.getValue();
            final ShortCircuitShm.ShmId segmentId = segment.getShmId();
            final ShortCircuitShm.Slot allocated = segment.allocAndRegisterSlot(blockId);
            if (!segment.isFull()) {
                LOG.trace("{}: pulled slot {} out of {}", this, allocated.getSlotIdx(), segment);
                return allocated;
            }
            LOG.trace("{}: pulled the last slot {} out of {}", this, allocated.getSlotIdx(), segment);
            final DfsClientShm evicted = notFull.remove(segmentId);
            // The entry removed under the segment's id must be the segment we allocated from.
            Preconditions.checkState(evicted == segment);
            full.put(segmentId, segment);
            return allocated;
        }

        /**
         * Asks the datanode behind {@code peer} for a new shared memory segment.
         *
         * <p>Sends the request, parses the vint-prefixed response and acts on its status:
         * SUCCESS receives the segment's file descriptor and builds the {@code DfsClientShm};
         * ERROR_UNSUPPORTED sets {@code disabled}; any other status is logged as a warning.
         *
         * @param clientName client name sent in the request
         * @param peer       peer whose streams and domain socket are used for the exchange
         * @return the new segment, or {@code null} if the datanode did not report SUCCESS
         * @throws IOException on I/O failure, or if no file descriptor is received on SUCCESS
         */
        private DfsClientShm requestNewShm(String clientName, DomainPeer peer) throws IOException {
            final DataOutputStream request = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
            new Sender(request).requestShortCircuitShm(clientName);
            final DataTransferProtos.ShortCircuitShmResponseProto resp = DataTransferProtos.ShortCircuitShmResponseProto.parseFrom(PBHelperClient.vintPrefixed(peer.getInputStream()));
            final String error;
            if (resp.hasError()) {
                error = resp.getError();
            } else {
                error = "(unknown)";
            }
            switch (resp.getStatus()) {
                case SUCCESS:
                    return receiveShmFromPeer(resp, peer);
                case ERROR_UNSUPPORTED:
                    LOG.info(this + ": datanode does not support short-circuit shared memory access: " + error);
                    disabled = true;
                    return null;
                default:
                    LOG.warn(this + ": error requesting short-circuit shared memory access: " + error);
                    return null;
            }
        }

        /**
         * Receives the file descriptor announced by a SUCCESS response and wraps it in a
         * {@code DfsClientShm}. The received stream is always closed before returning; a failure
         * while closing it is only logged at debug level.
         */
        private DfsClientShm receiveShmFromPeer(DataTransferProtos.ShortCircuitShmResponseProto resp, DomainPeer peer) throws IOException {
            final DomainSocket sock = peer.getDomainSocket();
            final byte[] buf = new byte[1];
            final FileInputStream[] fis = new FileInputStream[1];
            final int received = sock.recvFileInputStreams(fis, buf, 0, buf.length);
            if (received < 0) {
                throw new EOFException("got EOF while trying to transfer the file descriptor for the shared memory segment.");
            }
            final FileInputStream shmStream = fis[0];
            if (shmStream == null) {
                throw new IOException("the datanode " + datanode + " failed to pass a file descriptor for the shared memory segment.");
            }
            try {
                final DfsClientShm created = new DfsClientShm(PBHelperClient.convert(resp.getId()), shmStream, this, peer);
                LOG.trace("{}: createNewShm: created {}", this, created);
                return created;
            } finally {
                try {
                    shmStream.close();
                } catch (Throwable closeFailure) {
                    LOG.debug("Exception in closing " + shmStream, closeFailure);
                }
            }
        }

        /**
         * Allocates a slot on this endpoint, requesting a new shared memory segment from the
         * datanode when no existing segment has a free slot.
         *
         * <p>Must be called with the manager lock held. The lock is released around the request
         * for a new segment and re-acquired before this method returns or throws. While one
         * thread is requesting a segment ({@code loading}), other threads wait on
         * {@code finishedLoading} and then retry.
         *
         * @param peer       peer used to request a new segment if one is needed
         * @param usedPeer   set to true once the peer's socket has been added to the watcher
         * @param clientName client name sent when requesting a new segment
         * @param blockId    block the slot is allocated for
         * @return the allocated slot, or {@code null} if the manager is closed or this endpoint is
         *         disabled
         * @throws IOException if requesting the segment or registering its socket fails
         */
        ShortCircuitShm.Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, String clientName, ExtendedBlockId blockId) throws IOException {
            while (true) {
                if (DfsClientShmManager.this.closed) {
                    LOG.trace("{}: the DfsClientShmManager has been closed.", (Object)this);
                    return null;
                }
                if (this.disabled) {
                    LOG.trace("{}: shared memory segment access is disabled.", (Object)this);
                    return null;
                }
                // First choice: a free slot in a segment we already have.
                ShortCircuitShm.Slot slot = this.allocSlotFromExistingShm(blockId);
                if (slot != null) {
                    return slot;
                }
                // No free slot. If another thread is already loading a segment, wait and retry.
                if (this.loading) {
                    LOG.trace("{}: waiting for loading to finish...", (Object)this);
                    DfsClientShmManager.this.finishedLoading.awaitUninterruptibly();
                    continue;
                }
                // Otherwise load one ourselves, without holding the lock during the network exchange.
                this.loading = true;
                DfsClientShmManager.this.lock.unlock();
                DfsClientShm shm;
                try {
                    shm = this.requestNewShm(clientName, peer);
                    if (shm == null) {
                        // The finally block below still runs before the loop restarts.
                        continue;
                    }
                    DfsClientShmManager.this.domainSocketWatcher.add(peer.getDomainSocket(), (DomainSocketWatcher.Handler)shm);
                    usedPeer.setValue(true);
                }
                finally {
                    // Only restore the lock and wake waiters here. A jump out of this finally
                    // block would discard a pending IOException and skip the registration below.
                    DfsClientShmManager.this.lock.lock();
                    this.loading = false;
                    DfsClientShmManager.this.finishedLoading.signalAll();
                }
                if (shm.isDisconnected()) {
                    // The segment was marked disconnected before we got here; do not register it.
                    LOG.debug("{}: the UNIX domain socket associated with this short-circuit memory closed before we could make use of the shm.", (Object)this);
                    continue;
                }
                // Register the new segment; the next loop iteration allocates from it.
                this.notFull.put(shm.getShmId(), shm);
            }
        }

        /**
         * Unregisters a slot from its segment and updates this endpoint's bookkeeping.
         *
         * <p>A disconnected segment must no longer be in either map; it is freed once it is empty.
         * A connected segment is taken out of {@code full}. If it is now empty it is also removed
         * from {@code notFull} and its socket is shut down; otherwise it is put into
         * {@code notFull}.
         *
         * @param slot slot to release
         */
        void freeSlot(ShortCircuitShm.Slot slot) {
            final DfsClientShm owner = (DfsClientShm) slot.getShm();
            owner.unregisterSlot(slot.getSlotIdx());

            if (owner.isDisconnected()) {
                Preconditions.checkState(!full.containsKey(owner.getShmId()));
                Preconditions.checkState(!notFull.containsKey(owner.getShmId()));
                if (owner.isEmpty()) {
                    LOG.trace("{}: freeing empty stale {}", this, owner);
                    owner.free();
                }
                return;
            }

            final ShortCircuitShm.ShmId id = owner.getShmId();
            full.remove(id);
            if (!owner.isEmpty()) {
                // Still has slots in use, and now at least one free slot.
                notFull.put(id, owner);
                return;
            }
            notFull.remove(id);
            LOG.trace("{}: shutting down UNIX domain socket for empty {}", this, owner);
            shutdown(owner);
        }

        /**
         * Removes the given segment id from both the {@code full} and {@code notFull} maps while
         * holding the manager lock.
         *
         * @param shmId id of the segment to forget
         */
        void unregisterShm(ShortCircuitShm.ShmId shmId) {
            final ReentrantLock managerLock = DfsClientShmManager.this.lock;
            managerLock.lock();
            try {
                full.remove(shmId);
                notFull.remove(shmId);
            } finally {
                managerLock.unlock();
            }
        }

        /**
         * @return {@code EndpointShmManager(<datanode>, parent=<enclosing manager>)}
         */
        @Override
        public String toString() {
            final DfsClientShmManager parent = DfsClientShmManager.this;
            return String.format("EndpointShmManager(%s, parent=%s)", datanode, parent);
        }

        /**
         * @return a holder referencing this endpoint's {@code full} and {@code notFull} maps
         *         (the maps themselves, not copies) together with the current {@code disabled} flag
         */
        PerDatanodeVisitorInfo getVisitorInfo() {
            final PerDatanodeVisitorInfo info = new PerDatanodeVisitorInfo(full, notFull, disabled);
            return info;
        }

        /**
         * Shuts down the domain socket of the peer attached to {@code shm}. An
         * {@code IOException} from the shutdown is logged as a warning and not rethrown.
         *
         * @param shm segment whose peer socket should be shut down
         */
        final void shutdown(DfsClientShm shm) {
            try {
                final DomainSocket socket = shm.getPeer().getDomainSocket();
                socket.shutdown();
            } catch (IOException ioe) {
                LOG.warn(this + ": error shutting down shm: got IOException calling shutdown(SHUT_RDWR)", ioe);
            }
        }
    }
}

