/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
import org.apache.hadoop.hdfs.server.datanode.BlockSender;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

class DataXceiver
extends Receiver
implements Runnable {
    // General log, shared with DataNode.
    public static final Log LOG = DataNode.LOG;
    // Client trace log, shared with DataNode; readBlock checks isInfoEnabled() on it
    // before building its trace format string.
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    // Socket served by this xceiver; closed in the finally clause of run().
    private final Socket s;
    // True when the socket's remote InetAddress equals its local InetAddress.
    private final boolean isLocal;
    // String form of the socket's remote address, captured at construction (used in logs and thread names).
    private final String remoteAddress;
    // String form of the socket's local address, captured at construction (used in logs).
    private final String localAddress;
    // Owning datanode; source of configuration, dataset, metrics and token manager.
    private final DataNode datanode;
    // Configuration snapshot obtained from datanode.getDnConf() (timeouts etc.).
    private final DNConf dnConf;
    // Server that spawned this xceiver; provides childSockets, balanceThrottler and estimateBlockSize.
    private final DataXceiverServer dataXceiverServer;
    // Timestamp (Util.now()) taken in run() just before each operation is processed; read by elapsed().
    private long opStartTime;

    public DataXceiver(Socket s, DataNode datanode, DataXceiverServer dataXceiverServer) throws IOException {
        super(new DataInputStream(new BufferedInputStream(NetUtils.getInputStream((Socket)s), HdfsConstants.SMALL_BUFFER_SIZE)));
        this.s = s;
        this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
        this.datanode = datanode;
        this.dnConf = datanode.getDnConf();
        this.dataXceiverServer = dataXceiverServer;
        this.remoteAddress = s.getRemoteSocketAddress().toString();
        this.localAddress = s.getLocalSocketAddress().toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Number of active connections is: " + datanode.getXceiverCount()));
        }
    }

    /**
     * Renames the current thread to "DataXceiver for client &lt;remote&gt;", optionally followed by
     * a bracketed status.
     *
     * @param status short description of what the thread is doing, or {@code null} for none
     */
    private void updateCurrentThreadName(String status) {
        String threadName = "DataXceiver for client " + this.remoteAddress;
        if (status != null) {
            threadName = threadName + " [" + status + "]";
        }
        Thread.currentThread().setName(threadName);
    }

    /**
     * @return the datanode this xceiver was created for
     */
    DataNode getDataNode() {
        return datanode;
    }

    /**
     * Serves operations from the socket until it is closed, keep-alive is disabled, or reading the
     * next op fails.
     *
     * The first op is read with the socket's existing timeout; subsequent ops are awaited with
     * {@code dnConf.socketKeepaliveTimeout}, and the original timeout is restored before the op is
     * processed. Any throwable is logged, and the input stream and socket are always closed on exit.
     */
    @Override
    public void run() {
        int handled = 0;
        Op op = null;
        dataXceiverServer.childSockets.add(s);
        try {
            final int normalTimeout = s.getSoTimeout();
            while (true) {
                updateCurrentThreadName("Waiting for operation #" + (handled + 1));
                final boolean reusedConnection = handled != 0;
                try {
                    if (reusedConnection) {
                        assert (dnConf.socketKeepaliveTimeout > 0);
                        s.setSoTimeout(dnConf.socketKeepaliveTimeout);
                    }
                    op = readOp();
                }
                catch (InterruptedIOException ignored) {
                    // Timed out waiting for the next op: stop serving this connection.
                    break;
                }
                catch (IOException err) {
                    boolean peerWentAway = err instanceof EOFException || err instanceof ClosedChannelException;
                    if (!(handled > 0 && peerWentAway)) {
                        throw err;
                    }
                    // EOF/closed channel on a connection that already served an op is not an error.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Cached " + s.toString() + " closing after " + handled + " ops");
                    }
                    break;
                }
                if (reusedConnection) {
                    s.setSoTimeout(normalTimeout);
                }
                opStartTime = Util.now();
                processOp(op);
                ++handled;
                if (s.isClosed() || dnConf.socketKeepaliveTimeout <= 0) {
                    break;
                }
            }
        }
        catch (Throwable t) {
            String opName = op == null ? "unknown" : op.name();
            LOG.error(datanode.getMachineName() + ":DataXceiver error processing " + opName + " operation " + " src: " + remoteAddress + " dest: " + localAddress, t);
        }
        finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug(datanode.getMachineName() + ":Number of active connections is: " + datanode.getXceiverCount());
            }
            updateCurrentThreadName("Cleaning up");
            IOUtils.closeStream(in);
            IOUtils.closeSocket(s);
            dataXceiverServer.childSockets.remove(s);
        }
    }

    /*
     * Loose catch block
     */
    @Override
    public void readBlock(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, String clientName, long blockOffset, long length) throws IOException {
        OutputStream baseStream = NetUtils.getOutputStream((Socket)this.s, (long)this.dnConf.socketWriteTimeout);
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
        this.checkAccess(out, true, block, blockToken, Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
        BlockSender blockSender = null;
        DatanodeRegistration dnR = this.datanode.getDNRegistrationForBP(block.getBlockPoolId());
        String clientTraceFmt = clientName.length() > 0 && ClientTraceLog.isInfoEnabled() ? String.format("src: %s, dest: %s, bytes: %s, op: %s, cliID: %s, offset: %s, srvID: %s, blockid: %s, duration: %s", this.localAddress, this.remoteAddress, "%d", "HDFS_READ", clientName, "%d", dnR.getStorageID(), block, "%d") : dnR + " Served block " + block + " to " + this.remoteAddress;
        this.updateCurrentThreadName("Sending block " + block);
        try {
            try {
                blockSender = new BlockSender(block, blockOffset, length, true, false, this.datanode, clientTraceFmt);
            }
            catch (IOException e) {
                String msg = "opReadBlock " + block + " received exception " + e;
                LOG.info((Object)msg);
                DataXceiver.sendResponse(this.s, DataTransferProtos.Status.ERROR, msg, this.dnConf.socketWriteTimeout);
                throw e;
            }
            this.writeSuccessWithChecksumInfo(blockSender, DataXceiver.getStreamWithTimeout(this.s, this.dnConf.socketWriteTimeout));
            long read = blockSender.sendBlock(out, baseStream, null);
            if (blockSender.didSendEntireByteRange()) {
                try {
                    DataTransferProtos.ClientReadStatusProto stat = DataTransferProtos.ClientReadStatusProto.parseFrom(HdfsProtoUtil.vintPrefixed(this.in));
                    if (!stat.hasStatus()) {
                        LOG.warn((Object)("Client " + this.s.getInetAddress() + " did not send a valid status " + "code after reading. Will close connection."));
                        IOUtils.closeStream((Closeable)out);
                    }
                }
                catch (IOException ioe) {
                    LOG.debug((Object)"Error reading client status response. Will close connection.", (Throwable)ioe);
                    IOUtils.closeStream((Closeable)out);
                }
            } else {
                IOUtils.closeStream((Closeable)out);
            }
            this.datanode.metrics.incrBytesRead((int)read);
            this.datanode.metrics.incrBlocksRead();
        }
        catch (SocketException ignored) {
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)(dnR + ":Ignoring exception while serving " + block + " to " + this.remoteAddress), (Throwable)ignored);
            }
            this.datanode.metrics.incrBlocksRead();
            IOUtils.closeStream((Closeable)out);
            IOUtils.closeStream((Closeable)blockSender);
        }
        catch (IOException ioe) {
            LOG.warn((Object)(dnR + ":Got exception while serving " + block + " to " + this.remoteAddress), (Throwable)ioe);
            throw ioe;
            {
                catch (Throwable throwable) {
                    IOUtils.closeStream(blockSender);
                    throw throwable;
                }
            }
        }
        IOUtils.closeStream((Closeable)blockSender);
        this.datanode.metrics.addReadBlockOp(this.elapsed());
        this.datanode.metrics.incrReadsFromClient(this.isLocal);
    }

    /**
     * Handles a WRITE_BLOCK request: receives a block from the upstream peer and, when
     * {@code targets} is non-empty, forwards it to the first target (the "mirror").
     *
     * Steps, in order: verify the block token; open a {@link BlockReceiver} (or call
     * {@code recoverClose} for a client PIPELINE_CLOSE_RECOVERY); connect to the mirror and forward
     * the request; for non-transfer client writes relay the connect ack upstream; receive the
     * block; for datanode-originated writes or close recovery, call {@code closeBlock}.
     *
     * All streams, the mirror socket and the receiver are closed in a {@code finally} clause so
     * they are released on every exit path, including unchecked exceptions.
     *
     * @param block                 block being written; its length and (on close recovery)
     *                              generation stamp are mutated here
     * @param blockToken            access token checked in WRITE mode
     * @param clientname            writer's name; empty means the request comes from a datanode
     * @param targets               downstream datanodes; only {@code targets[0]} is contacted here
     * @param srcDataNode           source datanode passed through to the receiver and the mirror
     * @param stage                 pipeline construction stage
     * @param pipelineSize          forwarded unchanged to the mirror
     * @param minBytesRcvd          forwarded to the receiver/mirror; used as length on close recovery
     * @param maxBytesRcvd          forwarded to the receiver/mirror
     * @param latestGenerationStamp generation stamp applied on close recovery
     * @param requestedChecksum     checksum passed to the receiver and the mirror
     * @throws IOException if a transfer stage is given targets, the mirror fails for a client
     *                     write, or receiving the block fails
     */
    @Override
    public void writeBlock(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, String clientname, DatanodeInfo[] targets, DatanodeInfo srcDataNode, BlockConstructionStage stage, int pipelineSize, long minBytesRcvd, long maxBytesRcvd, long latestGenerationStamp, DataChecksum requestedChecksum) throws IOException {
        this.updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
        final boolean isDatanode = clientname.length() == 0;
        final boolean isClient = !isDatanode;
        final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW || stage == BlockConstructionStage.TRANSFER_FINALIZED;
        if (isTransfer && targets.length > 0) {
            throw new IOException(stage + " does not support multiple targets " + Arrays.asList(targets));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname + "\n  block  =" + block + ", newGs=" + latestGenerationStamp + ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]" + "\n  targets=" + Arrays.asList(targets) + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode);
            LOG.debug("isDatanode=" + isDatanode + ", isClient=" + isClient + ", isTransfer=" + isTransfer);
            LOG.debug("writeBlock receive buf size " + this.s.getReceiveBufferSize() + " tcp no delay " + this.s.getTcpNoDelay());
        }
        // block is mutated below, so keep an unmodified copy to forward to the mirror.
        ExtendedBlock originalBlock = new ExtendedBlock(block);
        block.setNumBytes(this.dataXceiverServer.estimateBlockSize);
        LOG.info("Receiving block " + block + " src: " + this.remoteAddress + " dest: " + this.localAddress);
        DataOutputStream replyOut = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(this.s, (long)this.dnConf.socketWriteTimeout), HdfsConstants.SMALL_BUFFER_SIZE));
        this.checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
        DataOutputStream mirrorOut = null;
        DataInputStream mirrorIn = null;
        Socket mirrorSock = null;
        BlockReceiver blockReceiver = null;
        String mirrorNode = null;
        String firstBadLink = "";
        DataTransferProtos.Status mirrorInStatus = DataTransferProtos.Status.SUCCESS;
        try {
            if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                blockReceiver = new BlockReceiver(block, this.in, this.s.getRemoteSocketAddress().toString(), this.s.getLocalSocketAddress().toString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, this.datanode, requestedChecksum);
            } else {
                this.datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
            }
            if (targets.length > 0) {
                mirrorNode = targets[0].getName();
                InetSocketAddress mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
                mirrorSock = this.datanode.newSocket();
                try {
                    // Timeouts grow with the number of remaining downstream targets.
                    int timeoutValue = this.dnConf.socketTimeout + HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length;
                    int writeTimeout = this.dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length;
                    NetUtils.connect(mirrorSock, (SocketAddress)mirrorTarget, timeoutValue);
                    mirrorSock.setSoTimeout(timeoutValue);
                    // 131072 bytes = 128 KiB send buffer.
                    mirrorSock.setSendBufferSize(131072);
                    mirrorOut = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(mirrorSock, (long)writeTimeout), HdfsConstants.SMALL_BUFFER_SIZE));
                    mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
                    new Sender(mirrorOut).writeBlock(originalBlock, blockToken, clientname, targets, srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
                    mirrorOut.flush();
                    // The connect ack is only read for client-originated writes.
                    if (isClient) {
                        DataTransferProtos.BlockOpResponseProto connectAck = DataTransferProtos.BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
                        mirrorInStatus = connectAck.getStatus();
                        firstBadLink = connectAck.getFirstBadLink();
                        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtos.Status.SUCCESS) {
                            LOG.info("Datanode " + targets.length + " got response for connect ack " + " from downstream datanode with firstbadlink as " + firstBadLink);
                        }
                    }
                }
                catch (IOException e) {
                    if (isClient) {
                        // Report the mirror as the first bad link to the upstream peer.
                        DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.ERROR).setFirstBadLink(mirrorNode).build().writeDelimitedTo(replyOut);
                        replyOut.flush();
                    }
                    IOUtils.closeStream(mirrorOut);
                    mirrorOut = null;
                    IOUtils.closeStream(mirrorIn);
                    mirrorIn = null;
                    IOUtils.closeSocket(mirrorSock);
                    mirrorSock = null;
                    if (isClient) {
                        LOG.error(this.datanode + ":Exception transfering block " + block + " to mirror " + mirrorNode + ": " + e);
                        throw e;
                    }
                    // Datanode-originated writes carry on locally without the mirror.
                    LOG.info(this.datanode + ":Exception transfering block " + block + " to mirror " + mirrorNode + ". continuing without the mirror.", e);
                }
            }
            if (isClient && !isTransfer) {
                if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtos.Status.SUCCESS) {
                    LOG.info("Datanode " + targets.length + " forwarding connect ack to upstream firstbadlink is " + firstBadLink);
                }
                DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(mirrorInStatus).setFirstBadLink(firstBadLink).build().writeDelimitedTo(replyOut);
                replyOut.flush();
            }
            if (blockReceiver != null) {
                String mirrorAddr = mirrorSock == null ? null : mirrorNode;
                blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr, null, targets);
                if (isTransfer) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("TRANSFER: send close-ack");
                    }
                    DataXceiver.writeResponse(DataTransferProtos.Status.SUCCESS, null, replyOut);
                }
            }
            if (isClient && stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                block.setGenerationStamp(latestGenerationStamp);
                block.setNumBytes(minBytesRcvd);
            }
            if (isDatanode || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
                this.datanode.closeBlock(block, "");
                LOG.info("Received block " + block + " src: " + this.remoteAddress + " dest: " + this.localAddress + " of size " + block.getNumBytes());
            }
        }
        catch (IOException ioe) {
            LOG.info("opWriteBlock " + block + " received exception " + ioe);
            throw ioe;
        }
        finally {
            // Release everything regardless of how the try block exited.
            IOUtils.closeStream(mirrorOut);
            IOUtils.closeStream(mirrorIn);
            IOUtils.closeStream(replyOut);
            IOUtils.closeSocket(mirrorSock);
            IOUtils.closeStream(blockReceiver);
        }
        this.datanode.metrics.addWriteBlockOp(this.elapsed());
        this.datanode.metrics.incrWritesFromClient(this.isLocal);
    }

    /**
     * Handles a TRANSFER_BLOCK request: after a COPY-mode token check, asks the datanode to
     * transfer the replica to {@code targets} and writes a SUCCESS response.
     *
     * @param blk        block to transfer
     * @param blockToken access token checked in COPY mode
     * @param clientName passed through to the datanode
     * @param targets    destination datanodes
     * @throws IOException if the token check, the transfer or the response write fails
     */
    @Override
    public void transferBlock(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken, String clientName, DatanodeInfo[] targets) throws IOException {
        checkAccess(null, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
        updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
        OutputStream socketOut = NetUtils.getOutputStream(s, (long)dnConf.socketWriteTimeout);
        DataOutputStream out = new DataOutputStream(socketOut);
        try {
            datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
            DataXceiver.writeResponse(DataTransferProtos.Status.SUCCESS, null, out);
        }
        finally {
            // The reply stream is closed whether or not the transfer succeeded.
            IOUtils.closeStream(out);
        }
    }

    /**
     * Handles a BLOCK_CHECKSUM request: reads the block's metadata stream, computes an MD5 over
     * the bytes that follow the metadata header, and replies with bytes-per-CRC, CRCs-per-block
     * and the MD5 digest.
     *
     * {@code crcPerBlock} is reported as 0 when the checksum size is not positive, instead of
     * failing with a division by zero.
     *
     * @param block      block whose checksum is requested
     * @param blockToken access token checked in READ mode
     * @throws IOException if the token check, metadata read or reply write fails
     */
    @Override
    public void blockChecksum(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken) throws IOException {
        DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(this.s, (long)this.dnConf.socketWriteTimeout));
        this.checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
        this.updateCurrentThreadName("Reading metadata for block " + block);
        FSDatasetInterface.MetaDataInputStream metadataIn = this.datanode.data.getMetaDataInputStream(block);
        DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
        this.updateCurrentThreadName("Getting checksum for block " + block);
        try {
            BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
            DataChecksum checksum = header.getChecksum();
            int bytesPerCRC = checksum.getBytesPerChecksum();
            int checksumSize = checksum.getChecksumSize();
            // Guard the division: a non-positive checksum size means there are no per-chunk CRCs.
            long crcPerBlock = checksumSize <= 0 ? 0L : (metadataIn.getLength() - (long)BlockMetadataHeader.getHeaderSize()) / (long)checksumSize;
            // The header has already been consumed, so the digest covers the remaining bytes only.
            MD5Hash md5 = MD5Hash.digest((InputStream)checksumIn);
            if (LOG.isDebugEnabled()) {
                LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
            }
            DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setChecksumResponse(DataTransferProtos.OpBlockChecksumResponseProto.newBuilder().setBytesPerCrc(bytesPerCRC).setCrcPerBlock(crcPerBlock).setMd5(ByteString.copyFrom(md5.getDigest()))).build().writeDelimitedTo(out);
            out.flush();
        }
        finally {
            IOUtils.closeStream(out);
            IOUtils.closeStream(checksumIn);
            IOUtils.closeStream(metadataIn);
        }
        this.datanode.metrics.addBlockChecksumOp(this.elapsed());
    }

    /**
     * Handles a COPY_BLOCK request: after a COPY-mode token check and acquiring a slot from the
     * balance throttler, streams the whole block to the peer.
     *
     * On an invalid token or when no throttler slot is available, an error response is sent and
     * the method returns without throwing.
     *
     * The throttler slot and both streams are released in a {@code finally} clause so they are
     * freed on every exit path. After a fully successful copy, a single trailing char
     * ({@code 'd'}) is written once the slot has been released; it is not written on failure.
     *
     * @param block      block to copy
     * @param blockToken access token checked in COPY mode (only when block tokens are enabled)
     * @throws IOException if opening or sending the block fails
     */
    @Override
    public void copyBlock(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken) throws IOException {
        this.updateCurrentThreadName("Copying block " + block);
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, BlockTokenSecretManager.AccessMode.COPY);
            }
            catch (SecretManager.InvalidToken e) {
                LOG.warn("Invalid access token in request from " + this.remoteAddress + " for OP_COPY_BLOCK for block " + block + " : " + e.getLocalizedMessage());
                DataXceiver.sendResponse(this.s, DataTransferProtos.Status.ERROR_ACCESS_TOKEN, "Invalid access token", this.dnConf.socketWriteTimeout);
                return;
            }
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            String msg = "Not able to copy block " + block.getBlockId() + " to " + this.s.getRemoteSocketAddress() + " because threads quota is exceeded.";
            LOG.info(msg);
            DataXceiver.sendResponse(this.s, DataTransferProtos.Status.ERROR, msg, this.dnConf.socketWriteTimeout);
            return;
        }
        BlockSender blockSender = null;
        DataOutputStream reply = null;
        // Set only after the copy has run to completion, so reply is non-null whenever it is true.
        boolean isOpSuccess = false;
        try {
            blockSender = new BlockSender(block, 0L, -1L, false, false, this.datanode, null);
            OutputStream baseStream = NetUtils.getOutputStream(this.s, (long)this.dnConf.socketWriteTimeout);
            reply = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
            this.writeSuccessWithChecksumInfo(blockSender, reply);
            long read = blockSender.sendBlock(reply, baseStream, this.dataXceiverServer.balanceThrottler);
            this.datanode.metrics.incrBytesRead((int)read);
            this.datanode.metrics.incrBlocksRead();
            LOG.info("Copied block " + block + " to " + this.s.getRemoteSocketAddress());
            isOpSuccess = true;
        }
        catch (IOException ioe) {
            LOG.info("opCopyBlock " + block + " received exception " + ioe);
            throw ioe;
        }
        finally {
            // Always give the throttler slot back, even on unchecked exceptions.
            this.dataXceiverServer.balanceThrottler.release();
            if (isOpSuccess) {
                try {
                    // Trailing char sent after the slot has been released.
                    reply.writeChar('d');
                }
                catch (IOException ignored) {
                    // Best effort: the block itself was already sent successfully.
                }
            }
            IOUtils.closeStream(reply);
            IOUtils.closeStream(blockSender);
        }
        this.datanode.metrics.addCopyBlockOp(this.elapsed());
    }

    /**
     * Handles a REPLACE_BLOCK request: after a REPLACE-mode token check and acquiring a slot from
     * the balance throttler, connects to {@code proxySource}, sends it a copy-block request,
     * receives the block, and notifies the namenode with {@code delHint}.
     *
     * On an invalid token or when no throttler slot is available, an error response is sent and
     * the method returns without throwing.
     *
     * Cleanup runs in a {@code finally} clause on every exit path: the throttler slot is released,
     * a response with the final status is sent to the requester (failure to send is only logged),
     * and the proxy streams, receiver and proxy socket are closed. The status stays {@code ERROR}
     * until the move has fully completed, so SUCCESS is never reported for a failed move. After a
     * successful move one trailing char is read from the proxy before the slot is released.
     *
     * @param block       block to receive; its length is overwritten with the server's estimate
     * @param blockToken  access token checked in REPLACE mode (only when block tokens are enabled)
     * @param delHint     hint passed to the namenode notification
     * @param proxySource datanode to copy the block from
     * @throws IOException if connecting to the proxy, the copy response, or receiving the block fails
     */
    @Override
    public void replaceBlock(ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, String delHint, DatanodeInfo proxySource) throws IOException {
        this.updateCurrentThreadName("Replacing block " + block + " from " + delHint);
        block.setNumBytes(this.dataXceiverServer.estimateBlockSize);
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, BlockTokenSecretManager.AccessMode.REPLACE);
            }
            catch (SecretManager.InvalidToken e) {
                LOG.warn("Invalid access token in request from " + this.remoteAddress + " for OP_REPLACE_BLOCK for block " + block + " : " + e.getLocalizedMessage());
                DataXceiver.sendResponse(this.s, DataTransferProtos.Status.ERROR_ACCESS_TOKEN, "Invalid access token", this.dnConf.socketWriteTimeout);
                return;
            }
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            String msg = "Not able to receive block " + block.getBlockId() + " from " + this.s.getRemoteSocketAddress() + " because threads quota is exceeded.";
            LOG.warn(msg);
            DataXceiver.sendResponse(this.s, DataTransferProtos.Status.ERROR, msg, this.dnConf.socketWriteTimeout);
            return;
        }
        Socket proxySock = null;
        DataOutputStream proxyOut = null;
        DataInputStream proxyReply = null;
        BlockReceiver blockReceiver = null;
        // Pessimistic default: flipped to SUCCESS only as the last statement of the try block,
        // at which point proxyReply is guaranteed to be non-null.
        DataTransferProtos.Status opStatus = DataTransferProtos.Status.ERROR;
        String errMsg = null;
        try {
            InetSocketAddress proxyAddr = NetUtils.createSocketAddr(proxySource.getName());
            proxySock = this.datanode.newSocket();
            NetUtils.connect(proxySock, (SocketAddress)proxyAddr, this.dnConf.socketTimeout);
            proxySock.setSoTimeout(this.dnConf.socketTimeout);
            OutputStream baseStream = NetUtils.getOutputStream(proxySock, (long)this.dnConf.socketWriteTimeout);
            proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
            new Sender(proxyOut).copyBlock(block, blockToken);
            proxyReply = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE));
            DataTransferProtos.BlockOpResponseProto copyResponse = DataTransferProtos.BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(proxyReply));
            if (copyResponse.getStatus() != DataTransferProtos.Status.SUCCESS) {
                if (copyResponse.getStatus() == DataTransferProtos.Status.ERROR_ACCESS_TOKEN) {
                    throw new IOException("Copy block " + block + " from " + proxySock.getRemoteSocketAddress() + " failed due to access token error");
                }
                throw new IOException("Copy block " + block + " from " + proxySock.getRemoteSocketAddress() + " failed");
            }
            // The receiver verifies data using the checksum advertised by the proxy.
            DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
            DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(checksumInfo.getChecksum());
            blockReceiver = new BlockReceiver(block, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0L, 0L, 0L, "", null, this.datanode, remoteChecksum);
            blockReceiver.receiveBlock(null, null, null, null, this.dataXceiverServer.balanceThrottler, null);
            this.datanode.notifyNamenodeReceivedBlock(block, delHint);
            LOG.info("Moved block " + block + " from " + this.s.getRemoteSocketAddress());
            opStatus = DataTransferProtos.Status.SUCCESS;
        }
        catch (IOException ioe) {
            errMsg = "opReplaceBlock " + block + " received exception " + ioe;
            LOG.info(errMsg);
            throw ioe;
        }
        finally {
            if (opStatus == DataTransferProtos.Status.SUCCESS) {
                try {
                    // Consume the trailing char the proxy sends after releasing its own slot.
                    proxyReply.readChar();
                }
                catch (IOException ignored) {
                    // Best effort: the block has already been received.
                }
            }
            // Always give the throttler slot back, even on unchecked exceptions.
            this.dataXceiverServer.balanceThrottler.release();
            try {
                DataXceiver.sendResponse(this.s, opStatus, errMsg, this.dnConf.socketWriteTimeout);
            }
            catch (IOException ioe) {
                LOG.warn("Error writing reply back to " + this.s.getRemoteSocketAddress());
            }
            IOUtils.closeStream(proxyOut);
            IOUtils.closeStream(blockReceiver);
            IOUtils.closeStream(proxyReply);
            // Covers the case where the connect failed before any stream wrapped the socket.
            IOUtils.closeSocket(proxySock);
        }
        this.datanode.metrics.addReplaceBlockOp(this.elapsed());
    }

    /**
     * @return difference between {@code Util.now()} and the start time recorded in
     *         {@code run()} for the current operation
     */
    private long elapsed() {
        long current = Util.now();
        return current - opStartTime;
    }

    /**
     * Writes a status response (with optional message) to the socket through a fresh output
     * stream created with the given write timeout. The stream is not closed here.
     *
     * @param s       socket to reply on
     * @param status  status to report
     * @param message optional message; omitted from the response when {@code null}
     * @param timeout write timeout passed to the stream factory
     * @throws IOException if the stream cannot be obtained or the write fails
     */
    private static void sendResponse(Socket s, DataTransferProtos.Status status, String message, long timeout) throws IOException {
        writeResponse(status, message, getStreamWithTimeout(s, timeout));
    }

    /**
     * Wraps the socket's timeout-aware output stream in a {@link DataOutputStream}.
     *
     * @param s       socket to write to
     * @param timeout write timeout handed to {@code NetUtils.getOutputStream}
     * @return an unbuffered data stream over the socket
     * @throws IOException if the underlying stream cannot be obtained
     */
    private static DataOutputStream getStreamWithTimeout(Socket s, long timeout) throws IOException {
        OutputStream socketOut = NetUtils.getOutputStream(s, timeout);
        return new DataOutputStream(socketOut);
    }

    /**
     * Builds a {@code BlockOpResponseProto} with the given status (and message, if any), writes it
     * length-delimited to {@code out}, and flushes.
     *
     * @param status  status to report
     * @param message optional message; not set on the response when {@code null}
     * @param out     destination stream
     * @throws IOException if writing or flushing fails
     */
    private static void writeResponse(DataTransferProtos.Status status, String message, OutputStream out) throws IOException {
        DataTransferProtos.BlockOpResponseProto.Builder builder = DataTransferProtos.BlockOpResponseProto.newBuilder();
        builder.setStatus(status);
        if (message != null) {
            builder.setMessage(message);
        }
        DataTransferProtos.BlockOpResponseProto proto = builder.build();
        proto.writeDelimitedTo(out);
        out.flush();
    }

    /**
     * Writes a SUCCESS response that carries the sender's checksum description and chunk offset,
     * then flushes {@code out}.
     *
     * @param blockSender sender whose checksum and offset are reported
     * @param out         destination stream
     * @throws IOException if writing or flushing fails
     */
    private void writeSuccessWithChecksumInfo(BlockSender blockSender, DataOutputStream out) throws IOException {
        DataTransferProtos.ReadOpChecksumInfoProto.Builder checksumBuilder = DataTransferProtos.ReadOpChecksumInfoProto.newBuilder();
        checksumBuilder.setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()));
        checksumBuilder.setChunkOffset(blockSender.getOffset());
        DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = checksumBuilder.build();

        DataTransferProtos.BlockOpResponseProto.Builder responseBuilder = DataTransferProtos.BlockOpResponseProto.newBuilder();
        responseBuilder.setStatus(DataTransferProtos.Status.SUCCESS);
        responseBuilder.setReadOpChecksumInfo(checksumInfo);
        responseBuilder.build().writeDelimitedTo(out);
        out.flush();
    }

    /**
     * Verifies the block token for an operation; a no-op when block tokens are disabled.
     *
     * On an invalid token: if {@code reply} is set, an ERROR_ACCESS_TOKEN response is written (for
     * WRITE mode it also names this datanode as the first bad link); a warning is logged; the
     * output stream is closed; and the {@code InvalidToken} is rethrown.
     *
     * @param out   stream for the error reply; when {@code null} and {@code reply} is set, one is
     *              created over the socket
     * @param reply whether to send an error response to the peer
     * @param blk   block being accessed
     * @param t     token to verify
     * @param op    operation, used only in the log message
     * @param mode  access mode to check
     * @throws IOException the {@code InvalidToken} itself, or a failure while writing the reply
     */
    private void checkAccess(DataOutputStream out, boolean reply, ExtendedBlock blk, Token<BlockTokenIdentifier> t, Op op, BlockTokenSecretManager.AccessMode mode) throws IOException {
        if (!datanode.isBlockTokenEnabled) {
            return;
        }
        try {
            datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
        }
        catch (SecretManager.InvalidToken e) {
            try {
                if (reply) {
                    if (out == null) {
                        out = new DataOutputStream(NetUtils.getOutputStream(s, (long)dnConf.socketWriteTimeout));
                    }
                    DataTransferProtos.BlockOpResponseProto.Builder denial = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.ERROR_ACCESS_TOKEN);
                    if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
                        DatanodeRegistration registration = datanode.getDNRegistrationForBP(blk.getBlockPoolId());
                        denial.setFirstBadLink(registration.getName());
                    }
                    denial.build().writeDelimitedTo(out);
                    out.flush();
                }
                LOG.warn("Block token verification failed: op=" + op + ", remoteAddress=" + remoteAddress + ", message=" + e.getLocalizedMessage());
                throw e;
            }
            finally {
                // This path always exits by exception; close the reply stream on the way out.
                IOUtils.closeStream(out);
            }
        }
    }
}

