/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ChunkChecksum;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.TraceScope;
import org.slf4j.Logger;

class BlockSender
implements Closeable {
    /** Logger shared with the DataNode (the same instance as {@code DataNode.LOG}). */
    static final Logger LOG = DataNode.LOG;
    /** Client trace log shared with the DataNode; doSendBlock emits one debug line per transfer to it. */
    static final Log ClientTraceLog = DataNode.ClientTraceLog;
    /**
     * True when the JVM reports a 32-bit data model. The comparison is written
     * constant-first so that a JVM which does not define
     * {@code sun.arch.data.model} yields {@code false} instead of failing class
     * initialization with a NullPointerException.
     */
    private static final boolean is32Bit = "32".equals(System.getProperty("sun.arch.data.model"));
    /** 64 KiB; the static initializer uses this same value as the floor for TRANSFERTO_BUFFER_SIZE. */
    private static final int MIN_BUFFER_WITH_TRANSFERTO = 65536;
    /** I/O buffer size read from configuration in the static initializer; sizes the metadata stream buffer and copy-path packets. */
    private static final int IO_FILE_BUFFER_SIZE;
    /** Larger of IO_FILE_BUFFER_SIZE and 64 KiB; sizes packets on the transferTo path. */
    private static final int TRANSFERTO_BUFFER_SIZE;
    /** Block being sent. */
    private final ExtendedBlock block;
    /** Data/checksum streams plus the volume reference; set at the end of the constructor and nulled by close(). */
    private ReplicaInputStreams ris;
    /** File-channel position used by the transferTo path; -1 until doSendBlock captures it. */
    private long blockInPosition = -1L;
    /** Checksum used for verification; a NULL-type checksum when no metadata was loaded. */
    private final DataChecksum checksum;
    /** Value of {@link #offset} when doSendBlock started. */
    private long initialOffset;
    /** Current position in the block; starts at the requested offset rounded down to a chunk boundary. */
    private long offset;
    /** Position at which sending stops. */
    private final long endOffset;
    /** Bytes of data covered by one checksum. */
    private final int chunkSize;
    /** Bytes occupied by one checksum. */
    private final int checksumSize;
    /** Whether a missing or unreadable checksum is tolerated. */
    private final boolean corruptChecksumOk;
    /** Sequence number written into the next packet header. */
    private long seqno;
    /** Whether the transferTo path may be used (configuration and 32-bit length limit). */
    private final boolean transferToAllowed;
    /** Set once the trailing empty packet has been sent and flushed. */
    private boolean sentEntireByteRange;
    /** Whether data is checked against its checksums before being sent. */
    private final boolean verifyChecksum;
    /** Format string for the client trace line; may be null, in which case no trace line is written. */
    private final String clientTraceFmt;
    /** Checksum of the last chunk captured by the constructor; sendPacket copies it over the final checksum of the last data packet. */
    private volatile ChunkChecksum lastChunkChecksum = null;
    /** Owning DataNode: source of configuration, metrics, tracer, readahead pool and block scanner. */
    private DataNode datanode;
    /** Replica looked up for {@link #block} under the dataset lock. */
    private final Replica replica;
    /** Readahead length in bytes; readahead is skipped when this is not positive. */
    private final long readaheadLength;
    /** Most recent readahead request, cancelled by close(). */
    private ReadaheadPool.ReadaheadRequest curReadahead;
    /** True when the caching strategy supplied an explicit readahead, making readahead apply to short reads too. */
    private final boolean alwaysReadahead;
    /** Drop OS cache behind reads that qualify as long reads. */
    private final boolean dropCacheBehindLargeReads;
    /** Drop OS cache behind every read regardless of its length. */
    private final boolean dropCacheBehindAllReads;
    /** Offset up to which the OS cache has already been dropped. */
    private long lastCacheDropOffset;
    /** File I/O provider obtained from the DataNode; performs the socket transfer on the transferTo path. */
    private final FileIoProvider fileIoProvider;
    /** Minimum number of bytes that must be sent between two cache drops; assigned in the static initializer. */
    @VisibleForTesting
    static long CACHE_DROP_INTERVAL_BYTES;
    /** 256 KiB; isLongRead() compares against this same value (written there as a literal). */
    private static final long LONG_READ_THRESHOLD_BYTES = 262144L;

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    BlockSender(ExtendedBlock block, long startOffset, long length, boolean corruptChecksumOk, boolean verifyChecksum, boolean sendChecksum, DataNode datanode, String clientTraceFmt, CachingStrategy cachingStrategy) throws IOException {
        InputStream blockIn = null;
        DataInputStream checksumIn = null;
        FsVolumeReference volumeRef = null;
        this.fileIoProvider = datanode.getFileIoProvider();
        try {
            long checksumSkip;
            long end;
            int size;
            DataChecksum csum;
            long replicaVisibleLength;
            ChunkChecksum chunkChecksum;
            block46: {
                this.block = block;
                this.corruptChecksumOk = corruptChecksumOk;
                this.verifyChecksum = verifyChecksum;
                this.clientTraceFmt = clientTraceFmt;
                if (cachingStrategy.getDropBehind() == null) {
                    this.dropCacheBehindAllReads = false;
                    this.dropCacheBehindLargeReads = datanode.getDnConf().dropCacheBehindReads;
                } else {
                    this.dropCacheBehindAllReads = this.dropCacheBehindLargeReads = cachingStrategy.getDropBehind().booleanValue();
                }
                if (cachingStrategy.getReadahead() == null) {
                    this.alwaysReadahead = false;
                    this.readaheadLength = datanode.getDnConf().readaheadLength;
                } else {
                    this.alwaysReadahead = true;
                    this.readaheadLength = cachingStrategy.getReadahead();
                }
                this.datanode = datanode;
                if (verifyChecksum) {
                    Preconditions.checkArgument((boolean)sendChecksum, (Object)"If verifying checksum, currently must also send it.");
                }
                chunkChecksum = null;
                try (AutoCloseableLock lock = datanode.data.acquireDatasetLock();){
                    this.replica = BlockSender.getReplica(block, datanode);
                    replicaVisibleLength = this.replica.getVisibleLength();
                    if (this.replica instanceof FinalizedReplica) {
                        FinalizedReplica frep = (FinalizedReplica)this.replica;
                        chunkChecksum = frep.getLastChecksumAndDataLen();
                    }
                }
                if (this.replica.getState() == HdfsServerConstants.ReplicaState.RBW) {
                    ReplicaInPipeline rbw = (ReplicaInPipeline)this.replica;
                    BlockSender.waitForMinLength(rbw, startOffset + length);
                    chunkChecksum = rbw.getLastChecksumAndDataLen();
                }
                if (this.replica.getGenerationStamp() < block.getGenerationStamp()) {
                    throw new IOException("Replica gen stamp < block genstamp, block=" + block + ", replica=" + this.replica);
                }
                if (this.replica.getGenerationStamp() > block.getGenerationStamp()) {
                    if (DataNode.LOG.isDebugEnabled()) {
                        DataNode.LOG.debug("Bumping up the client provided block's genstamp to latest " + this.replica.getGenerationStamp() + " for block " + block);
                    }
                    block.setGenerationStamp(this.replica.getGenerationStamp());
                }
                if (replicaVisibleLength < 0L) {
                    throw new IOException("Replica is not readable, block=" + block + ", replica=" + this.replica);
                }
                if (DataNode.LOG.isDebugEnabled()) {
                    DataNode.LOG.debug("block=" + block + ", replica=" + this.replica);
                }
                this.transferToAllowed = datanode.getDnConf().transferToAllowed && (!is32Bit || length <= Integer.MAX_VALUE);
                volumeRef = datanode.data.getVolume(block).obtainReference();
                csum = null;
                if (verifyChecksum || sendChecksum) {
                    LengthInputStream metaIn = null;
                    boolean keepMetaInOpen = false;
                    try {
                        DataNodeFaultInjector.get().throwTooManyOpenFiles();
                        metaIn = datanode.data.getMetaDataInputStream(block);
                        if (!corruptChecksumOk || metaIn != null) {
                            if (metaIn == null) {
                                throw new FileNotFoundException("Meta-data not found for " + block);
                            }
                            if (!this.replica.isOnTransientStorage() && metaIn.getLength() >= (long)BlockMetadataHeader.getHeaderSize()) {
                                checksumIn = new DataInputStream(new BufferedInputStream(metaIn, IO_FILE_BUFFER_SIZE));
                                csum = BlockMetadataHeader.readDataChecksum((DataInputStream)checksumIn, (Object)block);
                                keepMetaInOpen = true;
                            }
                        } else {
                            LOG.warn("Could not find metadata file for " + block);
                        }
                        if (keepMetaInOpen) break block46;
                    }
                    catch (FileNotFoundException e) {
                        try {
                            if (e.getMessage() == null) throw e;
                            if (e.getMessage().contains("Too many open files")) throw e;
                            datanode.notifyNamenodeDeletedBlock(block, this.replica.getStorageUuid());
                            datanode.data.invalidate(block.getBlockPoolId(), new Block[]{block.getLocalBlock()});
                            throw e;
                        }
                        catch (Throwable throwable) {
                            if (keepMetaInOpen) throw throwable;
                            IOUtils.closeStream(metaIn);
                            throw throwable;
                        }
                    }
                    IOUtils.closeStream((Closeable)metaIn);
                }
            }
            if (csum == null) {
                csum = DataChecksum.newDataChecksum((DataChecksum.Type)DataChecksum.Type.NULL, (int)512);
            }
            if ((size = csum.getBytesPerChecksum()) > 0xA00000 && (long)size > replicaVisibleLength) {
                csum = DataChecksum.newDataChecksum((DataChecksum.Type)csum.getChecksumType(), (int)Math.max((int)replicaVisibleLength, 0xA00000));
                size = csum.getBytesPerChecksum();
            }
            this.chunkSize = size;
            this.checksum = csum;
            this.checksumSize = this.checksum.getChecksumSize();
            length = length < 0L ? replicaVisibleLength : length;
            long l = end = chunkChecksum != null ? chunkChecksum.getDataLength() : this.replica.getBytesOnDisk();
            if (startOffset < 0L || startOffset > end || length + startOffset > end) {
                String msg = " Offset " + startOffset + " and length " + length + " don't match block " + block + " ( blockLen " + end + " )";
                LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) + ":sendBlock() : " + msg);
                throw new IOException(msg);
            }
            this.offset = startOffset - startOffset % (long)this.chunkSize;
            if (length >= 0L) {
                long tmpLen = startOffset + length;
                if (tmpLen % (long)this.chunkSize != 0L) {
                    tmpLen += (long)this.chunkSize - tmpLen % (long)this.chunkSize;
                }
                if (tmpLen < end) {
                    end = tmpLen;
                } else if (chunkChecksum != null) {
                    this.lastChunkChecksum = chunkChecksum;
                }
            }
            this.endOffset = end;
            if (this.offset > 0L && checksumIn != null && (checksumSkip = this.offset / (long)this.chunkSize * (long)this.checksumSize) > 0L) {
                IOUtils.skipFully((InputStream)checksumIn, (long)checksumSkip);
            }
            this.seqno = 0L;
            if (DataNode.LOG.isDebugEnabled()) {
                DataNode.LOG.debug("replica=" + this.replica);
            }
            blockIn = datanode.data.getBlockInputStream(block, this.offset);
            this.ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef, this.fileIoProvider);
            return;
        }
        catch (IOException ioe) {
            IOUtils.closeStream((Closeable)this);
            org.apache.commons.io.IOUtils.closeQuietly(blockIn);
            org.apache.commons.io.IOUtils.closeQuietly(checksumIn);
            throw ioe;
        }
    }

    /**
     * Releases the streams held by this sender. Safe to call more than once
     * and safe to call when the streams were never opened: once {@code ris}
     * is null there is nothing left to release and the call is a no-op apart
     * from cancelling any pending readahead.
     *
     * @throws IOException if closing the underlying streams fails
     */
    @Override
    public void close() throws IOException {
        final ReplicaInputStreams streams = this.ris;
        final boolean dropBehind = this.dropCacheBehindAllReads || this.dropCacheBehindLargeReads && this.isLongRead();
        if (streams != null && streams.getDataInFd() != null && dropBehind) {
            try {
                // Drop whatever was read since the last periodic cache drop.
                streams.dropCacheBehindReads(this.block.getBlockName(), this.lastCacheDropOffset, this.offset - this.lastCacheDropOffset, NativeIO.POSIX.POSIX_FADV_DONTNEED);
            }
            catch (Exception e) {
                // Best effort: failing to drop the cache must not prevent closing.
                LOG.warn("Unable to drop cache on file close", (Throwable)e);
            }
        }
        if (this.curReadahead != null) {
            this.curReadahead.cancel();
        }
        if (streams == null) {
            // Already closed, or construction failed before the streams existed.
            return;
        }
        try {
            streams.closeStreams();
        }
        finally {
            IOUtils.closeStream((Closeable)streams);
            this.ris = null;
        }
    }

    /**
     * Looks up the replica for a block in the DataNode's dataset.
     *
     * @param block block whose pool id and block id identify the replica
     * @param datanode DataNode whose dataset is queried
     * @return the replica, never null
     * @throws ReplicaNotFoundException if the dataset has no such replica
     */
    private static Replica getReplica(ExtendedBlock block, DataNode datanode) throws ReplicaNotFoundException {
        final Replica found = datanode.data.getReplica(block.getBlockPoolId(), block.getBlockId());
        if (found != null) {
            return found;
        }
        throw new ReplicaNotFoundException(block);
    }

    /**
     * Waits until the replica being written has at least {@code len} bytes on
     * disk, polling up to 30 times with a 100 ms pause between polls.
     *
     * @param rbw replica being written
     * @param len minimum number of bytes required on disk
     * @throws IOException if the bytes are still missing after the last poll,
     *         or if the wait is interrupted (the interrupt status is restored
     *         before throwing)
     */
    private static void waitForMinLength(ReplicaInPipeline rbw, long len) throws IOException {
        for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; ++i) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException ie) {
                // Keep the interrupt visible to the caller's thread; wrapping it
                // in an IOException alone would silently clear it.
                Thread.currentThread().interrupt();
                throw new IOException(ie);
            }
        }
        long bytesOnDisk = rbw.getBytesOnDisk();
        if (bytesOnDisk < len) {
            throw new IOException(String.format("Need %d bytes, but only %d bytes available", len, bytesOnDisk));
        }
    }

    /**
     * Converts a plain {@link IOException} into a {@link SocketException}
     * carrying the original as its cause and reusing its stack trace.
     * Instances of any IOException subclass are returned unchanged.
     *
     * @param ioe exception to convert
     * @return a SocketException wrapping {@code ioe}, or {@code ioe} itself
     */
    private static IOException ioeToSocketException(IOException ioe) {
        if (ioe.getClass() != IOException.class) {
            // Subclasses already say more than a generic wrapper would.
            return ioe;
        }
        final SocketException wrapped = new SocketException("Original Exception : " + ioe);
        wrapped.initCause(ioe);
        wrapped.setStackTrace(ioe.getStackTrace());
        return wrapped;
    }

    /**
     * Returns how many chunks are needed to hold {@code datalen} bytes,
     * counting a trailing partial chunk as a whole one.
     *
     * @param datalen number of data bytes
     * @return {@code datalen} divided by the chunk size, rounded up
     */
    private int numberOfChunks(long datalen) {
        final long bytesPerChunk = this.chunkSize;
        final long roundedUp = datalen + bytesPerChunk - 1L;
        return (int)(roundedUp / bytesPerChunk);
    }

    /**
     * Builds and sends one packet starting at the current offset. The packet
     * buffer is laid out as header, checksums, then (on the copy path) data;
     * on the transferTo path the data is sent from the file channel instead.
     * When no data remains, an empty packet is produced.
     *
     * @param pkt buffer that receives the header, checksums and data
     * @param maxChunks maximum number of chunks to place in the packet
     * @param out stream the packet is written to; must be a
     *        {@link SocketOutputStream} when {@code transferTo} is true
     * @param transferTo whether to send the data via the file channel
     * @param throttler optional throttler charged with the packet length
     * @return number of data bytes sent
     * @throws IOException on read, verification or write failure; plain
     *         IOExceptions raised while writing are converted to socket
     *         exceptions
     */
    private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, boolean transferTo, DataTransferThrottler throttler) throws IOException {
        int dataLen = (int)Math.min(this.endOffset - this.offset, (long)this.chunkSize * (long)maxChunks);
        int numChunks = this.numberOfChunks(dataLen);
        int checksumDataLen = numChunks * this.checksumSize;
        int packetLen = dataLen + checksumDataLen + 4;
        boolean lastDataPacket = this.offset + (long)dataLen == this.endOffset && dataLen > 0;
        int headerLen = this.writePacketHeader(pkt, dataLen, packetLen);
        // writePacketHeader leaves the buffer positioned just past the header.
        int headerOff = pkt.position() - headerLen;
        int checksumOff = pkt.position();
        byte[] buf = pkt.array();
        if (this.checksumSize > 0 && this.ris.getChecksumIn() != null) {
            this.readChecksum(buf, checksumOff, checksumDataLen);
            // For the last data packet, overwrite the final checksum with the
            // one captured when the sender was constructed, if there is one.
            if (lastDataPacket && this.lastChunkChecksum != null) {
                int start = checksumOff + checksumDataLen - this.checksumSize;
                byte[] updatedChecksum = this.lastChunkChecksum.getChecksum();
                if (updatedChecksum != null) {
                    System.arraycopy(updatedChecksum, 0, buf, start, this.checksumSize);
                }
            }
        }
        int dataOff = checksumOff + checksumDataLen;
        if (!transferTo) {
            this.ris.readDataFully(buf, dataOff, dataLen);
            if (this.verifyChecksum) {
                this.verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
            }
        }
        try {
            if (transferTo) {
                // Header and checksums go through the socket stream, the data
                // straight from the file channel.
                SocketOutputStream sockOut = (SocketOutputStream)out;
                sockOut.write(buf, headerOff, dataOff - headerOff);
                FileChannel fileCh = ((FileInputStream)this.ris.getDataIn()).getChannel();
                LongWritable waitTime = new LongWritable();
                LongWritable transferTime = new LongWritable();
                this.fileIoProvider.transferToSocketFully(this.ris.getVolumeRef().getVolume(), sockOut, fileCh, this.blockInPosition, dataLen, waitTime, transferTime);
                this.datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
                this.datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
                this.blockInPosition += (long)dataLen;
            } else {
                out.write(buf, headerOff, dataOff + dataLen - headerOff);
            }
        }
        catch (IOException e) {
            if (!(e instanceof SocketTimeoutException)) {
                // getMessage() may be null; in that case the failure cannot be
                // told apart from a peer disconnect, so the block is not marked.
                String ioem = e.getMessage();
                if (ioem != null && !ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
                    LOG.error("BlockSender.sendChunks() exception: ", (Throwable)e);
                    this.datanode.getBlockScanner().markSuspectBlock(this.ris.getVolumeRef().getVolume().getStorageID(), this.block);
                }
            }
            throw BlockSender.ioeToSocketException(e);
        }
        if (throttler != null) {
            throttler.throttle(packetLen);
        }
        return dataLen;
    }

    /**
     * Reads {@code checksumLen} checksum bytes into {@code buf} starting at
     * {@code checksumOffset}. If the read fails, the checksum stream is
     * closed; when corrupt checksums are tolerated the target range is
     * zero-filled and the method returns normally, otherwise the failure is
     * rethrown.
     *
     * @param buf destination buffer
     * @param checksumOffset index in {@code buf} of the first checksum byte
     * @param checksumLen number of checksum bytes to read
     * @throws IOException if reading fails and corrupt checksums are not
     *         tolerated
     */
    private void readChecksum(byte[] buf, int checksumOffset, int checksumLen) throws IOException {
        // Nothing to read when checksums are empty or there is no stream.
        if (this.checksumSize <= 0 || this.ris.getChecksumIn() == null) {
            return;
        }
        try {
            this.ris.readChecksumFully(buf, checksumOffset, checksumLen);
        }
        catch (IOException e) {
            LOG.warn(" Could not read or failed to verify checksum for data at offset " + this.offset + " for block " + this.block, (Throwable)e);
            this.ris.closeChecksumStream();
            if (!this.corruptChecksumOk) {
                throw e;
            }
            if (checksumLen > 0) {
                // Arrays.fill takes an exclusive end index, not a length.
                Arrays.fill(buf, checksumOffset, checksumOffset + checksumLen, (byte)0);
            }
        }
    }

    /**
     * Recomputes the checksum of each chunk in {@code buf} and compares it
     * with the stored checksum at the matching position.
     *
     * @param buf buffer holding both the data and the checksums
     * @param dataOffset index of the first data byte
     * @param datalen number of data bytes to verify
     * @param numChunks number of chunks to verify
     * @param checksumOffset index of the first checksum byte
     * @throws ChecksumException at the first chunk whose checksum differs;
     *         the reported position is the block offset of that chunk
     */
    public void verifyChecksum(byte[] buf, int dataOffset, int datalen, int numChunks, int checksumOffset) throws ChecksumException {
        int remaining = datalen;
        int dataPos = dataOffset;
        int sumPos = checksumOffset;
        int chunk = 0;
        while (chunk < numChunks) {
            // The final chunk may be shorter than a full chunk.
            final int span = Math.min(remaining, this.chunkSize);
            this.checksum.reset();
            this.checksum.update(buf, dataPos, span);
            final boolean matches = this.checksum.compare(buf, sumPos);
            if (!matches) {
                final long failedPos = this.offset + (long)datalen - (long)remaining;
                final String replicaInfo = this.replica == null ? "" : " for replica: " + this.replica.toString();
                throw new ChecksumException("Checksum failed at " + failedPos + replicaInfo, failedPos);
            }
            remaining -= span;
            dataPos += span;
            sumPos += this.checksumSize;
            ++chunk;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends the prepared byte range inside a trace scope named
     * {@code sendBlock_<blockId>}.
     *
     * @param out stream packets are written to and flushed on
     * @param baseStream underlying stream, used directly on the transferTo path
     * @param throttler optional throttler, may be null
     * @return number of bytes sent, counting data and checksums
     * @throws IOException if sending fails
     */
    long sendBlock(DataOutputStream out, OutputStream baseStream, DataTransferThrottler throttler) throws IOException {
        try (TraceScope sendScope = this.datanode.getTracer().newScope("sendBlock_" + this.block.getBlockId())) {
            return this.doSendBlock(out, baseStream, throttler);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends packets until the end offset is reached or the thread is
     * interrupted, then (if not interrupted) a trailing empty packet.
     * Whatever the outcome, a client trace line is written when enabled and
     * this sender is closed.
     *
     * @param out stream packets are written to and flushed on
     * @param baseStream underlying stream; used directly when the transferTo
     *        path is taken
     * @param throttler optional throttler, may be null
     * @return number of bytes sent, counting data and checksums
     * @throws IOException if {@code out} is null or sending fails
     */
    private long doSendBlock(DataOutputStream out, OutputStream baseStream, DataTransferThrottler throttler) throws IOException {
        if (out == null) {
            throw new IOException("out stream is null");
        }
        this.initialOffset = this.offset;
        this.lastCacheDropOffset = this.initialOffset;
        long bytesSent = 0L;
        OutputStream packetSink = out;

        // Advise sequential access for long reads when a descriptor is available.
        if (this.isLongRead() && this.ris.getDataInFd() != null) {
            this.ris.dropCacheBehindReads(this.block.getBlockName(), 0L, 0L, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
        }
        this.manageOsCache();

        final long startNanos = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0L;
        try {
            int bufferBytes = PacketHeader.PKT_MAX_HEADER_LEN;
            final boolean zeroCopy = this.transferToAllowed
                    && !this.verifyChecksum
                    && baseStream instanceof SocketOutputStream
                    && this.ris.getDataIn() instanceof FileInputStream;
            final int chunksPerPacket;
            if (zeroCopy) {
                FileChannel dataChannel = ((FileInputStream)this.ris.getDataIn()).getChannel();
                this.blockInPosition = dataChannel.position();
                packetSink = baseStream;
                chunksPerPacket = this.numberOfChunks(TRANSFERTO_BUFFER_SIZE);
                // The buffer only has to hold the header and the checksums.
                bufferBytes += this.checksumSize * chunksPerPacket;
            } else {
                chunksPerPacket = Math.max(1, this.numberOfChunks(IO_FILE_BUFFER_SIZE));
                // The buffer holds header, checksums and data.
                bufferBytes += (this.chunkSize + this.checksumSize) * chunksPerPacket;
            }
            final ByteBuffer packet = ByteBuffer.allocate(bufferBytes);

            while (this.endOffset > this.offset && !Thread.currentThread().isInterrupted()) {
                this.manageOsCache();
                final long sent = this.sendPacket(packet, chunksPerPacket, packetSink, zeroCopy, throttler);
                this.offset += sent;
                bytesSent += sent + (long)(this.numberOfChunks(sent) * this.checksumSize);
                ++this.seqno;
            }

            // An interrupted sender stops here without the trailing packet.
            if (!Thread.currentThread().isInterrupted()) {
                try {
                    // No data remains, so this produces the empty end packet.
                    this.sendPacket(packet, chunksPerPacket, packetSink, zeroCopy, throttler);
                    out.flush();
                }
                catch (IOException e) {
                    throw BlockSender.ioeToSocketException(e);
                }
                this.sentEntireByteRange = true;
            }
        }
        finally {
            if (this.clientTraceFmt != null && ClientTraceLog.isDebugEnabled()) {
                final long endNanos = System.nanoTime();
                ClientTraceLog.debug((Object)String.format(this.clientTraceFmt, bytesSent, this.initialOffset, endNanos - startNanos));
            }
            this.close();
        }
        return bytesSent;
    }

    /**
     * Issues readahead for the current offset and, once enough bytes have
     * been sent since the last drop, drops the already-sent range from the OS
     * cache. Does nothing when the data stream has no file descriptor.
     *
     * @throws IOException if the cache-drop call fails
     */
    private void manageOsCache() throws IOException {
        if (this.ris.getDataInFd() == null) {
            return;
        }

        final boolean readaheadConfigured = this.readaheadLength > 0L && this.datanode.readaheadPool != null;
        if (readaheadConfigured && (this.alwaysReadahead || this.isLongRead())) {
            this.curReadahead = this.datanode.readaheadPool.readaheadStream(this.clientTraceFmt, this.ris.getDataInFd(), this.offset, this.readaheadLength, Long.MAX_VALUE, this.curReadahead);
        }

        if (this.dropCacheBehindAllReads || this.dropCacheBehindLargeReads && this.isLongRead()) {
            final long dropDueAt = this.lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
            if (this.offset >= dropDueAt) {
                final long sentSinceLastDrop = this.offset - this.lastCacheDropOffset;
                this.ris.dropCacheBehindReads(this.block.getBlockName(), this.lastCacheDropOffset, sentSinceLastDrop, NativeIO.POSIX.POSIX_FADV_DONTNEED);
                this.lastCacheDropOffset = this.offset;
            }
        }
    }

    /**
     * Tells whether the range from the initial offset to the end offset
     * spans more than 262144 bytes (256 KiB).
     *
     * @return true for a long read
     */
    private boolean isLongRead() {
        final long rangeBytes = this.endOffset - this.initialOffset;
        return rangeBytes > 262144L;
    }

    /**
     * Clears the buffer and writes a packet header so that it ends exactly at
     * index {@code PacketHeader.PKT_MAX_HEADER_LEN}, leaving the buffer
     * positioned just past the header.
     *
     * @param pkt packet buffer
     * @param dataLen number of data bytes in the packet
     * @param packetLen packet length recorded in the header
     * @return serialized size of the header in bytes
     */
    private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
        pkt.clear();
        // NOTE(review): the fourth argument presumably marks the last packet
        // of the block; it is set exactly when the packet carries no data.
        final boolean emptyPacket = dataLen == 0;
        final PacketHeader packetHeader = new PacketHeader(packetLen, this.offset, this.seqno, emptyPacket, dataLen, false);
        final int headerBytes = packetHeader.getSerializedSize();
        final int headerStart = PacketHeader.PKT_MAX_HEADER_LEN - headerBytes;
        pkt.position(headerStart);
        packetHeader.putInBuffer(pkt);
        return headerBytes;
    }

    /**
     * @return true once the trailing empty packet has been sent and flushed
     */
    boolean didSendEntireByteRange() {
        final boolean complete = this.sentEntireByteRange;
        return complete;
    }

    /**
     * @return the checksum object chosen by the constructor
     */
    DataChecksum getChecksum() {
        final DataChecksum current = this.checksum;
        return current;
    }

    /**
     * @return the current position in the block; advanced after each data
     *         packet is sent
     */
    long getOffset() {
        final long position = this.offset;
        return position;
    }

    static {
        HdfsConfiguration conf = new HdfsConfiguration();
        IO_FILE_BUFFER_SIZE = DFSUtilClient.getIoFileBufferSize((Configuration)conf);
        TRANSFERTO_BUFFER_SIZE = Math.max(IO_FILE_BUFFER_SIZE, 65536);
        CACHE_DROP_INTERVAL_BYTES = 0x100000L;
    }
}

