/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.base.Preconditions;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderUtil;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputStream;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

/**
 * A {@link BlockReader} that reads block data from a datanode socket one
 * packet at a time.
 *
 * <p>Each data packet is read into a single pooled buffer that holds the
 * packet's checksums followed by its data; {@link #curDataSlice} is a view of
 * the data portion only and is what {@link #read(byte[], int, int)} drains.
 * Once all requested bytes have arrived, the reader consumes one trailing
 * empty packet and sends a read-status message back over the socket.
 *
 * <p>{@code read}, {@code skip} and {@code close} are {@code synchronized};
 * the remaining methods are not.
 */
@InterfaceAudience.Private
public class RemoteBlockReader2 implements BlockReader {
    static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);

    /** Socket to the datanode; set to {@code null} once handed out by {@link #takeSocket()}. */
    Socket dnSock;
    /** Channel the packets are read from. */
    private final ReadableByteChannel in;
    /** Checksum used to verify packets; cleared by {@link #close()}. */
    private DataChecksum checksum;
    /** Header of the most recently read data packet (reused across packets). */
    private PacketHeader curHeader;
    /** Pooled buffer holding the current packet: checksums first, then data. */
    private ByteBuffer curPacketBuf = null;
    /** View over the data portion of {@link #curPacketBuf}. */
    private ByteBuffer curDataSlice = null;
    /** Sequence number of the last data packet accepted, or -1 before the first. */
    private long lastSeqNo = -1L;
    /** Offset in the block of the first byte the caller asked for. */
    private long startOffset;
    private final String filename;
    private static final DirectBufferPool bufferPool = new DirectBufferPool();
    /** Scratch buffer sized for exactly one packet header. */
    private final ByteBuffer headerBuf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN);
    private int bytesPerChecksum;
    private int checksumSize;
    /**
     * Number of data bytes still expected from the datanode, including the
     * bytes between the first chunk offset and {@link #startOffset}.
     */
    private long bytesNeededToFinish;
    private final boolean verifyChecksum;
    /** Whether a read-status message has been successfully written to the datanode. */
    private boolean sentStatusCode = false;
    /** Scratch buffer for {@link #skip(long)}, allocated on first use. */
    byte[] skipBuf = null;
    ByteBuffer checksumBytes = null;
    int dataLeft = 0;

    /**
     * Copies up to {@code len} bytes of block data into {@code buf}.
     *
     * <p>A new packet is fetched when no packet has been read yet, or when the
     * current one is drained and more data is still expected.
     *
     * @param buf destination array
     * @param off offset in {@code buf} at which to start writing
     * @param len maximum number of bytes to copy
     * @return the number of bytes copied, or -1 when no data remains
     * @throws IOException if reading or validating the next packet fails
     */
    @Override
    public synchronized int read(byte[] buf, int off, int len) throws IOException {
        // curDataSlice is non-null whenever curPacketBuf is, so the short-circuit keeps this safe.
        if (this.curPacketBuf == null
                || (this.curDataSlice.remaining() == 0 && this.bytesNeededToFinish > 0L)) {
            this.readNextPacket();
        }
        // The slice is still null if the very first packet carried no data.
        if (this.curDataSlice == null || this.curDataSlice.remaining() == 0) {
            return -1;
        }
        int nRead = Math.min(this.curDataSlice.remaining(), len);
        this.curDataSlice.get(buf, off, nRead);
        return nRead;
    }

    /**
     * Reads the next packet header and, if it carries data, the packet body;
     * optionally verifies checksums, skips bytes that precede
     * {@link #startOffset}, and finishes the read (trailing packet plus status
     * message) once no more bytes are needed.
     *
     * @throws IOException on premature EOF, a header that fails its sanity
     *         check, a checksum mismatch, or an unexpected trailing packet
     * @throws IllegalStateException if the current header is already marked
     *         as the last packet in the block
     */
    private void readNextPacket() throws IOException {
        Preconditions.checkState(this.curHeader == null || !this.curHeader.isLastPacketInBlock());
        this.readPacketHeader();
        if (LOG.isTraceEnabled()) {
            LOG.trace("DFSClient readNextPacket got header " + this.curHeader);
        }
        if (!this.curHeader.sanityCheck(this.lastSeqNo)) {
            throw new IOException("BlockReader: error in packet header " + this.curHeader);
        }

        int dataLen = this.curHeader.getDataLen();
        if (dataLen > 0) {
            // One checksum per (possibly partial) chunk of bytesPerChecksum data bytes.
            int chunks = 1 + (dataLen - 1) / this.bytesPerChecksum;
            int checksumsLen = chunks * this.checksumSize;
            this.resetPacketBuffer(checksumsLen, dataLen);
            this.lastSeqNo = this.curHeader.getSeqno();
            RemoteBlockReader2.readChannelFully(this.in, this.curPacketBuf);
            this.curPacketBuf.flip();
            if (this.verifyChecksum) {
                this.verifyPacketChecksums();
            }
            this.bytesNeededToFinish -= dataLen;
        }

        // The packet may start before the first byte the caller requested; skip that prefix.
        long offsetInBlock = this.curHeader.getOffsetInBlock();
        if (this.curDataSlice != null && offsetInBlock < this.startOffset) {
            this.curDataSlice.position((int) (this.startOffset - offsetInBlock));
        }

        // Everything requested has arrived: consume the trailer and report the outcome.
        if (this.bytesNeededToFinish <= 0L) {
            this.readTrailingEmptyPacket();
            DataTransferProtos.Status result = this.verifyChecksum
                    ? DataTransferProtos.Status.CHECKSUM_OK
                    : DataTransferProtos.Status.SUCCESS;
            this.sendReadResult(this.dnSock, result);
        }
    }

    /**
     * Verifies the data slice of the current packet against the checksums
     * stored at the front of the packet buffer.
     *
     * @throws ChecksumException if verification fails
     */
    private void verifyPacketChecksums() throws ChecksumException {
        this.checksum.verifyChunkedSums(this.curDataSlice, this.curPacketBuf, this.filename,
                this.curHeader.getOffsetInBlock());
    }

    /**
     * Reads from {@code ch} until {@code buf} has no remaining space.
     *
     * @throws IOException if the channel reports end-of-stream first
     */
    private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf) throws IOException {
        while (buf.remaining() > 0) {
            if (ch.read(buf) < 0) {
                throw new IOException("Premature EOF reading from " + ch);
            }
        }
    }

    /**
     * Prepares {@link #curPacketBuf} to receive a packet of the given layout
     * and points {@link #curDataSlice} at its data portion. A larger buffer is
     * fetched from the pool when the current one is missing or too small.
     *
     * @param checksumsLen number of checksum bytes at the start of the packet
     * @param dataLen number of data bytes following the checksums
     */
    private void resetPacketBuffer(int checksumsLen, int dataLen) {
        int packetLen = checksumsLen + dataLen;
        if (this.curPacketBuf == null || this.curPacketBuf.capacity() < packetLen) {
            this.returnPacketBufToPool();
            this.curPacketBuf = bufferPool.getBuffer(packetLen);
        }
        // Slice from the end of the checksums so the view covers only the data bytes.
        this.curPacketBuf.position(checksumsLen);
        this.curDataSlice = this.curPacketBuf.slice();
        this.curDataSlice.limit(dataLen);
        this.curPacketBuf.clear();
        this.curPacketBuf.limit(packetLen);
    }

    /**
     * Skips up to {@code n} bytes by reading them into a scratch buffer.
     *
     * @param n number of bytes to skip
     * @return the number of bytes actually skipped, which is smaller than
     *         {@code n} if a read returns no data first
     * @throws IOException if an underlying read fails
     */
    @Override
    public synchronized long skip(long n) throws IOException {
        if (this.skipBuf == null) {
            this.skipBuf = new byte[this.bytesPerChecksum];
        }
        long nSkipped = 0L;
        while (nSkipped < n) {
            int toSkip = (int) Math.min(n - nSkipped, (long) this.skipBuf.length);
            int ret = this.read(this.skipBuf, 0, toSkip);
            if (ret <= 0) {
                break;
            }
            nSkipped += ret;
        }
        return nSkipped;
    }

    /** Fills {@link #headerBuf} with one packet header's worth of bytes and flips it for reading. */
    private void fillHeaderBuf() throws IOException {
        this.headerBuf.clear();
        RemoteBlockReader2.readChannelFully(this.in, this.headerBuf);
        this.headerBuf.flip();
    }

    /** Reads the next packet header into {@link #curHeader}, creating it on first use. */
    private void readPacketHeader() throws IOException {
        this.fillHeaderBuf();
        if (this.curHeader == null) {
            this.curHeader = new PacketHeader();
        }
        this.curHeader.readFields(this.headerBuf);
    }

    /**
     * Reads the header that terminates the read and checks that it is an
     * empty packet flagged as the last in the block.
     *
     * @throws IOException if the header is not such a packet, or on EOF
     */
    private void readTrailingEmptyPacket() throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Reading empty packet at end of read");
        }
        this.fillHeaderBuf();
        PacketHeader trailer = new PacketHeader();
        trailer.readFields(this.headerBuf);
        if (!trailer.isLastPacketInBlock() || trailer.getDataLen() != 0) {
            throw new IOException("Expected empty end-of-read packet! Header: " + trailer);
        }
    }

    /**
     * Creates a reader over an already-negotiated connection.
     *
     * @param file name used when reporting checksum errors
     * @param bpid block pool id (not used by this constructor)
     * @param blockId block id (not used by this constructor)
     * @param in channel to read packets from
     * @param checksum checksum describing chunk and checksum sizes
     * @param verifyChecksum whether to verify each data packet
     * @param startOffset offset in the block of the first requested byte
     * @param firstChunkOffset offset in the block at which the datanode starts sending
     * @param bytesToRead number of bytes requested
     * @param dnSock socket backing {@code in}
     */
    protected RemoteBlockReader2(String file, String bpid, long blockId, ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
        this.dnSock = dnSock;
        this.in = in;
        this.checksum = checksum;
        this.verifyChecksum = verifyChecksum;
        this.startOffset = Math.max(startOffset, 0L);
        this.filename = file;
        // The datanode also sends the bytes between the chunk boundary and startOffset.
        this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
        this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
        this.checksumSize = this.checksum.getChecksumSize();
    }

    /**
     * Returns the packet buffer to the pool and closes the datanode socket if
     * this reader still owns it.
     *
     * @throws IOException if closing the socket fails
     */
    @Override
    public synchronized void close() throws IOException {
        this.returnPacketBufToPool();
        this.startOffset = -1L;
        this.checksum = null;
        if (this.dnSock != null) {
            this.dnSock.close();
        }
    }

    /** Safety net: hands the packet buffer back to the pool if {@link #close()} was never called. */
    @Override
    protected void finalize() throws Throwable {
        try {
            this.returnPacketBufToPool();
        }
        finally {
            super.finalize();
        }
    }

    /**
     * Gives the current packet buffer back to the pool, if there is one, and
     * drops the data slice so no view of the returned buffer is retained.
     */
    private void returnPacketBufToPool() {
        if (this.curPacketBuf != null) {
            bufferPool.returnBuffer(this.curPacketBuf);
            this.curPacketBuf = null;
            this.curDataSlice = null;
        }
    }

    /**
     * Transfers ownership of the datanode socket to the caller; afterwards
     * {@link #close()} no longer closes it.
     *
     * @return the socket, or {@code null} if it was already taken
     */
    @Override
    public Socket takeSocket() {
        assert (this.hasSentStatusCode()) : "BlockReader shouldn't give back sockets mid-read";
        Socket res = this.dnSock;
        this.dnSock = null;
        return res;
    }

    /** @return whether a read-status message has been successfully sent to the datanode */
    @Override
    public boolean hasSentStatusCode() {
        return this.sentStatusCode;
    }

    /**
     * Sends {@code statusCode} to the datanode and records success. A failure
     * to send is deliberately only logged, not propagated.
     *
     * @param sock socket to write the status to
     * @param statusCode status to report
     */
    void sendReadResult(Socket sock, DataTransferProtos.Status statusCode) {
        assert (!this.sentStatusCode) : "already sent status code to " + sock;
        try {
            RemoteBlockReader2.writeReadResult(sock, statusCode);
            this.sentStatusCode = true;
        }
        catch (IOException e) {
            LOG.info("Could not send read status (" + statusCode + ") to datanode " + sock.getInetAddress() + ": " + e.getMessage());
        }
    }

    /**
     * Writes a delimited {@code ClientReadStatusProto} carrying
     * {@code statusCode} to the socket and flushes it.
     *
     * @throws IOException if the write or flush fails
     */
    static void writeReadResult(Socket sock, DataTransferProtos.Status statusCode) throws IOException {
        OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
        DataTransferProtos.ClientReadStatusProto.newBuilder().setStatus(statusCode).build().writeDelimitedTo(out);
        out.flush();
    }

    /**
     * Builds a display name of the form {@code address:poolId:blockId}.
     *
     * @param s datanode address
     * @param poolId block pool id
     * @param blockId block id
     * @return the combined name
     */
    public static String getFileName(InetSocketAddress s, String poolId, long blockId) {
        return s.toString() + ":" + poolId + ":" + blockId;
    }

    /** Delegates to {@link BlockReaderUtil#readAll}. */
    @Override
    public int readAll(byte[] buf, int offset, int len) throws IOException {
        return BlockReaderUtil.readAll(this, buf, offset, len);
    }

    /** Delegates to {@link BlockReaderUtil#readFully}. */
    @Override
    public void readFully(byte[] buf, int off, int len) throws IOException {
        BlockReaderUtil.readFully(this, buf, off, len);
    }

    /**
     * Sends a read-block request over {@code sock}, validates the datanode's
     * response, and returns a reader positioned for the requested range.
     *
     * @param sock connected socket; must have an associated channel
     * @param file name used in error messages
     * @param block block to read
     * @param blockToken access token sent with the request
     * @param startOffset offset in the block of the first requested byte
     * @param len number of bytes requested
     * @param bufferSize not used by this implementation
     * @param verifyChecksum whether the reader should verify checksums
     * @param clientName client name sent with the request
     * @return a new block reader
     * @throws IllegalArgumentException if {@code sock} has no channel
     * @throws IOException if the datanode reports an error or returns an
     *         implausible first chunk offset
     */
    public static BlockReader newBlockReader(Socket sock, String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName) throws IOException {
        // Reject an unusable socket before anything is written to it.
        Preconditions.checkArgument(sock.getChannel() != null, "Socket %s does not have an associated Channel.", sock);

        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
        new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);

        SocketInputStream sin = (SocketInputStream) NetUtils.getInputStream(sock);
        DataInputStream in = new DataInputStream(sin);
        DataTransferProtos.BlockOpResponseProto status = DataTransferProtos.BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
        RemoteBlockReader2.checkSuccess(status, sock, block, file);

        DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = status.getReadOpChecksumInfo();
        DataChecksum checksum = DataTransferProtoUtil.fromProto(checksumInfo.getChecksum());
        long firstChunkOffset = checksumInfo.getChunkOffset();
        // The first chunk must start at or before startOffset, but less than one
        // full chunk before it: (startOffset - bytesPerChecksum, startOffset].
        long bytesPerChunk = checksum.getBytesPerChecksum();
        if (firstChunkOffset < 0L || firstChunkOffset > startOffset || firstChunkOffset <= startOffset - bytesPerChunk) {
            throw new IOException("BlockReader: error in first chunk offset (" + firstChunkOffset + ") startOffset is " + startOffset + " for file " + file);
        }
        return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), sin, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
    }

    /**
     * Throws if the datanode's response to a read-block request is not
     * {@code SUCCESS}.
     *
     * @throws InvalidBlockTokenException if the status is {@code ERROR_ACCESS_TOKEN}
     * @throws IOException for any other non-success status
     */
    static void checkSuccess(DataTransferProtos.BlockOpResponseProto status, Socket sock, ExtendedBlock block, String file) throws IOException {
        if (status.getStatus() == DataTransferProtos.Status.SUCCESS) {
            return;
        }
        if (status.getStatus() == DataTransferProtos.Status.ERROR_ACCESS_TOKEN) {
            throw new InvalidBlockTokenException("Got access token error for OP_READ_BLOCK, self=" + sock.getLocalSocketAddress() + ", remote=" + sock.getRemoteSocketAddress() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp());
        }
        throw new IOException("Got error for OP_READ_BLOCK, self=" + sock.getLocalSocketAddress() + ", remote=" + sock.getRemoteSocketAddress() + ", for file " + file + ", for pool " + block.getBlockPoolId() + " block " + block.getBlockId() + "_" + block.getGenerationStamp());
    }
}

