/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.EditLogBackupInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.util.StringUtils;

/**
 * {@link FSImage} specialisation that consumes journal batches sent by a
 * remote NameNode.
 *
 * <p>How a received batch is handled depends on {@link #bnState}:
 * <ul>
 *   <li>{@link BNState#DROP_UNTIL_NEXT_ROLL} - the batch is discarded;</li>
 *   <li>{@link BNState#JOURNAL_ONLY} - the batch is written to the local edit
 *       log but not applied to the namespace;</li>
 *   <li>{@link BNState#IN_SYNC} - the batch is applied to the namespace and
 *       then written to the local edit log.</li>
 * </ul>
 *
 * <p>State transitions and journal handling are guarded by this object's
 * monitor.
 */
@InterfaceAudience.Private
public class BackupImage extends FSImage {
    /** Reusable in-memory stream; loaded with each received batch before it is replayed. */
    private final EditLogBackupInputStream backupInputStream =
            new EditLogBackupInputStream("Data from remote NameNode");

    /**
     * Current journal-handling mode. After construction it is only changed
     * through {@link #setState(BNState)}; it is volatile because
     * {@link #convergeJournalSpool()} reads it without holding the monitor.
     */
    volatile BNState bnState;

    /**
     * Request flag set by {@link #freezeNamespaceAtNextRoll()} and consumed
     * (then reset) by {@link #namenodeStartedLogSegment(long)}.
     */
    private boolean stopApplyingEditsOnNextRoll = false;

    /** Namesystem that edits are applied to; assigned at most once. */
    private FSNamesystem namesystem;

    /** Not read or written anywhere in this class as shown. */
    private int quotaInitThreads;

    /**
     * Creates the image in the {@link BNState#DROP_UNTIL_NEXT_ROLL} state and
     * disables the storage's pre-upgradable layout check.
     *
     * @param conf configuration handed to the {@link FSImage} constructor
     * @throws IOException if the superclass constructor fails
     */
    BackupImage(Configuration conf) throws IOException {
        super(conf);
        this.storage.setDisablePreUpgradableLayoutCheck(true);
        this.bnState = BNState.DROP_UNTIL_NEXT_ROLL;
    }

    /**
     * @return the namesystem previously supplied through
     *         {@link #setNamesystem(FSNamesystem)}, or {@code null} if none was set
     */
    synchronized FSNamesystem getNamesystem() {
        return this.namesystem;
    }

    /**
     * Records the namesystem to apply edits to. Only the first non-null
     * assignment takes effect; later calls are silently ignored.
     *
     * @param fsn the namesystem to remember
     */
    synchronized void setNamesystem(FSNamesystem fsn) {
        if (this.namesystem == null) {
            this.namesystem = fsn;
        }
    }

    /**
     * Analyzes every storage directory and brings it into a usable state.
     *
     * <ul>
     *   <li>{@code NON_EXISTENT}: fails with {@link InconsistentFSStateException};</li>
     *   <li>{@code NOT_FORMATTED}: the directory is cleared and its properties
     *       are not read;</li>
     *   <li>{@code NORMAL}: properties are read;</li>
     *   <li>any other state: {@code doRecover} is run, then properties are read.</li>
     * </ul>
     *
     * @throws IOException if analysis, recovery or reading properties fails;
     *         the directory being processed is unlocked before the exception
     *         is propagated
     */
    void recoverCreateRead() throws IOException {
        Iterator<Storage.StorageDirectory> it = this.storage.dirIterator();
        while (it.hasNext()) {
            Storage.StorageDirectory sd = it.next();
            try {
                Storage.StorageState curState =
                        sd.analyzeStorage(HdfsServerConstants.StartupOption.REGULAR, this.storage);
                switch (curState) {
                    case NON_EXISTENT:
                        throw new InconsistentFSStateException(sd.getRoot(), "checkpoint directory does not exist or is not accessible.");
                    case NOT_FORMATTED:
                        LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
                        LOG.info("Formatting ...");
                        sd.clearDirectory();
                        break;
                    case NORMAL:
                        break;
                    default:
                        sd.doRecover(curState);
                }
                // A freshly cleared directory has no properties to read yet.
                if (curState != Storage.StorageState.NOT_FORMATTED) {
                    this.storage.readProperties(sd);
                }
            } catch (IOException ioe) {
                sd.unlock();
                throw ioe;
            }
        }
    }

