/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSink;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Region-server side entry point for replication. It acts both as the source service (it owns a
 * {@link ReplicationSourceManager} and listens to WAL events) and as the sink service (it owns a
 * {@link ReplicationSink} that applies entries received from another cluster).
 *
 * <p>Lifecycle: {@link #initialize} wires up the queues, peers, tracker and source manager;
 * {@link #startReplicationService()} starts the manager, creates the sink and schedules periodic
 * statistics logging; {@link #stopReplicationService()} tears everything down again.
 *
 * <p>NOTE(review): the fields are written by the lifecycle methods and read by other entry points
 * without synchronization; whether callers guarantee ordering is not visible from this file.
 */
@InterfaceAudience.Private
public class Replication
extends WALActionsListener.Base
implements ReplicationSourceService,
ReplicationSinkService {
    private static final Log LOG = LogFactory.getLog(Replication.class);
    private boolean replicationForBulkLoadData;
    private ReplicationSourceManager replicationManager;
    private ReplicationQueues replicationQueues;
    private ReplicationPeers replicationPeers;
    private ReplicationTracker replicationTracker;
    private Configuration conf;
    // Created in startReplicationService(); null until then.
    private ReplicationSink replicationSink;
    private Server server;
    // Single-threaded scheduler used only for the periodic statistics task.
    private ScheduledExecutorService scheduleThreadPool;
    // Period, in seconds, between two statistics reports.
    private int statsThreadPeriod;
    private ReplicationLoad replicationLoad;

    /**
     * Creates and immediately initializes the service, using a WAL file length provider that never
     * reports a length.
     *
     * @param server    the hosting server
     * @param fs        file system handed to the source manager
     * @param logDir    WAL directory handed to the source manager
     * @param oldLogDir archived WAL directory handed to the source manager
     * @throws IOException if {@link #initialize} fails
     */
    public Replication(Server server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException {
        // NOTE(review): this invokes an overridable method from a constructor; kept for compatibility.
        this.initialize(server, fs, logDir, oldLogDir, p -> OptionalLong.empty());
    }

    /**
     * Creates an uninitialized instance; {@link #initialize} must be called before it is used.
     */
    public Replication() {
    }

    /**
     * Builds the replication queues, peers, tracker and source manager for the given server.
     *
     * @param server                the hosting server; its configuration and ZooKeeper handle are used
     * @param fs                    file system handed to the source manager
     * @param logDir                WAL directory handed to the source manager
     * @param oldLogDir             archived WAL directory handed to the source manager
     * @param walFileLengthProvider provider handed to the source manager
     * @throws IOException              if the replication components cannot be created or the cluster
     *                                  id cannot be read
     * @throws IllegalArgumentException if bulk load replication is enabled but
     *                                  {@code hbase.replication.cluster.id} is missing or empty
     */
    @Override
    public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, WALFileLengthProvider walFileLengthProvider) throws IOException {
        this.server = server;
        this.conf = this.server.getConfiguration();
        this.replicationForBulkLoadData = Replication.isReplicationForBulkLoadDataEnabled(this.conf);
        // Validate the configuration first so that a misconfiguration fails before anything is allocated.
        if (this.replicationForBulkLoadData) {
            String replicationClusterId = this.conf.get("hbase.replication.cluster.id");
            if (replicationClusterId == null || replicationClusterId.isEmpty()) {
                throw new IllegalArgumentException("hbase.replication.cluster.id cannot be null/empty when hbase.replication.bulkload.enabled is set to true.");
            }
        }
        this.scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d").setDaemon(true).build());
        try {
            this.replicationQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(this.conf, this.server, server.getZooKeeper()));
            this.replicationQueues.init(this.server.getServerName().toString());
            this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
            this.replicationPeers.init();
            this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, this.conf, this.server, this.server);
        } catch (Exception e) {
            throw new IOException("Failed replication handler create", e);
        }
        UUID clusterId;
        try {
            clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
        } catch (KeeperException ke) {
            throw new IOException("Could not read cluster id", ke);
        }
        this.replicationManager = new ReplicationSourceManager(this.replicationQueues, this.replicationPeers, this.replicationTracker, this.conf, this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider);
        this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 300);
        LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
        this.replicationLoad = new ReplicationLoad();
    }

    /**
     * Reads the {@code hbase.replication.bulkload.enabled} flag.
     *
     * @param c configuration to read
     * @return the configured value, or {@code false} when the property is not set
     */
    public static boolean isReplicationForBulkLoadDataEnabled(Configuration c) {
        return c.getBoolean("hbase.replication.bulkload.enabled", false);
    }

    /**
     * @return this instance, which is itself the WAL listener
     */
    @Override
    public WALActionsListener getWALActionsListener() {
        return this;
    }

    /**
     * Stops the replication service by delegating to {@link #join()}.
     */
    @Override
    public void stopReplicationService() {
        this.join();
    }

    /**
     * Joins the source manager, stops the sink services and shuts down the statistics scheduler.
     * Components that were never created (initialization skipped or failed, or the service never
     * started) are silently skipped so that shutdown is always safe to call.
     */
    public void join() {
        if (this.replicationManager != null) {
            this.replicationManager.join();
        }
        if (this.replicationSink != null) {
            this.replicationSink.stopReplicationSinkServices();
        }
        if (this.scheduleThreadPool != null) {
            this.scheduleThreadPool.shutdown();
        }
    }

    /**
     * Hands a batch of WAL entries received from another cluster to the sink.
     *
     * @param entries                    entries to apply
     * @param cells                      scanner over the cells belonging to {@code entries}
     * @param replicationClusterId       passed through to the sink
     * @param sourceBaseNamespaceDirPath passed through to the sink
     * @param sourceHFileArchiveDirPath  passed through to the sink
     * @throws IOException if the sink fails, or if the sink has not been created yet because
     *                     {@link #startReplicationService()} has not run
     */
    @Override
    public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException {
        ReplicationSink sink = this.replicationSink;
        if (sink == null) {
            // Fail with a descriptive, declared exception instead of an anonymous NullPointerException.
            throw new IOException("Replication sink service has not been started");
        }
        sink.replicateEntries(entries, cells, replicationClusterId, sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
    }

    /**
     * Initializes the source manager, creates the sink and schedules the periodic statistics task.
     *
     * @throws IOException if the source manager fails to initialize
     */
    @Override
    public void startReplicationService() throws IOException {
        try {
            this.replicationManager.init();
        } catch (ReplicationException e) {
            throw new IOException(e);
        }
        this.replicationSink = new ReplicationSink(this.conf, this.server);
        this.scheduleThreadPool.scheduleAtFixedRate(new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), this.statsThreadPeriod, this.statsThreadPeriod, TimeUnit.SECONDS);
    }

    /**
     * @return the source manager, or {@code null} if {@link #initialize} has not completed
     */
    public ReplicationSourceManager getReplicationManager() {
        return this.replicationManager;
    }

    /**
     * WAL listener hook; applies {@link #scopeWALEdits} to the entry about to be written.
     */
    @Override
    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
        Replication.scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
    }

    /**
     * Calls {@code logKey.serializeReplicationScope(false)} when either
     * <ul>
     *   <li>bulk load replication is disabled and the edit holds nothing of interest, i.e. every cell
     *       belongs to {@link WALEdit#METAFAMILY} and the first cell is not a {@code REGION_CLOSE}
     *       region event, or</li>
     *   <li>the edit is a replay.</li>
     * </ul>
     * Otherwise the key is left untouched.
     *
     * @param logKey             key whose replication scope may be adjusted
     * @param logEdit            edit being inspected
     * @param conf               configuration providing the bulk load replication flag
     * @param replicationManager not used by this method; kept for signature compatibility
     * @throws IOException if the region event descriptor cannot be read from the first cell
     */
    public static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
        boolean replicationForBulkLoadEnabled = Replication.isReplicationForBulkLoadDataEnabled(conf);
        List<Cell> cells = logEdit.getCells();
        boolean foundOtherEdits = false;
        for (Cell cell : cells) {
            if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
                foundOtherEdits = true;
                break;
            }
        }
        if (!foundOtherEdits && !cells.isEmpty()) {
            // Only the first cell is examined for a region event marker.
            WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cells.get(0));
            if (maybeEvent != null && maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
                foundOtherEdits = true;
            }
        }
        if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
            logKey.serializeReplicationScope(false);
        }
    }

    /**
     * Registers HFile references with the source manager.
     *
     * @param tableName table the files belong to
     * @param family    column family the files belong to
     * @param pairs     path pairs forwarded unchanged to the source manager
     * @throws IOException wrapping the {@link ReplicationException} if the references cannot be added
     */
    void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws IOException {
        try {
            this.replicationManager.addHFileRefs(tableName, family, pairs);
        } catch (ReplicationException e) {
            LOG.error("Failed to add hfile references in the replication queue.", e);
            throw new IOException(e);
        }
    }

    /**
     * WAL listener hook; forwards the new WAL path to the source manager. {@code oldPath} is unused.
     */
    @Override
    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
        this.getReplicationManager().preLogRoll(newPath);
    }

    /**
     * WAL listener hook; forwards the new WAL path to the source manager. {@code oldPath} is unused.
     */
    @Override
    public void postLogRoll(Path oldPath, Path newPath) throws IOException {
        this.getReplicationManager().postLogRoll(newPath);
    }

    /**
     * Ensures the replication log cleaner is listed in {@code hbase.master.logcleaner.plugins} and,
     * when bulk load replication is enabled, that the replication HFile cleaner is listed in
     * {@code hbase.master.hfilecleaner.plugins}. Properties that are unset are created.
     *
     * @param conf configuration to modify in place
     */
    public static void decorateMasterConfiguration(Configuration conf) {
        Replication.appendPluginIfAbsent(conf, "hbase.master.logcleaner.plugins", ReplicationLogCleaner.class.getCanonicalName());
        if (Replication.isReplicationForBulkLoadDataEnabled(conf)) {
            Replication.appendPluginIfAbsent(conf, "hbase.master.hfilecleaner.plugins", ReplicationHFileCleaner.class.getCanonicalName());
        }
    }

    /**
     * When bulk load replication is enabled, ensures {@link ReplicationObserver} is listed in
     * {@code hbase.coprocessor.regionserver.classes}.
     *
     * @param conf configuration to modify in place
     */
    public static void decorateRegionServerConfiguration(Configuration conf) {
        if (Replication.isReplicationForBulkLoadDataEnabled(conf)) {
            Replication.appendPluginIfAbsent(conf, "hbase.coprocessor.regionserver.classes", ReplicationObserver.class.getCanonicalName());
        }
    }

    /**
     * Adds {@code className} to the comma-separated list stored under {@code key} unless the current
     * value already contains it. A missing or empty value is replaced by {@code className} alone so
     * that no {@code NullPointerException} is thrown and no leading comma is produced.
     */
    private static void appendPluginIfAbsent(Configuration conf, String key, String className) {
        String configured = conf.get(key);
        if (configured == null || configured.isEmpty()) {
            conf.set(key, className);
            return;
        }
        // Substring match, as before: a value that merely contains the class name counts as present.
        if (!configured.contains(className)) {
            conf.set(key, configured + "," + className);
        }
    }

    /**
     * Rebuilds the replication load from the current source and sink metrics and returns it.
     *
     * @return the refreshed load, or {@code null} when the service has not been initialized or the
     *         sink has not been created yet
     */
    @Override
    public ReplicationLoad refreshAndGetReplicationLoad() {
        // Without a sink there are no sink metrics to report; treat it like "no load available".
        if (this.replicationLoad == null || this.replicationSink == null) {
            return null;
        }
        this.buildReplicationLoad();
        return this.replicationLoad;
    }

    /**
     * Collects the metrics of every current source, of every old source that is a
     * {@link ReplicationSource}, and of the sink, then feeds them to the {@link ReplicationLoad}.
     */
    private void buildReplicationLoad() {
        List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
        for (ReplicationSourceInterface source : this.replicationManager.getSources()) {
            sourceMetricsList.add(source.getSourceMetrics());
        }
        for (ReplicationSourceInterface oldSource : this.replicationManager.getOldSources()) {
            // Old sources of any other implementation are deliberately left out.
            if (oldSource instanceof ReplicationSource) {
                sourceMetricsList.add(oldSource.getSourceMetrics());
            }
        }
        MetricsSink metricsSink = this.replicationSink.getSinkMetrics();
        this.replicationLoad.buildReplicationLoad(sourceMetricsList, metricsSink);
    }

    /**
     * Periodic task that logs the statistics of the source manager and of the sink. It extends
     * {@link Thread} but is only submitted to the scheduler as a {@link Runnable} by this class.
     */
    static class ReplicationStatisticsThread
    extends Thread {
        private final ReplicationSink replicationSink;
        private final ReplicationSourceManager replicationManager;

        /**
         * @param replicationSink    sink whose statistics are logged
         * @param replicationManager source manager whose statistics are logged
         */
        public ReplicationStatisticsThread(ReplicationSink replicationSink, ReplicationSourceManager replicationManager) {
            super("ReplicationStatisticsThread");
            this.replicationManager = replicationManager;
            this.replicationSink = replicationSink;
        }

        /**
         * Logs both statistics strings. Any failure is logged and swallowed: an exception escaping a
         * task scheduled with {@code scheduleAtFixedRate} would cancel all subsequent executions.
         */
        @Override
        public void run() {
            try {
                this.printStats(this.replicationManager.getStats());
                this.printStats(this.replicationSink.getStats());
            } catch (Exception e) {
                LOG.warn("Failed to report replication statistics", e);
            }
        }

        /**
         * Logs {@code stats} at INFO level unless it is {@code null} or empty.
         */
        private void printStats(String stats) {
            if (stats != null && !stats.isEmpty()) {
                LOG.info(stats);
            }
        }
    }
}

