/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceLogQueue;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryFilterRetryableException;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReplicationSourceWALReaderThread
extends Thread {
    private static final Log LOG = LogFactory.getLog(ReplicationSourceWALReaderThread.class);
    private ReplicationSourceLogQueue logQueue;
    private FileSystem fs;
    private Configuration conf;
    private BlockingQueue<WALEntryBatch> entryBatchQueue;
    private long replicationBatchSizeCapacity;
    private int replicationBatchCountCapacity;
    private long lastReadPosition;
    private Path lastReadPath;
    private WALEntryFilter filter;
    private long sleepForRetries;
    private boolean isReaderRunning = true;
    private ReplicationQueueInfo replicationQueueInfo;
    private int maxRetriesMultiplier;
    private MetricsSource metrics;
    private AtomicLong totalBufferUsed;
    private long totalBufferQuota;
    private final String walGroupId;
    private ReplicationSource source;
    private ReplicationSourceManager manager;

    public ReplicationSourceWALReaderThread(ReplicationSourceManager manager, ReplicationQueueInfo replicationQueueInfo, ReplicationSourceLogQueue logQueue, long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics, ReplicationSource source, String walGroupId) {
        this.replicationQueueInfo = replicationQueueInfo;
        this.logQueue = logQueue;
        this.walGroupId = walGroupId;
        this.lastReadPath = logQueue.getQueue(walGroupId).peek();
        this.lastReadPosition = startPosition;
        this.fs = fs;
        this.conf = conf;
        this.filter = filter;
        this.replicationBatchSizeCapacity = this.conf.getLong("replication.source.size.capacity", 0x4000000L);
        this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
        int batchCount = conf.getInt("replication.source.nb.batches", 1);
        this.totalBufferUsed = manager.getTotalBufferUsed();
        this.totalBufferQuota = conf.getLong("replication.total.buffer.quota", 0x10000000L);
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.metrics = metrics;
        this.entryBatchQueue = new LinkedBlockingQueue<WALEntryBatch>(batchCount);
        this.source = source;
        this.manager = manager;
        LOG.info((Object)("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode() + ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId() + " inited, replicationBatchSizeCapacity=" + this.replicationBatchSizeCapacity + ", replicationBatchCountCapacity=" + this.replicationBatchCountCapacity + ", replicationBatchQueueCapacity=" + batchCount));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        int sleepMultiplier = 1;
        WALEntryBatch batch = null;
        WALEntryStream entryStream = new WALEntryStream(this.logQueue, this.fs, this.conf, this.lastReadPosition, this.metrics, this.walGroupId);
        try {
            while (this.isReaderRunning()) {
                try {
                    entryStream = new WALEntryStream(this.logQueue, this.fs, this.conf, this.lastReadPosition, this.metrics, this.walGroupId);
                    while (this.isReaderRunning()) {
                        if (!this.source.isPeerEnabled()) {
                            Threads.sleep((long)this.sleepForRetries);
                            continue;
                        }
                        if (!this.checkQuota()) continue;
                        batch = new WALEntryBatch(this.replicationBatchCountCapacity);
                        boolean hasNext = entryStream.hasNext();
                        while (hasNext) {
                            WALEdit edit;
                            WAL.Entry entry = entryStream.next();
                            if ((entry = this.filterEntry(entry)) != null && (edit = entry.getEdit()) != null && !edit.isEmpty()) {
                                long entrySize = this.getEntrySizeIncludeBulkLoad(entry);
                                long entrySizeExcludeBulkLoad = this.getEntrySizeExcludeBulkLoad(entry);
                                batch.addEntry(entry, entrySize);
                                this.updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
                                boolean totalBufferTooLarge = this.acquireBufferQuota(entrySizeExcludeBulkLoad);
                                if (totalBufferTooLarge || batch.getHeapSize() >= this.replicationBatchSizeCapacity || batch.getNbEntries() >= this.replicationBatchCountCapacity) break;
                            }
                            hasNext = entryStream.hasNext();
                        }
                        if (!this.updateBatchAndShippingQueue(entryStream, batch, hasNext, false)) continue;
                        sleepMultiplier = 1;
                    }
                }
                catch (IOException | WALEntryFilterRetryableException | WALEntryStream.WALEntryStreamRuntimeException e) {
                    if (this.handleEofException(e, entryStream, batch)) {
                        sleepMultiplier = 1;
                        continue;
                    }
                    if (sleepMultiplier < this.maxRetriesMultiplier) {
                        LOG.debug((Object)("Failed to read stream of replication entries or replication filter is recovering " + e));
                        ++sleepMultiplier;
                    } else {
                        LOG.error((Object)("Failed to read stream of replication entries or replication filter is recovering " + e));
                    }
                    Threads.sleep((long)(this.sleepForRetries * (long)sleepMultiplier));
                }
                catch (InterruptedException e) {
                    LOG.trace((Object)"Interrupted while sleeping between WAL reads");
                    Thread.currentThread().interrupt();
                }
                finally {
                    entryStream.close();
                }
            }
        }
        catch (IOException e) {
            if (sleepMultiplier < this.maxRetriesMultiplier) {
                LOG.debug((Object)("Failed to read stream of replication entries: " + e));
                ++sleepMultiplier;
            } else {
                LOG.error((Object)"Failed to read stream of replication entries", (Throwable)e);
            }
            Threads.sleep((long)(this.sleepForRetries * (long)sleepMultiplier));
        }
        catch (InterruptedException e) {
            LOG.trace((Object)"Interrupted while sleeping between WAL reads");
            Thread.currentThread().interrupt();
        }
    }

    private boolean updateBatchAndShippingQueue(WALEntryStream entryStream, WALEntryBatch batch, boolean hasMoreData, boolean isEOFException) throws InterruptedException, IOException {
        this.updateBatch(entryStream, batch, hasMoreData, isEOFException);
        boolean isDataQueued = false;
        if (this.isShippable(batch)) {
            isDataQueued = true;
            this.entryBatchQueue.put(batch);
            if (!batch.hasMoreEntries()) {
                LOG.debug((Object)"Stopping the reader after recovering the queue");
                this.setReaderRunning(false);
            }
        } else {
            Thread.sleep(this.sleepForRetries);
        }
        if (!isEOFException) {
            this.resetStream(entryStream);
        }
        return isDataQueued;
    }

    private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, boolean moreData, boolean isEOFException) {
        this.logMessage(batch);
        if (isEOFException) {
            batch.updatePosition(this.lastReadPath, this.lastReadPosition);
        } else {
            batch.updatePosition(entryStream.getCurrentPath(), entryStream.getPosition());
        }
        batch.setMoreEntries(!this.replicationQueueInfo.isQueueRecovered() || moreData);
    }

    private void logMessage(WALEntryBatch batch) {
        if (LOG.isTraceEnabled()) {
            if (batch.isEmpty()) {
                LOG.trace((Object)"Didn't read any new entries from WAL");
            } else {
                LOG.trace((Object)String.format("Read %s WAL entries eligible for replication", batch.getNbEntries()));
            }
        }
    }

    private boolean isShippable(WALEntryBatch batch) {
        return !batch.isEmpty() || this.checkIfWALRolled(batch) || !batch.hasMoreEntries();
    }

    private boolean checkIfWALRolled(WALEntryBatch batch) {
        return this.lastReadPath == null && batch.lastWalPath != null || this.lastReadPath != null && !this.lastReadPath.equals((Object)batch.lastWalPath);
    }

    private void resetStream(WALEntryStream stream) throws IOException {
        this.lastReadPosition = stream.getPosition();
        this.lastReadPath = stream.getCurrentPath();
        stream.reset();
    }

    private boolean handleEofException(Exception e, WALEntryStream entryStream, WALEntryBatch batch) throws InterruptedException {
        boolean isRecoveredSource = this.manager.getOldSources().contains(this.source);
        PriorityBlockingQueue<Path> queue = this.logQueue.getQueue(this.walGroupId);
        if (e.getCause() instanceof EOFException && (isRecoveredSource || queue.size() > 1) && this.conf.getBoolean("replication.source.eof.autorecovery", false)) {
            Path path = queue.peek();
            try {
                if (!this.fs.exists(path)) {
                    path = entryStream.getArchivedLog(path);
                }
                if (this.fs.getFileStatus(path).getLen() == 0L) {
                    LOG.warn((Object)("Forcing removal of 0 length log in queue: " + path));
                    this.lastReadPath = path;
                    this.logQueue.remove(this.walGroupId);
                    this.lastReadPosition = 0L;
                    boolean hasMoreData = !queue.isEmpty();
                    this.updateBatchAndShippingQueue(entryStream, batch, hasMoreData, true);
                    return true;
                }
            }
            catch (IOException ioe) {
                LOG.warn((Object)("Couldn't get file length information about log " + path), (Throwable)ioe);
            }
        }
        return false;
    }

    public Path getCurrentPath() {
        return this.logQueue.getQueue(this.walGroupId).peek();
    }

    private boolean checkQuota() {
        if (this.totalBufferUsed.get() > this.totalBufferQuota) {
            Threads.sleep((long)this.sleepForRetries);
            return false;
        }
        return true;
    }

    private WAL.Entry filterEntry(WAL.Entry entry) {
        WAL.Entry filtered = this.filter.filter(entry);
        if (entry != null && filtered == null) {
            this.metrics.incrLogEditsFiltered();
        }
        return filtered;
    }

    public WALEntryBatch take() throws InterruptedException {
        return this.entryBatchQueue.take();
    }

    public WALEntryBatch poll(long timeout) throws InterruptedException {
        return this.entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
    }

    private long getEntrySizeIncludeBulkLoad(WAL.Entry entry) {
        WALEdit edit = entry.getEdit();
        return this.getEntrySizeExcludeBulkLoad(entry) + (long)this.sizeOfStoreFilesIncludeBulkLoad(edit);
    }

    public long getEntrySizeExcludeBulkLoad(WAL.Entry entry) {
        WALEdit edit = entry.getEdit();
        WALKey key = entry.getKey();
        return edit.heapSize() + key.estimatedSerializedSizeOf();
    }

    private void updateBatchStats(WALEntryBatch batch, WAL.Entry entry, long entryPosition, long entrySize) {
        WALEdit edit = entry.getEdit();
        if (edit != null && !edit.isEmpty()) {
            batch.incrementHeapSize(entrySize);
            Pair<Integer, Integer> nbRowsAndHFiles = this.countDistinctRowKeysAndHFiles(edit);
            batch.incrementNbRowKeys((Integer)nbRowsAndHFiles.getFirst());
            batch.incrementNbHFiles((Integer)nbRowsAndHFiles.getSecond());
        }
        batch.lastWalPosition = entryPosition;
    }

    private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
        ArrayList<Cell> cells = edit.getCells();
        int distinctRowKeys = 1;
        int totalHFileEntries = 0;
        Cell lastCell = (Cell)cells.get(0);
        int totalCells = edit.size();
        for (int i = 0; i < totalCells; ++i) {
            if (CellUtil.matchingQualifier((Cell)((Cell)cells.get(i)), (byte[])WALEdit.BULK_LOAD)) {
                try {
                    WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor((Cell)cells.get(i));
                    List stores = bld.getStoresList();
                    int totalStores = stores.size();
                    for (int j = 0; j < totalStores; ++j) {
                        totalHFileEntries += ((WALProtos.StoreDescriptor)stores.get(j)).getStoreFileList().size();
                    }
                }
                catch (IOException e) {
                    LOG.error((Object)"Failed to deserialize bulk load entry from wal edit. Then its hfiles count will not be added into metric.");
                }
            }
            if (!CellUtil.matchingRow((Cell)((Cell)cells.get(i)), (Cell)lastCell)) {
                ++distinctRowKeys;
            }
            lastCell = (Cell)cells.get(i);
        }
        Pair result = new Pair((Object)distinctRowKeys, (Object)totalHFileEntries);
        return result;
    }

    private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
        ArrayList<Cell> cells = edit.getCells();
        int totalStoreFilesSize = 0;
        int totalCells = edit.size();
        for (int i = 0; i < totalCells; ++i) {
            if (!CellUtil.matchingQualifier((Cell)((Cell)cells.get(i)), (byte[])WALEdit.BULK_LOAD)) continue;
            try {
                WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor((Cell)cells.get(i));
                List stores = bld.getStoresList();
                int totalStores = stores.size();
                for (int j = 0; j < totalStores; ++j) {
                    totalStoreFilesSize = (int)((long)totalStoreFilesSize + ((WALProtos.StoreDescriptor)stores.get(j)).getStoreFileSizeBytes());
                }
                continue;
            }
            catch (IOException e) {
                LOG.error((Object)"Failed to deserialize bulk load entry from wal edit. Size of HFiles part of cell will not be considered in replication request size calculation.", (Throwable)e);
            }
        }
        return totalStoreFilesSize;
    }

    private boolean acquireBufferQuota(long size) {
        return this.totalBufferUsed.addAndGet(size) >= this.totalBufferQuota;
    }

    public boolean isReaderRunning() {
        return this.isReaderRunning && !this.isInterrupted();
    }

    public void setReaderRunning(boolean readerRunning) {
        this.isReaderRunning = readerRunning;
    }

    public long getLastReadPosition() {
        return this.lastReadPosition;
    }

    static final class WALEntryBatch {
        private List<Pair<WAL.Entry, Long>> walEntriesWithSize;
        private Path lastWalPath;
        private long lastWalPosition = 0L;
        private int nbRowKeys = 0;
        private int nbHFiles = 0;
        private long heapSize = 0L;
        private boolean moreEntries = true;

        private WALEntryBatch(int maxNbEntries) {
            this.walEntriesWithSize = new ArrayList<Pair<WAL.Entry, Long>>(maxNbEntries);
        }

        public void addEntry(WAL.Entry entry, long entrySize) {
            this.walEntriesWithSize.add((Pair<WAL.Entry, Long>)new Pair((Object)entry, (Object)entrySize));
        }

        public List<WAL.Entry> getWalEntries() {
            ArrayList<WAL.Entry> entries = new ArrayList<WAL.Entry>(this.walEntriesWithSize.size());
            for (Pair<WAL.Entry, Long> pair : this.walEntriesWithSize) {
                entries.add((WAL.Entry)pair.getFirst());
            }
            return entries;
        }

        public List<Pair<WAL.Entry, Long>> getWalEntriesWithSize() {
            return this.walEntriesWithSize;
        }

        public Path getLastWalPath() {
            return this.lastWalPath;
        }

        public long getLastWalPosition() {
            return this.lastWalPosition;
        }

        public int getNbEntries() {
            return this.walEntriesWithSize.size();
        }

        public int getNbRowKeys() {
            return this.nbRowKeys;
        }

        public int getNbHFiles() {
            return this.nbHFiles;
        }

        public int getNbOperations() {
            return this.getNbRowKeys() + this.getNbHFiles();
        }

        public long getHeapSize() {
            return this.heapSize;
        }

        private void incrementNbRowKeys(int increment) {
            this.nbRowKeys += increment;
        }

        private void incrementNbHFiles(int increment) {
            this.nbHFiles += increment;
        }

        private void incrementHeapSize(long increment) {
            this.heapSize += increment;
        }

        public boolean isEmpty() {
            return this.walEntriesWithSize.isEmpty();
        }

        public void updatePosition(Path currentPath, long currentPosition) {
            this.lastWalPath = currentPath;
            this.lastWalPosition = currentPosition;
        }

        public boolean hasMoreEntries() {
            return this.moreEntries;
        }

        public void setMoreEntries(boolean moreEntries) {
            this.moreEntries = moreEntries;
        }
    }
}