    /**
     * Handles one batch of serialized transactions according to the current
     * {@link #bnState} (see the class description).
     *
     * @param firstTxId transaction id of the first transaction in the batch
     * @param numTxns   number of transactions the batch is expected to contain
     * @param data      serialized transactions
     * @throws IOException if applying or journaling the batch fails
     */
    synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Got journal, state = " + this.bnState + "; firstTxId = " + firstTxId + "; numTxns = " + numTxns);
        }
        switch (this.bnState) {
            case DROP_UNTIL_NEXT_ROLL:
                // Nothing is applied or persisted in this state.
                return;
            case IN_SYNC:
                this.applyEdits(firstTxId, numTxns, data);
                break;
            case JOURNAL_ONLY:
                break;
            default:
                throw new AssertionError("Unhandled state: " + this.bnState);
        }
        this.editLog.journal(firstTxId, numTxns, data);
    }

    /**
     * Replays a batch of transactions against the namesystem.
     *
     * <p>The batch must start exactly one transaction after
     * {@code lastAppliedTxId}. After a successful replay
     * {@code lastAppliedTxId} is advanced and quota counts are refreshed under
     * the namesystem write lock. The backup input stream is cleared on every
     * exit path.
     *
     * @param firstTxId transaction id of the first transaction in the batch
     * @param numTxns   number of transactions the batch is expected to contain
     * @param data      serialized transactions
     * @throws IllegalArgumentException if {@code firstTxId} is not
     *         {@code lastAppliedTxId + 1}
     * @throws IOException if the loader advanced by a different number of
     *         transactions than {@code numTxns}, or loading fails
     */
    // NOTE(review): the decompiler reported "Removed try catching itself -
    // possible behaviour change" for this method; confirm against the original.
    private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data) throws IOException {
        Preconditions.checkArgument(firstTxId == this.lastAppliedTxId + 1L, "Received txn batch starting at %s but expected %s", firstTxId, this.lastAppliedTxId + 1L);
        assert (this.backupInputStream.length() == 0L) : "backup input stream is not empty";
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace("data:" + StringUtils.byteToHexString(data));
            }
            FSEditLogLoader logLoader = new FSEditLogLoader(this.getNamesystem(), this.lastAppliedTxId);
            int logVersion = this.storage.getLayoutVersion();
            this.backupInputStream.setBytes(data, logVersion);
            long numTxnsAdvanced = logLoader.loadEditRecords(this.backupInputStream, true, this.lastAppliedTxId + 1L, null, null);
            if (numTxnsAdvanced != (long) numTxns) {
                throw new IOException("Batch of txns starting at txnid " + firstTxId + " was supposed to contain " + numTxns + " transactions, but we were only able to advance by " + numTxnsAdvanced);
            }
            this.lastAppliedTxId = logLoader.getLastAppliedTxId();
            this.getNamesystem().writeLock();
            try {
                this.getNamesystem().dir.updateCountForQuota();
            } finally {
                this.getNamesystem().writeUnlock();
            }
        } finally {
            this.backupInputStream.clear();
        }
    }

    /**
     * Repeatedly attempts to catch up with the local edit log until the
     * image reaches {@link BNState#IN_SYNC}.
     *
     * @throws IllegalStateException if not currently in
     *         {@link BNState#JOURNAL_ONLY}
     * @throws IOException if loading edits fails
     */
    void convergeJournalSpool() throws IOException {
        Preconditions.checkState(this.bnState == BNState.JOURNAL_ONLY, "bad state: %s", this.bnState);
        while (!this.tryConvergeJournalSpool()) {
            // Each failed attempt is simply retried.
        }
        assert (this.bnState == BNState.IN_SYNC);
    }

    /**
     * Makes one attempt to converge.
     *
     * <p>First, without holding the monitor, loads edits from every selected
     * stream that does not start at the current segment until
     * {@code lastAppliedTxId} is one less than the current segment's first
     * txid. Then, under the monitor, loads the stream starting at the current
     * segment and switches to {@link BNState#IN_SYNC}.
     *
     * @return {@code true} if the state is now {@code IN_SYNC}; {@code false}
     *         if the current segment changed while catching up or no stream
     *         starting at the current segment was found, in which case the
     *         caller should retry
     * @throws IllegalStateException if not currently in
     *         {@link BNState#JOURNAL_ONLY}
     * @throws IOException if selecting or loading edit streams fails
     */
    // NOTE(review): the decompiler reported "Removed try catching itself -
    // possible behaviour change" for this method; confirm against the original.
    private boolean tryConvergeJournalSpool() throws IOException {
        Preconditions.checkState(this.bnState == BNState.JOURNAL_ONLY, "bad state: %s", this.bnState);
        while (this.lastAppliedTxId < this.editLog.getCurSegmentTxId() - 1L) {
            long target = this.editLog.getCurSegmentTxId();
            LOG.info("Loading edits into backupnode to try to catch up from txid " + this.lastAppliedTxId + " to " + target);
            FSImageTransactionalStorageInspector inspector = new FSImageTransactionalStorageInspector();
            this.storage.inspectStorageDirs(inspector);
            this.editLog.recoverUnclosedStreams();
            Collection<EditLogInputStream> editStreamsAll = this.editLog.selectInputStreams(this.lastAppliedTxId, target - 1L);
            // Skip any stream that starts at the current segment; it is
            // handled under the monitor below.
            ArrayList<EditLogInputStream> editStreams = Lists.newArrayList();
            for (EditLogInputStream s : editStreamsAll) {
                if (s.getFirstTxId() != this.editLog.getCurSegmentTxId()) {
                    editStreams.add(s);
                }
            }
            this.loadEdits(editStreams, this.getNamesystem());
        }
        synchronized (this) {
            if (this.lastAppliedTxId != this.editLog.getCurSegmentTxId() - 1L) {
                LOG.debug("Logs rolled while catching up to current segment");
                return false;
            }
            Collection<EditLogInputStream> editStreams = this.getEditLog().selectInputStreams(this.getEditLog().getCurSegmentTxId(), this.getEditLog().getCurSegmentTxId());
            try {
                // Only the first selected stream is considered.
                EditLogInputStream stream = null;
                Iterator<EditLogInputStream> candidates = editStreams.iterator();
                if (candidates.hasNext()) {
                    EditLogInputStream first = candidates.next();
                    if (first.getFirstTxId() == this.getEditLog().getCurSegmentTxId()) {
                        stream = first;
                    }
                }
                if (stream == null) {
                    LOG.warn("Unable to find stream starting with " + this.editLog.getCurSegmentTxId() + ". This indicates that there is an error in synchronization in BackupImage");
                    return false;
                }
                long remainingTxns = this.getEditLog().getLastWrittenTxId() - this.lastAppliedTxId;
                LOG.info("Going to finish converging with remaining " + remainingTxns + " txns from in-progress stream " + stream);
                FSEditLogLoader loader = new FSEditLogLoader(this.getNamesystem(), this.lastAppliedTxId);
                loader.loadFSEdits(stream, this.lastAppliedTxId + 1L);
                this.lastAppliedTxId = loader.getLastAppliedTxId();
                assert (this.lastAppliedTxId == this.getEditLog().getLastWrittenTxId());
            } finally {
                // Runs on the early "stream not found" return as well, so the
                // selected streams are never left open.
                FSEditLog.closeAllStreams(editStreams);
            }
            LOG.info("Successfully synced BackupNode with NameNode at txnid " + this.lastAppliedTxId);
            this.setState(BNState.IN_SYNC);
        }
        return true;
    }

    /**
     * Switches to {@code newState}, logging the transition at debug level.
     *
     * @param newState the state to enter
     */
    private synchronized void setState(BNState newState) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("State transition " + this.bnState + " -> " + newState);
        }
        this.bnState = newState;
    }

    /**
     * Starts a new local log segment at {@code txid} and performs any state
     * transition that was waiting for a roll:
     * {@code DROP_UNTIL_NEXT_ROLL} becomes {@code JOURNAL_ONLY}, and, if a
     * freeze was requested, {@code IN_SYNC} becomes {@code JOURNAL_ONLY}, the
     * request flag is reset and all threads waiting on this object are woken.
     *
     * @param txid first transaction id of the new segment
     * @throws IOException if the edit log cannot start the segment
     */
    synchronized void namenodeStartedLogSegment(long txid) throws IOException {
        this.editLog.startLogSegment(txid, true, this.namesystem.getEffectiveLayoutVersion());
        if (this.bnState == BNState.DROP_UNTIL_NEXT_ROLL) {
            this.setState(BNState.JOURNAL_ONLY);
        }
        if (this.stopApplyingEditsOnNextRoll) {
            if (this.bnState == BNState.IN_SYNC) {
                LOG.info("Stopped applying edits to prepare for checkpoint.");
                this.setState(BNState.JOURNAL_ONLY);
            }
            this.stopApplyingEditsOnNextRoll = false;
            this.notifyAll();
        }
    }

    /**
     * Requests that edits stop being applied when the next log segment is
     * started; see {@link #namenodeStartedLogSegment(long)}.
     */
    synchronized void freezeNamespaceAtNextRoll() {
        this.stopApplyingEditsOnNextRoll = true;
    }

    /**
     * Blocks while the state is {@link BNState#IN_SYNC}; returns immediately
     * if it is not. A freeze must have been requested with
     * {@link #freezeNamespaceAtNextRoll()} beforehand.
     *
     * @throws IllegalStateException if still in sync but no freeze is pending
     * @throws IOException wrapping the {@link InterruptedException} if the
     *         waiting thread is interrupted; the thread's interrupt status is
     *         restored before this is thrown
     */
    synchronized void waitUntilNamespaceFrozen() throws IOException {
        if (this.bnState != BNState.IN_SYNC) {
            return;
        }
        LOG.info("Waiting until the NameNode rolls its edit logs in order to freeze the BackupNode namespace.");
        while (this.bnState == BNState.IN_SYNC) {
            Preconditions.checkState(this.stopApplyingEditsOnNextRoll, "If still in sync, we should still have the flag set to freeze at next roll");
            try {
                this.wait();
            } catch (InterruptedException ie) {
                LOG.warn("Interrupted waiting for namespace to freeze", ie);
                // Keep the interruption visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IOException(ie);
            }
        }
        LOG.info("BackupNode namespace frozen.");
    }

    /**
     * Aborts the current log segment and closes the storage.
     *
     * @throws IOException if closing the storage fails
     */
    @Override
    public synchronized void close() throws IOException {
        this.editLog.abortCurrentLogSegment();
        this.storage.close();
    }

    /** Journal-handling modes; see the class description for their effect on {@code journal}. */
    static enum BNState {
        /** Received batches are discarded. */
        DROP_UNTIL_NEXT_ROLL,
        /** Received batches are written to the local edit log only. */
        JOURNAL_ONLY,
        /** Received batches are applied to the namespace and written to the local edit log. */
        IN_SYNC;
    }
}

