/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class RecoveredReplicationSourceShipper
extends ReplicationSourceShipper {
    private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipper.class);
    protected final RecoveredReplicationSource source;
    private final ReplicationQueues replicationQueues;

    public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source, ReplicationQueues replicationQueues) {
        super(conf, walGroupId, queue, source);
        this.source = source;
        this.replicationQueues = replicationQueues;
    }

    @Override
    public void run() {
        this.setWorkerState(ReplicationSourceShipper.WorkerState.RUNNING);
        while (this.isActive()) {
            int sleepMultiplier = 1;
            if (!this.source.isPeerEnabled()) {
                if (!this.source.sleepForRetries("Replication is disabled", sleepMultiplier)) continue;
                ++sleepMultiplier;
                continue;
            }
            while (this.entryReader == null) {
                if (!this.source.sleepForRetries("Replication WAL entry reader thread not initialized", sleepMultiplier)) continue;
                ++sleepMultiplier;
            }
            try {
                ReplicationSourceWALReader.WALEntryBatch entryBatch = this.entryReader.take();
                this.shipEdits(entryBatch);
                if (!entryBatch.getWalEntries().isEmpty() || !entryBatch.getLastSeqIds().isEmpty()) continue;
                LOG.debug((Object)("Finished recovering queue for group " + this.walGroupId + " of peer " + this.source.getPeerClusterZnode()));
                this.source.getSourceMetrics().incrCompletedRecoveryQueue();
                this.setWorkerState(ReplicationSourceShipper.WorkerState.FINISHED);
            }
            catch (InterruptedException e) {
                LOG.trace((Object)"Interrupted while waiting for next replication entry batch", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        }
        this.source.tryFinish();
        if (!this.isFinished()) {
            this.setWorkerState(ReplicationSourceShipper.WorkerState.STOPPED);
        }
    }

    @Override
    public long getStartPosition() {
        long startPosition = this.getRecoveredQueueStartPos();
        for (int numRetries = 0; numRetries <= this.maxRetriesMultiplier; ++numRetries) {
            try {
                this.source.locateRecoveredPaths(this.queue);
                break;
            }
            catch (IOException e) {
                LOG.error((Object)("Error while locating recovered queue paths, attempt #" + numRetries));
                continue;
            }
        }
        return startPosition;
    }

    private long getRecoveredQueueStartPos() {
        long startPosition = 0L;
        String peerClusterZnode = this.source.getPeerClusterZnode();
        try {
            startPosition = this.replicationQueues.getLogPosition(peerClusterZnode, ((Path)this.queue.peek()).getName());
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)("Recovered queue started with log " + this.queue.peek() + " at position " + startPosition));
            }
        }
        catch (ReplicationException e) {
            this.terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
        }
        return startPosition;
    }

    @Override
    protected void updateLogPosition(long lastReadPosition) {
        this.source.getSourceManager().logPositionAndCleanOldLogs(this.currentPath, this.source.getPeerClusterZnode(), lastReadPosition, true, false);
        this.lastLoggedPosition = lastReadPosition;
    }

    private void terminate(String reason, Exception cause) {
        if (cause == null) {
            LOG.info((Object)("Closing worker for wal group " + this.walGroupId + " because: " + reason));
        } else {
            LOG.error((Object)("Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason), (Throwable)cause);
        }
        this.entryReader.interrupt();
        Threads.shutdown(this.entryReader, this.sleepForRetries);
        this.interrupt();
        Threads.shutdown(this, this.sleepForRetries);
        LOG.info((Object)("ReplicationSourceWorker " + this.getName() + " terminated"));
    }
}

