/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceWALReader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class RecoveredReplicationSource
extends ReplicationSource {
    private static final Log LOG = LogFactory.getLog(RecoveredReplicationSource.class);
    private String actualPeerId;

    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
        super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode, clusterId, replicationEndpoint, walFileLengthProvider, metrics);
        this.actualPeerId = this.replicationQueueInfo.getPeerId();
    }

    @Override
    protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
        RecoveredReplicationSourceShipper worker = new RecoveredReplicationSourceShipper(this.conf, walGroupId, queue, this, this.replicationQueues);
        ReplicationSourceShipper extant = this.workerThreads.putIfAbsent(walGroupId, worker);
        if (extant != null) {
            LOG.debug((Object)("Someone has beat us to start a worker thread for wal group " + walGroupId));
        } else {
            LOG.debug((Object)("Starting up worker for wal group " + walGroupId));
            worker.startup(this.getUncaughtExceptionHandler());
            worker.setWALReader(this.startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
            this.workerThreads.put(walGroupId, worker);
        }
    }

    @Override
    protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
        RecoveredReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(this.fs, this.conf, queue, startPosition, this.walEntryFilter, this);
        Threads.setDaemonThreadRunning(walReader, threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + this.peerClusterZnode, this.getUncaughtExceptionHandler());
        return walReader;
    }

    public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
        boolean hasPathChanged = false;
        PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(this.queueSizePerGroup, new ReplicationSource.LogsComparator());
        block0: for (Path path : queue) {
            if (this.fs.exists(path)) {
                newPaths.add(path);
                continue;
            }
            hasPathChanged = true;
            if (this.stopper instanceof ReplicationSyncUp.DummyServer) {
                Path newPath = this.getReplSyncUpPath(path);
                newPaths.add(newPath);
                continue;
            }
            List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
            LOG.info((Object)("NB dead servers : " + deadRegionServers.size()));
            Path walDir = FSUtils.getWALRootDir(this.conf);
            for (String curDeadServerName : deadRegionServers) {
                Path[] locs;
                Path deadRsDirectory = new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
                for (Path possibleLogLocation : locs = new Path[]{new Path(deadRsDirectory, path.getName()), new Path(deadRsDirectory.suffix("-splitting"), path.getName())}) {
                    LOG.info((Object)("Possible location " + possibleLogLocation.toUri().toString()));
                    if (!this.manager.getFs().exists(possibleLogLocation)) continue;
                    LOG.info((Object)("Log " + path + " still exists at " + possibleLogLocation));
                    newPaths.add(possibleLogLocation);
                    continue block0;
                }
            }
            LOG.error((Object)String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
            newPaths.add(path);
        }
        if (hasPathChanged) {
            if (newPaths.size() != queue.size()) {
                LOG.error((Object)"Recovery queue size is incorrect");
                throw new IOException("Recovery queue size error");
            }
            queue.clear();
            for (Path path : newPaths) {
                queue.add(path);
            }
        }
    }

    private Path getReplSyncUpPath(Path path) throws IOException {
        FileStatus[] rss;
        for (FileStatus rs : rss = this.fs.listStatus(this.manager.getLogDir())) {
            FileStatus[] logs;
            Path p = rs.getPath();
            for (FileStatus log : logs = this.fs.listStatus(p)) {
                if (!(p = new Path(p, log.getPath().getName())).getName().equals(path.getName())) continue;
                LOG.info((Object)("Log " + p.getName() + " found at " + p));
                return p;
            }
        }
        LOG.error((Object)("Didn't find path for: " + path.getName()));
        return path;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void tryFinish() {
        ConcurrentHashMap concurrentHashMap = this.workerThreads;
        synchronized (concurrentHashMap) {
            Threads.sleep(100L);
            boolean allTasksDone = true;
            for (ReplicationSourceShipper worker : this.workerThreads.values()) {
                if (worker.isFinished()) continue;
                allTasksDone = false;
                break;
            }
            if (allTasksDone) {
                this.manager.closeRecoveredQueue(this);
                LOG.info((Object)("Finished recovering queue " + this.peerClusterZnode + " with the following stats: " + this.getStats()));
            }
        }
    }

    @Override
    public String getPeerId() {
        return this.actualPeerId;
    }
}

