/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationThrottler;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class ReplicationSource
extends Thread
implements ReplicationSourceInterface {
    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
    private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<String, PriorityBlockingQueue<Path>>();
    protected int queueSizePerGroup;
    protected ReplicationQueues replicationQueues;
    private ReplicationPeers replicationPeers;
    protected Configuration conf;
    protected ReplicationQueueInfo replicationQueueInfo;
    private String peerId;
    protected ReplicationSourceManager manager;
    protected Stoppable stopper;
    private long sleepForRetries;
    protected FileSystem fs;
    private UUID clusterId;
    private UUID peerClusterId;
    private AtomicLong totalReplicatedEdits = new AtomicLong(0L);
    protected String peerClusterZnode;
    private int maxRetriesMultiplier;
    private volatile boolean sourceRunning = false;
    private MetricsSource metrics;
    private int logQueueWarnThreshold;
    private ReplicationEndpoint replicationEndpoint;
    protected WALEntryFilter walEntryFilter;
    private ReplicationThrottler throttler;
    private long defaultBandwidth;
    private long currentBandwidth;
    private WALFileLengthProvider walFileLengthProvider;
    protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads = new ConcurrentHashMap();
    private AtomicLong totalBufferUsed;
    public static final String WAIT_ON_ENDPOINT_SECONDS = "hbase.replication.wait.on.endpoint.seconds";
    public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
    private int waitOnEndpointSeconds = -1;

    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
        this.stopper = stopper;
        this.conf = HBaseConfiguration.create(conf);
        this.waitOnEndpointSeconds = this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, 30);
        this.decorateConf();
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
        this.replicationQueues = replicationQueues;
        this.replicationPeers = replicationPeers;
        this.manager = manager;
        this.fs = fs;
        this.metrics = metrics;
        this.clusterId = clusterId;
        this.peerClusterZnode = peerClusterZnode;
        this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
        this.peerId = this.replicationQueueInfo.getPeerId();
        this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
        this.replicationEndpoint = replicationEndpoint;
        this.defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0L);
        this.currentBandwidth = this.getCurrentBandwidth();
        this.throttler = new ReplicationThrottler((double)this.currentBandwidth / 10.0);
        this.totalBufferUsed = manager.getTotalBufferUsed();
        this.walFileLengthProvider = walFileLengthProvider;
        LOG.info((Object)("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + this.peerId + ", currentBandwidth=" + this.currentBandwidth));
    }

    private void decorateConf() {
        String replicationCodec = this.conf.get("hbase.replication.rpc.codec");
        if (StringUtils.isNotEmpty(replicationCodec)) {
            this.conf.set("hbase.client.rpc.codec", replicationCodec);
        }
    }

    @Override
    public void enqueueLog(Path log) {
        String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName());
        PriorityBlockingQueue<Path> queue = this.queues.get(logPrefix);
        if (queue == null) {
            queue = new PriorityBlockingQueue<Path>(this.queueSizePerGroup, new LogsComparator());
            this.queues.put(logPrefix, queue);
            if (this.sourceRunning) {
                this.tryStartNewShipper(logPrefix, queue);
            }
        }
        queue.put(log);
        this.metrics.incrSizeOfLogQueue();
        int queueSize = queue.size();
        if (queueSize > this.logQueueWarnThreshold) {
            LOG.warn((Object)("WAL group " + logPrefix + " queue size: " + queueSize + " exceeds value of replication.source.log.queue.warn: " + this.logQueueWarnThreshold));
        }
    }

    @Override
    public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws ReplicationException {
        Map<TableName, List<String>> tableCFMap;
        String peerId = this.peerClusterZnode;
        if (peerId.contains("-")) {
            peerId = this.peerClusterZnode.split("-")[0];
        }
        if ((tableCFMap = this.replicationPeers.getConnectedPeer(peerId).getTableCFs()) != null) {
            List<String> tableCfs = tableCFMap.get(tableName);
            if (tableCFMap.containsKey(tableName) && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
                this.replicationQueues.addHFileRefs(peerId, pairs);
                this.metrics.incrSizeOfHFileRefsQueue(pairs.size());
            } else {
                LOG.debug((Object)("HFiles will not be replicated belonging to the table " + tableName + " family " + Bytes.toString(family) + " to peer id " + peerId));
            }
        } else {
            this.replicationQueues.addHFileRefs(peerId, pairs);
            this.metrics.incrSizeOfHFileRefsQueue(pairs.size());
        }
    }

    @Override
    public void run() {
        this.sourceRunning = true;
        try {
            this.replicationEndpoint.start();
            this.replicationEndpoint.awaitRunning(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            LOG.warn((Object)"Error starting ReplicationEndpoint, exiting", (Throwable)ex);
            this.uninitialize();
            throw new RuntimeException(ex);
        }
        int sleepMultiplier = 1;
        while (this.isSourceActive() && this.peerClusterId == null) {
            this.peerClusterId = this.replicationEndpoint.getPeerUUID();
            if (!this.isSourceActive() || this.peerClusterId != null || !this.sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) continue;
            ++sleepMultiplier;
        }
        if (this.clusterId.equals(this.peerClusterId) && !this.replicationEndpoint.canReplicateToSameCluster()) {
            this.terminate("ClusterId " + this.clusterId + " is replicating to itself: peerClusterId " + this.peerClusterId + " which is not allowed by ReplicationEndpoint:" + this.replicationEndpoint.getClass().getName(), null, false);
            this.manager.closeQueue(this);
            return;
        }
        LOG.info((Object)("Replicating " + this.clusterId + " -> " + this.peerClusterId));
        this.initializeWALEntryFilter();
        for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : this.queues.entrySet()) {
            String walGroupId = entry.getKey();
            PriorityBlockingQueue<Path> queue = entry.getValue();
            this.tryStartNewShipper(walGroupId, queue);
        }
    }

    private void initializeWALEntryFilter() {
        ArrayList<WALEntryFilter> filters = Lists.newArrayList(new SystemTableWALEntryFilter());
        WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
        if (filterFromEndpoint != null) {
            filters.add(filterFromEndpoint);
        }
        filters.add(new ClusterMarkingEntryFilter(this.clusterId, this.peerClusterId, this.replicationEndpoint));
        this.walEntryFilter = new ChainWALEntryFilter(filters);
    }

    protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
        ReplicationSourceShipper worker = new ReplicationSourceShipper(this.conf, walGroupId, queue, this);
        ReplicationSourceShipper extant = this.workerThreads.putIfAbsent(walGroupId, worker);
        if (extant != null) {
            LOG.debug((Object)("Someone has beat us to start a worker thread for wal group " + walGroupId));
        } else {
            LOG.debug((Object)("Starting up worker for wal group " + walGroupId));
            worker.startup(this.getUncaughtExceptionHandler());
            worker.setWALReader(this.startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
            this.workerThreads.put(walGroupId, worker);
        }
    }

    protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
        ReplicationSourceWALReader walReader = new ReplicationSourceWALReader(this.fs, this.conf, queue, startPosition, this.walEntryFilter, this);
        return Threads.setDaemonThreadRunning(walReader, threadName + ".replicationSource.wal-reader." + walGroupId + "," + this.peerClusterZnode, this.getUncaughtExceptionHandler());
    }

    @Override
    public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                RSRpcServices.exitIfOOME(e);
                LOG.error((Object)("Unexpected exception in " + t.getName() + " currentPath=" + ReplicationSource.this.getCurrentPath()), e);
                ReplicationSource.this.stopper.stop("Unexpected exception in " + t.getName());
            }
        };
    }

    @Override
    public ReplicationEndpoint getReplicationEndpoint() {
        return this.replicationEndpoint;
    }

    @Override
    public ReplicationSourceManager getSourceManager() {
        return this.manager;
    }

    @Override
    public void tryThrottle(int batchSize) throws InterruptedException {
        long sleepTicks;
        this.checkBandwidthChangeAndResetThrottler();
        if (this.throttler.isEnabled() && (sleepTicks = this.throttler.getNextSleepInterval(batchSize)) > 0L) {
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)("To sleep " + sleepTicks + "ms for throttling control"));
            }
            Thread.sleep(sleepTicks);
            this.throttler.resetStartTick();
        }
    }

    private void checkBandwidthChangeAndResetThrottler() {
        long peerBandwidth = this.getCurrentBandwidth();
        if (peerBandwidth != this.currentBandwidth) {
            this.currentBandwidth = peerBandwidth;
            this.throttler.setBandwidth((double)this.currentBandwidth / 10.0);
            LOG.info((Object)("ReplicationSource : " + this.peerId + " bandwidth throttling changed, currentBandWidth=" + this.currentBandwidth));
        }
    }

    private long getCurrentBandwidth() {
        ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(this.peerId);
        long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0L;
        return peerBandwidth != 0L ? peerBandwidth : this.defaultBandwidth;
    }

    private void uninitialize() {
        LOG.debug((Object)("Source exiting " + this.peerId));
        this.metrics.clear();
        if (this.replicationEndpoint.isRunning() || this.replicationEndpoint.isStarting()) {
            this.replicationEndpoint.stop();
            try {
                this.replicationEndpoint.awaitTerminated(this.waitOnEndpointSeconds, TimeUnit.SECONDS);
            }
            catch (TimeoutException e) {
                LOG.warn((Object)("Failed termination after " + this.waitOnEndpointSeconds + " seconds."));
            }
        }
    }

    protected boolean sleepForRetries(String msg, int sleepMultiplier) {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)(msg + ", sleeping " + this.sleepForRetries + " times " + sleepMultiplier));
            }
            Thread.sleep(this.sleepForRetries * (long)sleepMultiplier);
        }
        catch (InterruptedException e) {
            LOG.debug((Object)"Interrupted while sleeping between retries");
            Thread.currentThread().interrupt();
        }
        return sleepMultiplier < this.maxRetriesMultiplier;
    }

    @Override
    public boolean isPeerEnabled() {
        return this.replicationPeers.getStatusOfPeer(this.peerId);
    }

    @Override
    public void startup() {
        String n = Thread.currentThread().getName();
        Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                LOG.error((Object)"Unexpected exception in ReplicationSource", e);
            }
        };
        Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
    }

    @Override
    public void terminate(String reason) {
        this.terminate(reason, null);
    }

    @Override
    public void terminate(String reason, Exception cause) {
        this.terminate(reason, cause, true);
    }

    public void terminate(String reason, Exception cause, boolean join) {
        if (cause == null) {
            LOG.info((Object)("Closing source " + this.peerClusterZnode + " because: " + reason));
        } else {
            LOG.error((Object)("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason), (Throwable)cause);
        }
        this.sourceRunning = false;
        Collection<ReplicationSourceShipper> workers = this.workerThreads.values();
        for (ReplicationSourceShipper worker : workers) {
            worker.stopWorker();
            worker.entryReader.interrupt();
            worker.interrupt();
        }
        if (this.replicationEndpoint != null) {
            this.replicationEndpoint.stop();
        }
        if (join) {
            for (ReplicationSourceShipper worker : workers) {
                Threads.shutdown(worker, this.sleepForRetries);
                LOG.info((Object)("ReplicationSourceWorker " + worker.getName() + " terminated"));
            }
            if (this.replicationEndpoint != null) {
                try {
                    this.replicationEndpoint.awaitTerminated(this.sleepForRetries * (long)this.maxRetriesMultiplier, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException te) {
                    LOG.warn((Object)("Got exception while waiting for endpoint to shutdown for replication source :" + this.peerClusterZnode), (Throwable)te);
                }
            }
        }
    }

    @Override
    public String getPeerClusterZnode() {
        return this.peerClusterZnode;
    }

    @Override
    public String getPeerId() {
        return this.peerId;
    }

    @Override
    public Path getCurrentPath() {
        for (ReplicationSourceShipper worker : this.workerThreads.values()) {
            if (worker.getCurrentPath() == null) continue;
            return worker.getCurrentPath();
        }
        return null;
    }

    @Override
    public boolean isSourceActive() {
        return !this.stopper.isStopped() && this.sourceRunning;
    }

    @Override
    public String getStats() {
        StringBuilder sb = new StringBuilder();
        sb.append("Total replicated edits: ").append(this.totalReplicatedEdits).append(", current progress: \n");
        for (Map.Entry<String, ReplicationSourceShipper> entry : this.workerThreads.entrySet()) {
            String walGroupId = entry.getKey();
            ReplicationSourceShipper worker = entry.getValue();
            long position = worker.getCurrentPosition();
            Path currentPath = worker.getCurrentPath();
            sb.append("walGroup [").append(walGroupId).append("]: ");
            if (currentPath != null) {
                sb.append("currently replicating from: ").append(currentPath).append(" at position: ").append(position).append("\n");
                continue;
            }
            sb.append("no replication ongoing, waiting for new log");
        }
        return sb.toString();
    }

    @Override
    public MetricsSource getSourceMetrics() {
        return this.metrics;
    }

    @Override
    public void postShipEdits(List<WAL.Entry> entries, int batchSize) {
        if (this.throttler.isEnabled()) {
            this.throttler.addPushSize(batchSize);
        }
        this.totalReplicatedEdits.addAndGet(entries.size());
        this.totalBufferUsed.addAndGet(-batchSize);
    }

    @Override
    public WALFileLengthProvider getWALFileLengthProvider() {
        return this.walFileLengthProvider;
    }

    public static class LogsComparator
    implements Comparator<Path> {
        @Override
        public int compare(Path o1, Path o2) {
            return Long.compare(LogsComparator.getTS(o1), LogsComparator.getTS(o2));
        }

        private static long getTS(Path p) {
            int tsIndex = p.getName().lastIndexOf(46) + 1;
            return Long.parseLong(p.getName().substring(tsIndex));
        }
    }
}

