/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;

/**
 * Iterates over the {@link WAL.Entry} objects contained in the WAL files named by a queue of
 * paths. The head of the queue is the log currently being read; once that log is exhausted and
 * another log is queued behind it, the stream dequeues it and continues with the next one.
 *
 * <p>The stream is its own {@link Iterator}: {@link #iterator()} returns {@code this}, so it can
 * only be traversed once. I/O failures raised while advancing are surfaced from
 * {@link #hasNext()} / {@link #next()} as {@link WALEntryStreamRuntimeException}.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class WALEntryStream
implements Iterator<WAL.Entry>,
Closeable,
Iterable<WAL.Entry> {
    private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
    /** Reader over {@link #currentPath}; {@code null} when no log is currently open. */
    private WAL.Reader reader;
    /** Path the current reader was opened on; updated only after a successful open and seek. */
    private Path currentPath;
    /** Entry fetched by {@link #hasNext()} and not yet handed out by {@link #next()}. */
    private WAL.Entry currentEntry;
    /** Reader position after the last read attempt, or the start position before any read. */
    private long currentPosition = 0L;
    private final PriorityBlockingQueue<Path> logQueue;
    private final FileSystem fs;
    private final Configuration conf;
    private final MetricsSource metrics;

    /**
     * Creates a stream that starts reading at position 0 of the first log in the queue.
     *
     * @param logQueue queue of WAL paths to read, head first
     * @param fs file system used to open and stat the logs
     * @param conf configuration passed to the reader factory and lease recovery
     * @param metrics replication metrics updated as entries are read
     * @throws IOException declared for callers; this constructor performs no I/O itself
     */
    public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, MetricsSource metrics) throws IOException {
        this(logQueue, fs, conf, 0L, metrics);
    }

    /**
     * Creates a stream that starts reading the first log in the queue at {@code startPosition}.
     *
     * @param logQueue queue of WAL paths to read, head first
     * @param fs file system used to open and stat the logs
     * @param conf configuration passed to the reader factory and lease recovery
     * @param startPosition position to seek to when the first log is opened
     * @param metrics replication metrics updated as entries are read
     * @throws IOException declared for callers; this constructor performs no I/O itself
     */
    public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, long startPosition, MetricsSource metrics) throws IOException {
        this.logQueue = logQueue;
        this.fs = fs;
        this.conf = conf;
        this.currentPosition = startPosition;
        this.metrics = metrics;
    }

    /**
     * Attempts to fetch the next entry if none is buffered.
     *
     * @return {@code true} if an entry is available for {@link #next()}
     * @throws WALEntryStreamRuntimeException wrapping any exception raised while advancing
     */
    @Override
    public boolean hasNext() {
        if (this.currentEntry == null) {
            try {
                this.tryAdvanceEntry();
            }
            catch (Exception e) {
                throw new WALEntryStreamRuntimeException(e);
            }
        }
        return this.currentEntry != null;
    }

    /**
     * Returns the buffered entry and clears the buffer.
     *
     * @return the next WAL entry
     * @throws NoSuchElementException if no entry is currently available
     */
    @Override
    public WAL.Entry next() {
        if (!this.hasNext()) {
            throw new NoSuchElementException();
        }
        WAL.Entry save = this.currentEntry;
        this.currentEntry = null;
        return save;
    }

    /** Not supported. */
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /** Closes the underlying reader, if one is open. */
    @Override
    public void close() throws IOException {
        this.closeReader();
    }

    /** @return this stream itself; it is not restartable */
    @Override
    public Iterator<WAL.Entry> iterator() {
        return this;
    }

    /** @return the reader position recorded after the last read attempt */
    public long getPosition() {
        return this.currentPosition;
    }

    /** @return the path of the log the current reader was opened on, or {@code null} */
    public Path getCurrentPath() {
        return this.currentPath;
    }

    /** Builds a short human-readable description of the current path and position for logs. */
    private String getCurrentPathStat() {
        StringBuilder sb = new StringBuilder();
        if (this.currentPath != null) {
            sb.append("currently replicating from: ").append(this.currentPath).append(" at position: ").append(this.currentPosition).append("\n");
        } else {
            sb.append("no replication ongoing, waiting for new log");
        }
        return sb.toString();
    }

    /**
     * Resets the open reader and seeks back to the current position. Does nothing when no
     * reader is open.
     *
     * @throws IOException if the reset or seek fails
     */
    public void reset() throws IOException {
        if (this.reader != null && this.currentPath != null) {
            this.resetReader();
        }
    }

    private void setPosition(long position) {
        this.currentPosition = position;
    }

    private void setCurrentPath(Path path) {
        this.currentPath = path;
    }

    /**
     * Tries to load the next entry into {@link #currentEntry}. When the current log yields
     * nothing and another log is queued behind it, the reader is reset and the read retried;
     * if that still yields nothing and {@link #checkAllBytesParsed()} agrees the log is done,
     * the log is dequeued and the first entry of the next log is read.
     */
    private void tryAdvanceEntry() throws IOException {
        if (this.checkReader()) {
            this.readNextEntryAndSetPosition();
            if (this.currentEntry == null && this.logQueue.size() > 1) {
                this.resetReader();
                this.readNextEntryAndSetPosition();
                if (this.currentEntry == null && this.checkAllBytesParsed()) {
                    this.dequeueCurrentLog();
                    if (this.openNextLog()) {
                        this.readNextEntryAndSetPosition();
                    }
                }
            }
        }
    }

    /**
     * Compares the current position against the file length reported by the file system to
     * decide whether the current log has been fully consumed.
     *
     * @return {@code true} if the log is considered complete; {@code false} if reading was
     *         restarted from position 0 because the position is too far from the file length
     */
    private boolean checkAllBytesParsed() throws IOException {
        long trailerSize = this.currentTrailerSize();
        FileStatus stat = null;
        try {
            stat = this.fs.getFileStatus(this.currentPath);
        }
        catch (IOException exception) {
            // Best effort: without a length we cannot verify, so the log is treated as complete below.
            LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0L ? "was not" : "was") + " closed cleanly " + this.getCurrentPathStat());
            this.metrics.incrUnknownFileLengthForClosedWAL();
        }
        if (stat != null) {
            if (trailerSize < 0L) {
                // A negative trailer size is what currentTrailerSize() reports when no trailer size is known.
                if (this.currentPosition < stat.getLen()) {
                    long skippedBytes = stat.getLen() - this.currentPosition;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Reached the end of WAL file '" + this.currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data. This is normally ok.");
                    }
                    this.metrics.incrUncleanlyClosedWALs();
                    this.metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
                }
            } else if (this.currentPosition + trailerSize < stat.getLen()) {
                LOG.warn("Processing end of WAL file '" + this.currentPath + "'. At position " + this.currentPosition + ", which is too far away from reported file length " + stat.getLen() + ". Restarting WAL reading (see HBASE-15983 for details). " + this.getCurrentPathStat());
                // Capture the position before resetting it: these are the bytes that will be
                // read again. Reading currentPosition after setPosition(0) would always report 0.
                long repeatedBytes = this.currentPosition;
                this.setPosition(0L);
                this.resetReader();
                this.metrics.incrRestartedWALReading();
                this.metrics.incrRepeatedFileBytes(repeatedBytes);
                return false;
            }
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + (stat == null ? "N/A" : Long.valueOf(stat.getLen())));
        }
        this.metrics.incrCompletedWAL();
        return true;
    }

    /** Closes the reader, removes the head of the log queue and resets the position to 0. */
    private void dequeueCurrentLog() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reached the end of log " + this.currentPath);
        }
        this.closeReader();
        this.logQueue.remove();
        this.setPosition(0L);
        this.metrics.decrSizeOfLogQueue();
    }

    /**
     * Reads one entry from the reader into {@link #currentEntry} (possibly {@code null}) and
     * records the reader's position. Read metrics are updated only when an entry was returned.
     */
    private void readNextEntryAndSetPosition() throws IOException {
        WAL.Entry readEntry = this.reader.next();
        long readerPos = this.reader.getPosition();
        if (readEntry != null) {
            this.metrics.incrLogEditsRead();
            this.metrics.incrLogReadInBytes(readerPos - this.currentPosition);
        }
        this.currentEntry = readEntry;
        this.setPosition(readerPos);
    }

    private void closeReader() throws IOException {
        if (this.reader != null) {
            this.reader.close();
            this.reader = null;
        }
    }

    /** @return {@code true} if a reader is open, opening the head of the queue if necessary */
    private boolean checkReader() throws IOException {
        if (this.reader == null) {
            return this.openNextLog();
        }
        return true;
    }

    /** @return {@code true} if the queue has a head and a reader could be opened on it */
    private boolean openNextLog() throws IOException {
        Path nextPath = this.logQueue.peek();
        if (nextPath != null) {
            this.openReader(nextPath);
            if (this.reader != null) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks for a file with the same name under the {@code oldWALs} directory of the root dir.
     *
     * @return the archived location if it exists, otherwise the given {@code path} unchanged
     */
    private Path getArchivedLog(Path path) throws IOException {
        Path rootDir = FSUtils.getRootDir(this.conf);
        Path oldLogDir = new Path(rootDir, "oldWALs");
        Path archivedLogLocation = new Path(oldLogDir, path.getName());
        if (this.fs.exists(archivedLogLocation)) {
            LOG.info("Log " + path + " was moved to " + archivedLogLocation);
            return archivedLogLocation;
        }
        LOG.error("Couldn't locate log: " + path);
        return path;
    }

    /** Reopens on the archived copy of {@code path} if one exists; otherwise rethrows. */
    private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
        Path archivedLog = this.getArchivedLog(path);
        if (path.equals(archivedLog)) {
            throw fnfe;
        }
        this.openReader(archivedLog);
    }

    /**
     * Opens a reader on {@code path} and seeks to the current position, or merely resets the
     * existing reader when it is already open on that path. On a lease-recovery failure or an
     * NPE the reader is left {@code null} so that the caller can retry later.
     */
    private void openReader(Path path) throws IOException {
        try {
            // Null-safe comparison: currentPath may still be null (or stale) while a reader
            // exists, e.g. when a previous seek failed before the path was recorded.
            if (this.reader == null || !path.equals(this.currentPath)) {
                this.closeReader();
                this.reader = WALFactory.createReader(this.fs, path, this.conf);
                this.seek();
                this.setCurrentPath(path);
            } else {
                this.resetReader();
            }
        }
        catch (FileNotFoundException fnfe) {
            this.handleFileNotFound(path, fnfe);
        }
        catch (RemoteException re) {
            IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
            if (!(ioe instanceof FileNotFoundException)) {
                throw ioe;
            }
            this.handleFileNotFound(path, (FileNotFoundException)ioe);
        }
        catch (LeaseNotRecoveredException lnre) {
            // Recover the lease of the file we just failed to open. currentPath is only updated
            // after a successful open, so it would name the previous log (or be null) here.
            LOG.warn("Try to recover the WAL lease " + path, lnre);
            this.recoverLease(this.conf, path);
            this.reader = null;
        }
        catch (NullPointerException npe) {
            LOG.warn("Got NPE opening reader, will retry.");
            this.reader = null;
        }
    }

    /** Attempts lease recovery on {@code path}; failures are logged and not propagated. */
    private void recoverLease(Configuration conf, final Path path) {
        try {
            FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
            FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
            fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable(){

                @Override
                public boolean progress() {
                    LOG.debug("recover WAL lease: " + path);
                    return true;
                }
            });
        }
        catch (IOException e) {
            LOG.warn("unable to recover lease for WAL: " + path, e);
        }
    }

    /**
     * Resets the reader and seeks back to the current position. If the file is gone but an
     * archived copy exists, reading continues from the archived copy; the
     * {@link FileNotFoundException} is propagated only when no archived copy is found.
     */
    private void resetReader() throws IOException {
        try {
            this.reader.reset();
            this.seek();
        }
        catch (FileNotFoundException fnfe) {
            Path archivedLog = this.getArchivedLog(this.currentPath);
            if (this.currentPath.equals(archivedLog)) {
                throw fnfe;
            }
            // The log was archived: switch over instead of failing the caller.
            this.openReader(archivedLog);
        }
        catch (NullPointerException npe) {
            throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
        }
    }

    /** Seeks the reader to the current position; skipped when the position is 0. */
    private void seek() throws IOException {
        if (this.currentPosition != 0L) {
            this.reader.seek(this.currentPosition);
        }
    }

    /**
     * @return the trailer size reported by the reader when it is a {@link ProtobufLogReader},
     *         otherwise {@code -1}
     */
    private long currentTrailerSize() {
        long size = -1L;
        if (this.reader instanceof ProtobufLogReader) {
            ProtobufLogReader pblr = (ProtobufLogReader)this.reader;
            size = pblr.trailerSize();
        }
        return size;
    }

    /** Unchecked wrapper for exceptions raised while advancing the stream. */
    @InterfaceAudience.Private
    public static class WALEntryStreamRuntimeException
    extends RuntimeException {
        private static final long serialVersionUID = -6298201811259982568L;

        public WALEntryStreamRuntimeException(Exception e) {
            super(e);
        }
    }
}

