/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import org.apache.hadoop.hdfs.server.datanode.UpgradeManagerDatanode;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;

@InterfaceAudience.Private
class BPOfferService
implements Runnable {
    static final Log LOG = DataNode.LOG;
    final InetSocketAddress nnAddr;
    NamespaceInfo bpNSInfo;
    DatanodeRegistration bpRegistration;
    long lastBlockReport = 0L;
    long lastDeletedReport = 0L;
    boolean resetBlockReportTime = true;
    Thread bpThread;
    DatanodeProtocol bpNamenode;
    private long lastHeartbeat = 0L;
    private volatile boolean initialized = false;
    private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList = new LinkedList();
    private volatile int pendingReceivedRequests = 0;
    private volatile boolean shouldServiceRun = true;
    UpgradeManagerDatanode upgradeManager = null;
    private final DataNode dn;
    private final DNConf dnConf;

    BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
        this.dn = dn;
        this.nnAddr = nnAddr;
        this.dnConf = dn.getDnConf();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void triggerHeartbeatForTests() throws IOException {
        LinkedList<ReceivedDeletedBlockInfo> linkedList = this.receivedAndDeletedBlockList;
        synchronized (linkedList) {
            this.lastHeartbeat = 0L;
            this.receivedAndDeletedBlockList.notifyAll();
            while (this.lastHeartbeat == 0L) {
                try {
                    this.receivedAndDeletedBlockList.wait(100L);
                }
                catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    public boolean isAlive() {
        return this.shouldServiceRun && this.bpThread.isAlive();
    }

    public String getBlockPoolId() {
        if (this.bpNSInfo != null) {
            return this.bpNSInfo.getBlockPoolID();
        }
        LOG.warn((Object)"Block pool ID needed, but service not yet registered with NN", (Throwable)new Exception("trace"));
        return null;
    }

    public NamespaceInfo getNamespaceInfo() {
        return this.bpNSInfo;
    }

    public String toString() {
        if (this.bpNSInfo == null) {
            String storageId = this.dn.getStorageId();
            if (storageId == null || "".equals(storageId)) {
                storageId = "unknown";
            }
            return "Block pool <registering> (storage id " + storageId + ") connecting to " + this.nnAddr;
        }
        return "Block pool " + this.getBlockPoolId() + " (storage id " + this.dn.getStorageId() + ") registered with " + this.nnAddr;
    }

    InetSocketAddress getNNSocketAddress() {
        return this.nnAddr;
    }

    @VisibleForTesting
    void setNameNode(DatanodeProtocol dnProtocol) {
        this.bpNamenode = dnProtocol;
    }

    NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
        NamespaceInfo nsInfo = null;
        while (this.shouldRun()) {
            try {
                nsInfo = this.bpNamenode.versionRequest();
                LOG.debug((Object)(this + " received versionRequest response: " + nsInfo));
                break;
            }
            catch (SocketTimeoutException e) {
                LOG.warn((Object)("Problem connecting to server: " + this.nnAddr));
            }
            catch (IOException e) {
                LOG.warn((Object)("Problem connecting to server: " + this.nnAddr));
            }
            this.sleepAndLogInterrupts(5000, "requesting version info from NN");
        }
        if (nsInfo != null) {
            this.checkNNVersion(nsInfo);
        }
        return nsInfo;
    }

    private void checkNNVersion(NamespaceInfo nsInfo) throws IncorrectVersionException {
        String stBuildVer;
        String nsBuildVer = nsInfo.getBuildVersion();
        if (!nsBuildVer.equals(stBuildVer = Storage.getBuildVersion())) {
            LOG.warn((Object)("Data-node and name-node Build versions must be the same. Namenode build version: " + nsBuildVer + "Datanode " + "build version: " + stBuildVer));
            throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
        }
        if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
            LOG.warn((Object)("Data-node and name-node layout versions must be the same. Expected: " + HdfsConstants.LAYOUT_VERSION + " actual " + this.bpNSInfo.getLayoutVersion()));
            throw new IncorrectVersionException(this.bpNSInfo.getLayoutVersion(), "namenode");
        }
    }

    private void connectToNNAndHandshake() throws IOException {
        this.bpNamenode = this.dn.connectToNN(this.nnAddr);
        this.bpNSInfo = this.retrieveNamespaceInfo();
        this.dn.initBlockPool(this);
        this.register();
    }

    void scheduleBlockReport(long delay) {
        this.lastBlockReport = delay > 0L ? System.currentTimeMillis() - (this.dnConf.blockReportInterval - (long)DFSUtil.getRandom().nextInt((int)delay)) : this.lastHeartbeat - this.dnConf.blockReportInterval;
        this.resetBlockReportTime = true;
    }

    void reportBadBlocks(ExtendedBlock block) {
        DatanodeInfo[] dnArr = new DatanodeInfo[]{new DatanodeInfo(this.bpRegistration)};
        LocatedBlock[] blocks = new LocatedBlock[]{new LocatedBlock(block, dnArr)};
        try {
            this.bpNamenode.reportBadBlocks(blocks);
        }
        catch (IOException e) {
            LOG.warn((Object)("Failed to report bad block " + block + " to namenode : " + " Exception"), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reportReceivedDeletedBlocks() throws IOException {
        int currentReceivedRequestsCounter;
        ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
        LinkedList<ReceivedDeletedBlockInfo> linkedList = this.receivedAndDeletedBlockList;
        synchronized (linkedList) {
            currentReceivedRequestsCounter = this.pendingReceivedRequests;
            int numBlocks = this.receivedAndDeletedBlockList.size();
            if (numBlocks > 0) {
                receivedAndDeletedBlockArray = this.receivedAndDeletedBlockList.toArray(new ReceivedDeletedBlockInfo[numBlocks]);
            }
        }
        if (receivedAndDeletedBlockArray != null) {
            this.bpNamenode.blockReceivedAndDeleted(this.bpRegistration, this.getBlockPoolId(), receivedAndDeletedBlockArray);
            linkedList = this.receivedAndDeletedBlockList;
            synchronized (linkedList) {
                for (int i = 0; i < receivedAndDeletedBlockArray.length; ++i) {
                    this.receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
                }
                this.pendingReceivedRequests -= currentReceivedRequestsCounter;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
        if (block == null || delHint == null) {
            throw new IllegalArgumentException(block == null ? "Block is null" : "delHint is null");
        }
        if (!block.getBlockPoolId().equals(this.getBlockPoolId())) {
            LOG.warn((Object)("BlockPool mismatch " + block.getBlockPoolId() + " vs. " + this.getBlockPoolId()));
            return;
        }
        LinkedList<ReceivedDeletedBlockInfo> linkedList = this.receivedAndDeletedBlockList;
        synchronized (linkedList) {
            this.receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block.getLocalBlock(), delHint));
            ++this.pendingReceivedRequests;
            this.receivedAndDeletedBlockList.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void notifyNamenodeDeletedBlock(ExtendedBlock block) {
        if (block == null) {
            throw new IllegalArgumentException("Block is null");
        }
        if (!block.getBlockPoolId().equals(this.getBlockPoolId())) {
            LOG.warn((Object)("BlockPool mismatch " + block.getBlockPoolId() + " vs. " + this.getBlockPoolId()));
            return;
        }
        LinkedList<ReceivedDeletedBlockInfo> linkedList = this.receivedAndDeletedBlockList;
        synchronized (linkedList) {
            this.receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block.getLocalBlock(), "-"));
        }
    }

    DatanodeCommand blockReport() throws IOException {
        DatanodeCommand cmd = null;
        long startTime = Util.now();
        if (startTime - this.lastBlockReport > this.dnConf.blockReportInterval) {
            long brCreateStartTime = Util.now();
            BlockListAsLongs bReport = this.dn.data.getBlockReport(this.getBlockPoolId());
            long brSendStartTime = Util.now();
            cmd = this.bpNamenode.blockReport(this.bpRegistration, this.getBlockPoolId(), bReport.getBlockListAsLongs());
            long brSendCost = Util.now() - brSendStartTime;
            long brCreateCost = brSendStartTime - brCreateStartTime;
            this.dn.metrics.addBlockReport(brSendCost);
            LOG.info((Object)("BlockReport of " + bReport.getNumberOfBlocks() + " blocks took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing"));
            if (this.resetBlockReportTime) {
                this.lastBlockReport = startTime - (long)DFSUtil.getRandom().nextInt((int)this.dnConf.blockReportInterval);
                this.resetBlockReportTime = false;
            } else {
                this.lastBlockReport += (Util.now() - this.lastBlockReport) / this.dnConf.blockReportInterval * this.dnConf.blockReportInterval;
            }
            LOG.info((Object)("sent block report, processed command:" + cmd));
        }
        return cmd;
    }

    DatanodeCommand[] sendHeartBeat() throws IOException {
        return this.bpNamenode.sendHeartbeat(this.bpRegistration, this.dn.data.getCapacity(), this.dn.data.getDfsUsed(), this.dn.data.getRemaining(), this.dn.data.getBlockPoolUsed(this.getBlockPoolId()), this.dn.xmitsInProgress.get(), this.dn.getXceiverCount(), this.dn.data.getNumFailedVolumes());
    }

    void start() {
        if (this.bpThread != null && this.bpThread.isAlive()) {
            return;
        }
        this.bpThread = new Thread((Runnable)this, this.formatThreadName());
        this.bpThread.setDaemon(true);
        this.bpThread.start();
    }

    private String formatThreadName() {
        Collection<URI> dataDirs = DataNode.getStorageDirs(this.dn.getConf());
        return "DataNode: [" + StringUtils.uriToString((URI[])dataDirs.toArray(new URI[0])) + "] " + " heartbeating to " + this.nnAddr;
    }

    void stop() {
        this.shouldServiceRun = false;
        if (this.bpThread != null) {
            this.bpThread.interrupt();
        }
    }

    void join() {
        try {
            if (this.bpThread != null) {
                this.bpThread.join();
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private synchronized void cleanUp() {
        if (this.upgradeManager != null) {
            this.upgradeManager.shutdownUpgrade();
        }
        this.shouldServiceRun = false;
        RPC.stopProxy((Object)this.bpNamenode);
        this.dn.shutdownBlockPool(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void offerService() throws Exception {
        LOG.info((Object)("For namenode " + this.nnAddr + " using DELETEREPORT_INTERVAL of " + this.dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of " + this.dnConf.blockReportInterval + "msec" + " Initial delay: " + this.dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval=" + this.dnConf.heartBeatInterval));
        while (this.shouldRun()) {
            try {
                long startTime = Util.now();
                if (startTime - this.lastHeartbeat > this.dnConf.heartBeatInterval) {
                    this.lastHeartbeat = startTime;
                    if (!this.dn.areHeartbeatsDisabledForTests()) {
                        DatanodeCommand[] cmds = this.sendHeartBeat();
                        this.dn.metrics.addHeartbeat(Util.now() - startTime);
                        long startProcessCommands = Util.now();
                        if (!this.processCommand(cmds)) continue;
                        long endProcessCommands = Util.now();
                        if (endProcessCommands - startProcessCommands > 2000L) {
                            LOG.info((Object)("Took " + (endProcessCommands - startProcessCommands) + "ms to process " + cmds.length + " commands from NN"));
                        }
                    }
                }
                if (this.pendingReceivedRequests > 0 || startTime - this.lastDeletedReport > this.dnConf.deleteReportInterval) {
                    this.reportReceivedDeletedBlocks();
                    this.lastDeletedReport = startTime;
                }
                DatanodeCommand cmd = this.blockReport();
                this.processCommand(cmd);
                if (this.dn.blockScanner != null) {
                    this.dn.blockScanner.addBlockPool(this.getBlockPoolId());
                }
                long waitTime = this.dnConf.heartBeatInterval - (System.currentTimeMillis() - this.lastHeartbeat);
                LinkedList<ReceivedDeletedBlockInfo> linkedList = this.receivedAndDeletedBlockList;
                synchronized (linkedList) {
                    if (waitTime > 0L && this.pendingReceivedRequests == 0) {
                        try {
                            this.receivedAndDeletedBlockList.wait(waitTime);
                        }
                        catch (InterruptedException ie) {
                            LOG.warn((Object)("BPOfferService for " + this + " interrupted"));
                        }
                    }
                }
            }
            catch (RemoteException re) {
                String reClass = re.getClassName();
                if (UnregisteredNodeException.class.getName().equals(reClass) || DisallowedDatanodeException.class.getName().equals(reClass) || IncorrectVersionException.class.getName().equals(reClass)) {
                    LOG.warn((Object)(this + " is shutting down"), (Throwable)re);
                    this.shouldServiceRun = false;
                    return;
                }
                LOG.warn((Object)"RemoteException in offerService", (Throwable)re);
                try {
                    long sleepTime = Math.min(1000L, this.dnConf.heartBeatInterval);
                    Thread.sleep(sleepTime);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            catch (IOException e) {
                LOG.warn((Object)"IOException in offerService", (Throwable)e);
            }
        }
    }

    void register() throws IOException {
        Preconditions.checkState((this.bpNSInfo != null ? 1 : 0) != 0, (Object)"register() should be called after handshake()");
        this.bpRegistration = this.dn.createBPRegistration(this.bpNSInfo);
        LOG.info((Object)(this + " beginning handshake with NN"));
        while (this.shouldRun()) {
            try {
                this.bpRegistration = this.bpNamenode.registerDatanode(this.bpRegistration);
                break;
            }
            catch (SocketTimeoutException e) {
                LOG.info((Object)("Problem connecting to server: " + this.nnAddr));
                this.sleepAndLogInterrupts(1000, "connecting to server");
            }
        }
        LOG.info((Object)("Block pool " + this + " successfully registered with NN"));
        this.dn.bpRegistrationSucceeded(this.bpRegistration, this.getBlockPoolId());
        this.scheduleBlockReport(this.dnConf.initialBlockReportDelay);
    }

    private void sleepAndLogInterrupts(int millis, String stateString) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException ie) {
            LOG.info((Object)("BPOfferService " + this + " interrupted while " + stateString));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LOG.info((Object)(this + " starting to offer service"));
        try {
            try {
                this.connectToNNAndHandshake();
            }
            catch (IOException ioe) {
                LOG.fatal((Object)("Initialization failed for block pool " + this), (Throwable)ioe);
                LOG.warn((Object)("Ending block pool service for: " + this));
                this.cleanUp();
                return;
            }
            this.initialized = true;
            while (this.shouldRun()) {
                try {
                    this.startDistributedUpgradeIfNeeded();
                    this.offerService();
                }
                catch (Exception ex) {
                    LOG.error((Object)("Exception in BPOfferService for " + this), (Throwable)ex);
                    this.sleepAndLogInterrupts(5000, "offering service");
                }
            }
        }
        catch (Throwable ex) {
            LOG.warn((Object)("Unexpected exception in block pool " + this), ex);
        }
        finally {
            LOG.warn((Object)("Ending block pool service for: " + this));
            this.cleanUp();
        }
    }

    private boolean shouldRun() {
        return this.shouldServiceRun && this.dn.shouldRun();
    }

    private boolean processCommand(DatanodeCommand[] cmds) {
        if (cmds != null) {
            for (DatanodeCommand cmd : cmds) {
                try {
                    if (!this.processCommand(cmd)) {
                        return false;
                    }
                }
                catch (IOException ioe) {
                    LOG.warn((Object)"Error processing datanode Command", (Throwable)ioe);
                }
            }
        }
        return true;
    }

    private boolean processCommand(DatanodeCommand cmd) throws IOException {
        if (cmd == null) {
            return true;
        }
        BlockCommand bcmd = cmd instanceof BlockCommand ? (BlockCommand)cmd : null;
        switch (cmd.getAction()) {
            case 1: {
                this.dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
                this.dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
                break;
            }
            case 2: {
                Block[] toDelete = bcmd.getBlocks();
                if (this.dn.blockScanner != null) {
                    this.dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
                }
                this.dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
                this.dn.metrics.incrBlocksRemoved(toDelete.length);
                break;
            }
            case 3: {
                this.shouldServiceRun = false;
                return false;
            }
            case 4: {
                LOG.info((Object)"DatanodeCommand action: DNA_REGISTER");
                if (!this.shouldRun()) break;
                this.retrieveNamespaceInfo();
                this.register();
                break;
            }
            case 5: {
                String bp = ((DatanodeCommand.Finalize)cmd).getBlockPoolId();
                assert (this.getBlockPoolId().equals(bp)) : "BP " + this.getBlockPoolId() + " received DNA_FINALIZE " + "for other block pool " + bp;
                this.dn.finalizeUpgradeForPool(bp);
                break;
            }
            case 101: {
                this.processDistributedUpgradeCommand((UpgradeCommand)cmd);
                break;
            }
            case 6: {
                this.dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
                break;
            }
            case 7: {
                LOG.info((Object)"DatanodeCommand action: DNA_ACCESSKEYUPDATE");
                if (!this.dn.isBlockTokenEnabled) break;
                this.dn.blockPoolTokenSecretManager.setKeys(this.getBlockPoolId(), ((KeyUpdateCommand)cmd).getExportedKeys());
                break;
            }
            case 8: {
                LOG.info((Object)"DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
                long bandwidth = ((BalancerBandwidthCommand)cmd).getBalancerBandwidthValue();
                if (bandwidth <= 0L) break;
                DataXceiverServer dxcs = (DataXceiverServer)this.dn.dataXceiverServer.getRunnable();
                dxcs.balanceThrottler.setBandwidth(bandwidth);
                break;
            }
            default: {
                LOG.warn((Object)("Unknown DatanodeCommand action: " + cmd.getAction()));
            }
        }
        return true;
    }

    private void processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException {
        UpgradeManagerDatanode upgradeManager = this.getUpgradeManager();
        upgradeManager.processUpgradeCommand(comm);
    }

    synchronized UpgradeManagerDatanode getUpgradeManager() {
        if (this.upgradeManager == null) {
            this.upgradeManager = new UpgradeManagerDatanode(this.dn, this.getBlockPoolId());
        }
        return this.upgradeManager;
    }

    private void startDistributedUpgradeIfNeeded() throws IOException {
        UpgradeManagerDatanode um = this.getUpgradeManager();
        if (!um.getUpgradeState()) {
            return;
        }
        um.setUpgradeState(false, um.getUpgradeVersion());
        um.startUpgrade();
    }

    @VisibleForTesting
    DatanodeProtocol getBpNamenode() {
        return this.bpNamenode;
    }

    @VisibleForTesting
    void setBpNamenode(DatanodeProtocol bpNamenode) {
        this.bpNamenode = bpNamenode;
    }
}

