/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hudi.org.apache.hadoop.hbase.Abortable;
import org.apache.hudi.org.apache.hadoop.hbase.CellUtil;
import org.apache.hudi.org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hudi.org.apache.hadoop.hbase.TableName;
import org.apache.hudi.org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hudi.org.apache.hadoop.hbase.client.Admin;
import org.apache.hudi.org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hudi.org.apache.hadoop.hbase.client.Connection;
import org.apache.hudi.org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hudi.org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hudi.org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hudi.org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hudi.org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hudi.org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hudi.org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager;
import org.apache.hudi.org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hudi.org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hudi.org.apache.hadoop.hbase.util.Threads;
import org.apache.hudi.org.apache.hadoop.hbase.wal.WAL;
import org.apache.hudi.org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint
extends HBaseReplicationEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
    private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2L;
    public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY = "hbase.replication.drop.on.deleted.table";
    public static final String REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY = "hbase.replication.drop.on.deleted.columnfamily";
    private ClusterConnection conn;
    private Configuration localConf;
    private Configuration conf;
    private long sleepForRetries;
    private int maxRetriesMultiplier;
    private int socketTimeoutMultiplier;
    private long maxTerminationWait;
    private int replicationRpcLimit;
    private MetricsSource metrics;
    private ReplicationSinkManager replicationSinkMgr;
    private boolean peersSelected = false;
    private String replicationClusterId = "";
    private ThreadPoolExecutor exec;
    private int maxThreads;
    private Path baseNamespaceDir;
    private Path hfileArchiveDir;
    private boolean replicationBulkLoadDataEnabled;
    private Abortable abortable;
    private boolean dropOnDeletedTables;
    private boolean dropOnDeletedColumnFamilies;
    private boolean isSerial = false;
    private long lastSinkFetchTime = 0L;

    protected Connection createConnection(Configuration conf) throws IOException {
        return ConnectionFactory.createConnection(conf);
    }

    protected ReplicationSinkManager createReplicationSinkManager(Connection conn) {
        return new ReplicationSinkManager((ClusterConnection)conn, this.ctx.getPeerId(), this, this.conf);
    }

    @Override
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        this.conf = HBaseConfiguration.create(this.ctx.getConfiguration());
        this.localConf = HBaseConfiguration.create(this.ctx.getLocalConfiguration());
        this.decorateConf();
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", this.maxRetriesMultiplier);
        long maxTerminationWaitMultiplier = this.conf.getLong("replication.source.maxterminationmultiplier", 2L);
        this.maxTerminationWait = maxTerminationWaitMultiplier * this.conf.getLong("hbase.rpc.timeout", 60000L);
        Connection connection = this.createConnection(this.conf);
        Preconditions.checkState(connection instanceof ClusterConnection);
        this.conn = (ClusterConnection)connection;
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.metrics = context.getMetrics();
        this.replicationSinkMgr = this.createReplicationSinkManager(this.conn);
        this.maxThreads = this.conf.getInt("hbase.replication.source.maxthreads", 10);
        this.exec = Threads.getBoundedCachedThreadPool(this.maxThreads, 60L, TimeUnit.SECONDS, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
        this.abortable = this.ctx.getAbortable();
        this.replicationRpcLimit = (int)(0.95 * (double)this.conf.getLong("hbase.ipc.max.request.size", 0x10000000L));
        this.dropOnDeletedTables = this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
        this.dropOnDeletedColumnFamilies = this.conf.getBoolean(REPLICATION_DROP_ON_DELETED_COLUMN_FAMILY_KEY, false);
        this.replicationBulkLoadDataEnabled = this.conf.getBoolean("hbase.replication.bulkload.enabled", false);
        if (this.replicationBulkLoadDataEnabled) {
            this.replicationClusterId = this.conf.get("hbase.replication.cluster.id");
        }
        Path rootDir = CommonFSUtils.getRootDir(this.conf);
        Path baseNSDir = new Path("data");
        this.baseNamespaceDir = new Path(rootDir, baseNSDir);
        this.hfileArchiveDir = new Path(rootDir, new Path("archive", baseNSDir));
        this.isSerial = context.getPeerConfig().isSerial();
    }

    private void decorateConf() {
        String replicationCodec = this.conf.get("hbase.replication.rpc.codec");
        if (StringUtils.isNotEmpty((CharSequence)replicationCodec)) {
            this.conf.set("hbase.client.rpc.codec", replicationCodec);
        }
    }

    private void connectToPeers() {
        this.getRegionServers();
        int sleepMultiplier = 1;
        while (this.isRunning() && this.replicationSinkMgr.getNumSinks() == 0) {
            this.replicationSinkMgr.chooseSinks();
            if (!this.isRunning() || this.replicationSinkMgr.getNumSinks() != 0 || !this.sleepForRetries("Waiting for peers", sleepMultiplier)) continue;
            ++sleepMultiplier;
        }
    }

    private boolean sleepForRetries(String msg, int sleepMultiplier) {
        block3: {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} {}, sleeping {} times {}", this.logPeerId(), msg, this.sleepForRetries, sleepMultiplier);
                }
                Thread.sleep(this.sleepForRetries * (long)sleepMultiplier);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (!LOG.isDebugEnabled()) break block3;
                LOG.debug("{} {} Interrupted while sleeping between retries", (Object)msg, (Object)this.logPeerId());
            }
        }
        return sleepMultiplier < this.maxRetriesMultiplier;
    }

    private int getEstimatedEntrySize(WAL.Entry e) {
        long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
        return (int)size;
    }

    private List<List<WAL.Entry>> createParallelBatches(List<WAL.Entry> entries) {
        int numSinks = Math.max(this.replicationSinkMgr.getNumSinks(), 1);
        int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
        List<List<WAL.Entry>> entryLists = Stream.generate(ArrayList::new).limit(n).collect(Collectors.toList());
        int[] sizes = new int[n];
        for (WAL.Entry e : entries) {
            int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
            int entrySize = this.getEstimatedEntrySize(e);
            if (sizes[index] > 0 && sizes[index] + entrySize > this.replicationRpcLimit) {
                entryLists.add(entryLists.get(index));
                entryLists.set(index, new ArrayList());
                sizes[index] = 0;
            }
            entryLists.get(index).add(e);
            int n2 = index;
            sizes[n2] = sizes[n2] + entrySize;
        }
        return entryLists;
    }

    private List<List<WAL.Entry>> createSerialBatches(List<WAL.Entry> entries) {
        TreeMap<byte[], List> regionEntries = new TreeMap<byte[], List>(Bytes.BYTES_COMPARATOR);
        for (WAL.Entry e : entries) {
            regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList()).add(e);
        }
        return new ArrayList<List<WAL.Entry>>(regionEntries.values());
    }

    private List<List<WAL.Entry>> createBatches(List<WAL.Entry> entries) {
        if (this.isSerial) {
            return this.createSerialBatches(entries);
        }
        return this.createParallelBatches(entries);
    }

    public static boolean isTableNotFoundException(Throwable io) {
        if (io instanceof RemoteException) {
            io = ((RemoteException)io).unwrapRemoteException();
        }
        if (io != null && io.getMessage().contains("TableNotFoundException")) {
            return true;
        }
        while (io != null) {
            if (io instanceof TableNotFoundException) {
                return true;
            }
            io = io.getCause();
        }
        return false;
    }

    public static boolean isNoSuchColumnFamilyException(Throwable io) {
        if (io instanceof RemoteException) {
            io = ((RemoteException)io).unwrapRemoteException();
        }
        if (io != null && io.getMessage().contains("NoSuchColumnFamilyException")) {
            return true;
        }
        while (io != null) {
            if (io instanceof NoSuchColumnFamilyException) {
                return true;
            }
            io = io.getCause();
        }
        return false;
    }

    List<List<WAL.Entry>> filterNotExistTableEdits(List<List<WAL.Entry>> oldEntryList) {
        ArrayList<List<WAL.Entry>> entryList = new ArrayList<List<WAL.Entry>>();
        HashMap<TableName, Boolean> existMap = new HashMap<TableName, Boolean>();
        try (Connection localConn = ConnectionFactory.createConnection(this.ctx.getLocalConfiguration());
             Admin localAdmin = localConn.getAdmin();){
            for (List<WAL.Entry> oldEntries : oldEntryList) {
                ArrayList<WAL.Entry> entries = new ArrayList<WAL.Entry>();
                for (WAL.Entry e : oldEntries) {
                    TableName tableName = e.getKey().getTableName();
                    boolean exist = true;
                    if (existMap.containsKey(tableName)) {
                        exist = (Boolean)existMap.get(tableName);
                    } else {
                        try {
                            exist = localAdmin.tableExists(tableName);
                            existMap.put(tableName, exist);
                        }
                        catch (IOException iox) {
                            LOG.warn("Exception checking for local table " + tableName, iox);
                            exist = true;
                        }
                    }
                    if (exist) {
                        entries.add(e);
                        continue;
                    }
                    LOG.warn("Missing table detected at sink, local table also does not exist, filtering edits for table '{}'", (Object)tableName);
                }
                if (entries.isEmpty()) continue;
                entryList.add(entries);
            }
        }
        catch (IOException iox) {
            LOG.warn("Exception when creating connection to check local table", iox);
            return oldEntryList;
        }
        return entryList;
    }

    List<List<WAL.Entry>> filterNotExistColumnFamilyEdits(List<List<WAL.Entry>> oldEntryList) {
        ArrayList<List<WAL.Entry>> entryList = new ArrayList<List<WAL.Entry>>();
        HashMap existColumnFamilyMap = new HashMap();
        try (Connection localConn = ConnectionFactory.createConnection(this.ctx.getLocalConfiguration());
             Admin localAdmin = localConn.getAdmin();){
            for (List<WAL.Entry> oldEntries : oldEntryList) {
                ArrayList<WAL.Entry> entries = new ArrayList<WAL.Entry>();
                for (WAL.Entry e : oldEntries) {
                    TableName tableName = e.getKey().getTableName();
                    if (!existColumnFamilyMap.containsKey(tableName)) {
                        try {
                            Set cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream().map(Bytes::toString).collect(Collectors.toSet());
                            existColumnFamilyMap.put(tableName, cfs);
                        }
                        catch (Exception ex) {
                            LOG.warn("Exception getting cf names for local table {}", (Object)tableName, (Object)ex);
                            entries.add(e);
                            continue;
                        }
                    }
                    Set existColumnFamilies = (Set)existColumnFamilyMap.get(tableName);
                    HashSet missingCFs = new HashSet();
                    WALEdit walEdit = new WALEdit();
                    walEdit.getCells().addAll(e.getEdit().getCells());
                    WALUtil.filterCells(walEdit, cell -> {
                        String cf = Bytes.toString(CellUtil.cloneFamily(cell));
                        if (existColumnFamilies.contains(cf)) {
                            return cell;
                        }
                        missingCFs.add(cf);
                        return null;
                    });
                    if (!walEdit.isEmpty()) {
                        WAL.Entry newEntry = new WAL.Entry(e.getKey(), walEdit);
                        entries.add(newEntry);
                    }
                    if (missingCFs.isEmpty()) continue;
                    LOG.warn("Missing column family detected at sink, local column family also does not exist, filtering edits for table '{}',column family '{}'", (Object)tableName, (Object)missingCFs);
                }
                if (entries.isEmpty()) continue;
                entryList.add(entries);
            }
        }
        catch (IOException iox) {
            LOG.warn("Exception when creating connection to check local table", iox);
            return oldEntryList;
        }
        return entryList;
    }

    private void reconnectToPeerCluster() {
        ClusterConnection connection = null;
        try {
            connection = (ClusterConnection)ConnectionFactory.createConnection(this.conf);
        }
        catch (IOException ioe) {
            LOG.warn("{} Failed to create connection for peer cluster", (Object)this.logPeerId(), (Object)ioe);
        }
        if (connection != null) {
            this.conn = connection;
        }
    }

    private long parallelReplicate(CompletionService<Integer> pool, ReplicationEndpoint.ReplicateContext replicateContext, List<List<WAL.Entry>> batches) throws IOException {
        int futures = 0;
        for (int i = 0; i < batches.size(); ++i) {
            List<WAL.Entry> entries = batches.get(i);
            if (entries.isEmpty()) continue;
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} Submitting {} entries of total size {}", this.logPeerId(), entries.size(), replicateContext.getSize());
            }
            pool.submit(this.createReplicator(entries, i, replicateContext.getTimeout()));
            ++futures;
        }
        IOException iox = null;
        long lastWriteTime = 0L;
        for (int i = 0; i < futures; ++i) {
            try {
                Future<Integer> f = pool.take();
                int index = f.get();
                List<WAL.Entry> batch = batches.get(index);
                batches.set(index, Collections.emptyList());
                long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
                if (writeTime <= lastWriteTime) continue;
                lastWriteTime = writeTime;
                continue;
            }
            catch (InterruptedException ie) {
                iox = new IOException(ie);
                continue;
            }
            catch (ExecutionException ee) {
                iox = ee.getCause() instanceof IOException ? (IOException)ee.getCause() : new IOException(ee.getCause());
            }
        }
        if (iox != null) {
            throw iox;
        }
        return lastWriteTime;
    }

    @Override
    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        int numSinks;
        ExecutorCompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
        int sleepMultiplier = 1;
        if (!this.peersSelected && this.isRunning()) {
            this.connectToPeers();
            this.peersSelected = true;
        }
        if ((numSinks = this.replicationSinkMgr.getNumSinks()) == 0) {
            if (System.currentTimeMillis() - this.lastSinkFetchTime >= (long)(this.maxRetriesMultiplier * 1000)) {
                LOG.warn("No replication sinks found, returning without replicating. The source should retry with the same set of edits. Not logging this again for the next {} seconds.", (Object)this.maxRetriesMultiplier);
                this.lastSinkFetchTime = System.currentTimeMillis();
            }
            this.sleepForRetries("No sinks available at peer", sleepMultiplier);
            return false;
        }
        List<List<WAL.Entry>> batches = this.createBatches(replicateContext.getEntries());
        while (this.isRunning() && !this.exec.isShutdown()) {
            if (!this.isPeerEnabled()) {
                if (!this.sleepForRetries("Replication is disabled", sleepMultiplier)) continue;
                ++sleepMultiplier;
                continue;
            }
            if (this.conn == null || this.conn.isClosed()) {
                this.reconnectToPeerCluster();
            }
            try {
                this.parallelReplicate(pool, replicateContext, batches);
                return true;
            }
            catch (IOException ioe) {
                if (ioe instanceof RemoteException) {
                    if (this.dropOnDeletedTables && HBaseInterClusterReplicationEndpoint.isTableNotFoundException(ioe)) {
                        if ((batches = this.filterNotExistTableEdits(batches)).isEmpty()) {
                            LOG.warn("After filter not exist table's edits, 0 edits to replicate, just return");
                            return true;
                        }
                    } else if (this.dropOnDeletedColumnFamilies && HBaseInterClusterReplicationEndpoint.isNoSuchColumnFamilyException(ioe)) {
                        if ((batches = this.filterNotExistColumnFamilyEdits(batches)).isEmpty()) {
                            LOG.warn("After filter not exist column family's edits, 0 edits to replicate, just return");
                            return true;
                        }
                    } else {
                        LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", (Object)this.logPeerId(), (Object)ioe);
                        this.replicationSinkMgr.chooseSinks();
                    }
                } else if (ioe instanceof SocketTimeoutException) {
                    this.sleepForRetries("Encountered a SocketTimeoutException. Since the call to the remote cluster timed out, which is usually caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier);
                } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
                    LOG.warn("{} Peer is unavailable, rechecking all sinks: ", (Object)this.logPeerId(), (Object)ioe);
                    this.replicationSinkMgr.chooseSinks();
                } else {
                    LOG.warn("{} Can't replicate because of a local or network error: ", (Object)this.logPeerId(), (Object)ioe);
                }
                if (!this.sleepForRetries("Since we are unable to replicate", sleepMultiplier)) continue;
                ++sleepMultiplier;
            }
        }
        return false;
    }

    protected boolean isPeerEnabled() {
        return this.ctx.getReplicationPeer().isPeerEnabled();
    }

    @Override
    protected void doStop() {
        this.disconnect();
        if (this.conn != null) {
            try {
                this.conn.close();
                this.conn = null;
            }
            catch (IOException e) {
                LOG.warn("{} Failed to close the connection", (Object)this.logPeerId());
            }
        }
        this.exec.shutdown();
        try {
            this.exec.awaitTermination(this.maxTerminationWait, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // empty catch block
        }
        if (!this.exec.isTerminated()) {
            String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The ThreadPoolExecutor failed to finish all tasks within " + this.maxTerminationWait + "ms. Aborting to prevent Replication from deadlocking. See HBASE-16081.";
            this.abortable.abort(errMsg, new IOException(errMsg));
        }
        this.notifyStopped();
    }

    protected int replicateEntries(List<WAL.Entry> entries, int batchIndex, int timeout) throws IOException {
        ReplicationSinkManager.SinkPeer sinkPeer = null;
        try {
            int entriesHashCode = System.identityHashCode(entries);
            if (LOG.isTraceEnabled()) {
                long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
                LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", this.logPeerId(), entriesHashCode, entries.size(), size, this.replicationClusterId);
            }
            sinkPeer = this.replicationSinkMgr.getReplicationSink();
            AdminProtos.AdminService.BlockingInterface rrs = sinkPeer.getRegionServer();
            try {
                ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new WAL.Entry[entries.size()]), this.replicationClusterId, this.baseNamespaceDir, this.hfileArchiveDir, timeout);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} Completed replicating batch {}", (Object)this.logPeerId(), (Object)entriesHashCode);
                }
            }
            catch (IOException e) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} Failed replicating batch {}", this.logPeerId(), entriesHashCode, e);
                }
                throw e;
            }
            this.replicationSinkMgr.reportSinkSuccess(sinkPeer);
        }
        catch (IOException ioe) {
            if (sinkPeer != null) {
                this.replicationSinkMgr.reportBadSink(sinkPeer);
            }
            throw ioe;
        }
        return batchIndex;
    }

    private int serialReplicateRegionEntries(List<WAL.Entry> entries, int batchIndex, int timeout) throws IOException {
        int batchSize = 0;
        int index = 0;
        ArrayList<WAL.Entry> batch = new ArrayList<WAL.Entry>();
        for (WAL.Entry entry : entries) {
            int entrySize = this.getEstimatedEntrySize(entry);
            if (batchSize > 0 && batchSize + entrySize > this.replicationRpcLimit) {
                this.replicateEntries(batch, index++, timeout);
                batch.clear();
                batchSize = 0;
            }
            batch.add(entry);
            batchSize += entrySize;
        }
        if (batchSize > 0) {
            this.replicateEntries(batch, index, timeout);
        }
        return batchIndex;
    }

    protected Callable<Integer> createReplicator(List<WAL.Entry> entries, int batchIndex, int timeout) {
        return this.isSerial ? () -> this.serialReplicateRegionEntries(entries, batchIndex, timeout) : () -> this.replicateEntries(entries, batchIndex, timeout);
    }

    private String logPeerId() {
        return "[Source for peer " + this.ctx.getPeerId() + "]:";
    }
}

