/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.shaded.com.google.common.cache.Cache;
import org.apache.hadoop.hbase.shaded.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.util.StringUtils;

@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint
extends HBaseReplicationEndpoint {
    private static final Log LOG = LogFactory.getLog(RegionReplicaReplicationEndpoint.class);
    /** Configuration key whose value, when set, overrides the client retry count computed in {@code init}. */
    private static final String CLIENT_RETRIES_NUMBER = "hbase.region.replica.replication.client.retries.number";
    /** Configuration built in {@code init} from the context's configuration; retry settings are adjusted on it. */
    private Configuration conf;
    /** Cluster connection opened in {@code doStart} and closed in {@code doStop}. */
    private ClusterConnection connection;
    /** Table descriptors obtained from the endpoint context in {@code init}. */
    private TableDescriptors tableDescriptors;
    /** Pipeline controller shared by the entry buffers and the output sink. */
    private WALSplitter.PipelineController controller;
    /** Sink created in {@code doStart}; {@code null} until then. */
    private RegionReplicaOutputSink outputSink;
    /** Buffers that {@code replicate} appends WAL entries to. */
    private WALSplitter.EntryBuffers entryBuffers;
    /** Value of "hbase.region.replica.replication.writer.threads" (default 3). */
    private int numWriterThreads;
    /** Value of "hbase.client.operation.timeout" (default 1200000). */
    private int operationTimeout;
    /** Executor created in {@code doStart} for the replay RPC tasks and shut down in {@code doStop}. */
    private ExecutorService pool;

    /**
     * Combines the filter supplied by the superclass with the filter returned by
     * {@link #getSkipReplayedEditsFilter()}.
     *
     * @return the only non-null filter when just one is available, a
     *         {@link ChainWALEntryFilter} over both (superclass filter first) when both
     *         are available, or {@code null} when neither is
     */
    @Override
    public WALEntryFilter getWALEntryfilter() {
        WALEntryFilter inherited = super.getWALEntryfilter();
        WALEntryFilter skipReplayed = this.getSkipReplayedEditsFilter();
        if (inherited != null && skipReplayed != null) {
            List<WALEntryFilter> chain = new ArrayList<WALEntryFilter>();
            chain.add(inherited);
            chain.add(skipReplayed);
            return new ChainWALEntryFilter(chain);
        }
        // At most one of the two is non-null here; prefer whichever exists.
        return inherited == null ? skipReplayed : inherited;
    }

    /**
     * Creates the filter that {@link #getWALEntryfilter()} adds on top of the superclass filter.
     * Being {@code protected}, it can be overridden by subclasses; {@link #getWALEntryfilter()}
     * tolerates a {@code null} return value.
     *
     * @return a new {@link SkipReplayedEditsFilter}
     */
    protected WALEntryFilter getSkipReplayedEditsFilter() {
        return new SkipReplayedEditsFilter();
    }

    /**
     * Initializes the endpoint from the replication context: builds the endpoint's own
     * configuration, derives the client retry count, and creates the pipeline controller
     * and entry buffers.
     *
     * @param context replication context supplying the configuration and table descriptors
     * @throws IOException if the superclass initialization fails
     */
    @Override
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        this.conf = HBaseConfiguration.create(context.getConfiguration());
        this.tableDescriptors = context.getTableDescriptors();

        int defaultNumRetries = this.conf.getInt("hbase.client.retries.number", 31);
        if (defaultNumRetries > 10) {
            // NOTE(review): looks like this undoes a server-side multiplication of the
            // retry count — confirm against the region server configuration code.
            int mult = this.conf.getInt("hbase.client.serverside.retries.multiplier", 10);
            // A zero multiplier would throw ArithmeticException and a negative one would
            // yield a negative retry count, so only divide by a positive value.
            if (mult > 0) {
                defaultNumRetries /= mult;
            }
        }
        this.conf.setInt("hbase.client.serverside.retries.multiplier", 1);
        // An explicit endpoint-specific setting wins over the derived default.
        int numRetries = this.conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
        this.conf.setInt("hbase.client.retries.number", numRetries);

        this.numWriterThreads = this.conf.getInt("hbase.region.replica.replication.writer.threads", 3);
        this.controller = new WALSplitter.PipelineController();
        this.entryBuffers = new WALSplitter.EntryBuffers(this.controller, this.conf.getInt("hbase.region.replica.replication.buffersize", 0x8000000));
        this.operationTimeout = this.conf.getInt("hbase.client.operation.timeout", 1200000);
    }

    /**
     * Opens the cluster connection, creates the RPC thread pool and the output sink, starts
     * the sink's writer threads and then delegates to the superclass. An {@link IOException}
     * is logged and reported through {@code notifyFailed}.
     */
    @Override
    protected void doStart() {
        try {
            this.connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
            this.pool = getDefaultThreadPool(this.conf);
            RegionReplicaOutputSink sink = new RegionReplicaOutputSink(
                this.controller,
                this.tableDescriptors,
                this.entryBuffers,
                this.connection,
                this.pool,
                this.numWriterThreads,
                this.operationTimeout);
            this.outputSink = sink;
            sink.startWriterThreads();
            super.doStart();
        } catch (IOException e) {
            LOG.warn("Received exception while creating connection :" + e);
            this.notifyFailed(e);
        }
    }

    /**
     * Stops the endpoint: closes the output sink, shuts down the thread pool (waiting up to
     * 10 seconds for it to terminate), closes the cluster connection and finally delegates
     * to the superclass. Each step is best-effort; failures are logged and the remaining
     * steps still run.
     *
     * <p>If the wait for pool termination is interrupted, the interrupt status of the calling
     * thread is restored once all cleanup steps have run, so the interruption is not lost.
     */
    @Override
    protected void doStop() {
        boolean interrupted = false;
        try {
            if (this.outputSink != null) {
                try {
                    this.outputSink.finishWritingAndClose();
                } catch (IOException ex) {
                    // Log message and exception together so the stack trace is kept.
                    LOG.warn("Got exception while trying to close OutputSink", ex);
                }
            }
            if (this.pool != null) {
                this.pool.shutdownNow();
                try {
                    boolean shutdown = this.pool.awaitTermination(10000L, TimeUnit.MILLISECONDS);
                    if (!shutdown) {
                        LOG.warn("Failed to shutdown the thread pool after 10 seconds");
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
                    // Defer re-interrupting until cleanup is done so that closing the
                    // connection below is not affected by the pending interrupt.
                    interrupted = true;
                }
            }
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (IOException ex) {
                    LOG.warn("Got exception closing connection :" + ex);
                }
            }
            super.doStop();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the executor used for the replay RPC tasks.
     *
     * <p>Core and maximum thread counts come from the configuration; a configured value of 0
     * is replaced by eight times the number of available processors. The work queue is
     * bounded at {@code maxThreads * "hbase.client.max.total.tasks"}, threads are daemon
     * threads, and core threads are allowed to time out.
     *
     * @param conf configuration to read the pool settings from
     * @return the configured thread pool
     */
    private ExecutorService getDefaultThreadPool(Configuration conf) {
        int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
        if (maxThreads == 0) {
            maxThreads = Runtime.getRuntime().availableProcessors() * 8;
        }
        int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16);
        if (coreThreads == 0) {
            coreThreads = Runtime.getRuntime().availableProcessors() * 8;
        }
        long keepAliveSeconds = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60L);

        int queueCapacity = maxThreads * conf.getInt("hbase.client.max.total.tasks", 100);
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity);
        String threadNamePrefix = this.getClass().getSimpleName() + "-rpc-shared-";

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            coreThreads,
            maxThreads,
            keepAliveSeconds,
            TimeUnit.SECONDS,
            queue,
            Threads.newDaemonThreadFactory(threadNamePrefix));
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    /**
     * Pushes the entries of the given batch into the entry buffers and flushes the output
     * sink, retrying the whole batch for as long as the endpoint is running.
     *
     * <p>After a successful flush the sink's skipped-edits counter is reset to zero and its
     * previous value is added to the "log edits filtered" metric.
     *
     * @param replicateContext batch of WAL entries to replicate
     * @return {@code true} once the batch has been flushed; {@code false} if the endpoint is
     *         no longer running or the thread was interrupted (the interrupt status is
     *         restored in that case)
     */
    @Override
    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        while (true) {
            if (!this.isRunning()) {
                return false;
            }
            try {
                for (WAL.Entry walEntry : replicateContext.getEntries()) {
                    this.entryBuffers.appendEntry(walEntry);
                }
                this.outputSink.flush();
                this.ctx.getMetrics().incrLogEditsFiltered(this.outputSink.getSkippedEditsCounter().getAndSet(0L));
                return true;
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return false;
            } catch (IOException ioe) {
                // Fall through to the next loop iteration, which re-appends and retries.
                LOG.warn("Received IOException while trying to replicate" + StringUtils.stringifyException(ioe));
            }
        }
    }

    /**
     * Always returns {@code true} for this endpoint.
     *
     * @return {@code true}
     */
    @Override
    public boolean canReplicateToSameCluster() {
        return true;
    }

    /**
     * Always returns {@code null}, i.e. this endpoint supplies no scope filter.
     * NOTE(review): presumably the superclass skips a null scope filter when it builds its
     * own filter chain — confirm against HBaseReplicationEndpoint / BaseReplicationEndpoint.
     *
     * @return {@code null}
     */
    @Override
    protected WALEntryFilter getScopeWALEntryFilter() {
        return null;
    }

    /**
     * Callable that replays a list of WAL entries on one region replica through the admin
     * {@code replay} RPC.
     *
     * <p>The encoded name of the region the entries were produced for is captured at
     * construction time. If, at call time, the located region has a different encoded name,
     * the entries are not sent; they are counted in the shared skipped-entries counter instead.
     */
    static class RegionReplicaReplayCallable
    extends RegionAdminServiceCallable<AdminProtos.ReplicateWALEntryResponse> {
        private final List<WAL.Entry> entries;
        private final byte[] initialEncodedRegionName;
        private final AtomicLong skippedEntries;

        /**
         * @param connection cluster connection passed to the superclass
         * @param rpcControllerFactory factory for the RPC controllers
         * @param tableName table the entries belong to
         * @param location location passed to the superclass
         * @param regionInfo region info of the target replica; supplies the replica id and the
         *        encoded region name remembered for the later comparison
         * @param row row passed to the superclass
         * @param entries WAL entries to replay
         * @param skippedEntries counter incremented by the number of entries that are skipped
         */
        public RegionReplicaReplayCallable(ClusterConnection connection, RpcControllerFactory rpcControllerFactory, TableName tableName, HRegionLocation location, HRegionInfo regionInfo, byte[] row, List<WAL.Entry> entries, AtomicLong skippedEntries) {
            super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
            this.entries = entries;
            this.skippedEntries = skippedEntries;
            this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
        }

        /**
         * Replays the entries given at construction time.
         *
         * @param timeout call timeout applied to the RPC controller
         * @return the server response, or an empty response when nothing was sent
         * @throws IOException if the RPC fails
         */
        @Override
        public AdminProtos.ReplicateWALEntryResponse call(int timeout) throws IOException {
            return this.replayToServer(this.entries, timeout);
        }

        /**
         * Sends {@code entries} to the currently located region unless that region differs
         * from the one captured at construction time or the list is empty.
         */
        private AdminProtos.ReplicateWALEntryResponse replayToServer(List<WAL.Entry> entries, int timeout) throws IOException {
            byte[] locatedRegionName = this.location.getRegionInfo().getEncodedNameAsBytes();
            boolean regionChanged = !Bytes.equals(locatedRegionName, this.initialEncodedRegionName);

            if (regionChanged) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Skipping " + entries.size() + " entries in table " + this.tableName + " because located region " + this.location.getRegionInfo().getEncodedName() + " is different than the original region " + Bytes.toStringBinary(this.initialEncodedRegionName) + " from WALEdit");
                    for (WAL.Entry skippedEntry : entries) {
                        LOG.trace("Skipping : " + skippedEntry);
                    }
                }
                this.skippedEntries.addAndGet(entries.size());
                return AdminProtos.ReplicateWALEntryResponse.newBuilder().build();
            }

            if (entries.isEmpty()) {
                // Nothing to send; answer with an empty response without an RPC.
                return AdminProtos.ReplicateWALEntryResponse.newBuilder().build();
            }

            WAL.Entry[] batch = entries.toArray(new WAL.Entry[entries.size()]);
            Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> request = ReplicationProtbufUtil.buildReplicateWALEntryRequest(batch, locatedRegionName, null, null, null);
            try {
                PayloadCarryingRpcController rpcController = this.rpcControllerFactory.newController(request.getSecond());
                rpcController.setCallTimeout(timeout);
                rpcController.setPriority(this.tableName);
                return this.stub.replay(rpcController, request.getFirst());
            } catch (ServiceException se) {
                throw ProtobufUtil.getRemoteException(se);
            }
        }
    }

    /**
     * Adapts a {@link RetryingCallable} to a plain {@link Callable} so it can be submitted to
     * an executor: each {@link #call()} obtains a new caller from the factory and runs the
     * wrapped callable through {@code callWithRetries} with the configured timeout.
     *
     * @param <V> result type of the wrapped callable
     */
    static class RetryingRpcCallable<V>
    implements Callable<V> {
        RpcRetryingCallerFactory factory;
        RetryingCallable<V> callable;
        int timeout;

        /**
         * @param factory source of the retrying callers
         * @param callable operation to run with retries
         * @param timeout timeout handed to {@code callWithRetries}
         */
        public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable, int timeout) {
            this.factory = factory;
            this.callable = callable;
            this.timeout = timeout;
        }

        /**
         * Runs the wrapped callable with retries.
         *
         * @return the result of the wrapped callable
         * @throws Exception whatever {@code callWithRetries} throws
         */
        @Override
        public V call() throws Exception {
            // The explicit type witness lets the result type flow through without a cast.
            return this.factory.<V>newCaller().callWithRetries(this.callable, this.timeout);
        }
    }

    static class RegionReplicaSinkWriter
    extends WALSplitter.SinkWriter {
        RegionReplicaOutputSink sink;
        ClusterConnection connection;
        RpcControllerFactory rpcControllerFactory;
        RpcRetryingCallerFactory rpcRetryingCallerFactory;
        int operationTimeout;
        ExecutorService pool;
        Cache<TableName, Boolean> disabledAndDroppedTables;
        TableDescriptors tableDescriptors;

        public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection, ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
            this.sink = sink;
            this.connection = connection;
            this.operationTimeout = operationTimeout;
            this.rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
            this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
            this.pool = pool;
            this.tableDescriptors = tableDescriptors;
            int nonExistentTableCacheExpiryMs = connection.getConfiguration().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
            this.disabledAndDroppedTables = CacheBuilder.newBuilder().expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10).maximumSize(1000L).build();
        }

        public void append(TableName tableName, byte[] encodedRegionName, byte[] row, List<WAL.Entry> entries) throws IOException {
            RegionLocations locations;
            block24: {
                HRegionLocation primaryLocation;
                if (this.disabledAndDroppedTables.getIfPresent(tableName) != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)("Skipping " + entries.size() + " entries because table " + tableName + " is cached as a disabled or dropped table"));
                        for (WAL.Entry entry : entries) {
                            LOG.trace((Object)("Skipping : " + entry));
                        }
                    }
                    this.sink.getSkippedEditsCounter().addAndGet(entries.size());
                    return;
                }
                locations = null;
                boolean useCache = true;
                while (true) {
                    try {
                        locations = RegionReplicaReplayCallable.getRegionLocations(this.connection, tableName, row, useCache, 0);
                        if (locations == null) {
                            throw new HBaseIOException("Cannot locate locations for " + tableName + ", row:" + Bytes.toStringBinary(row));
                        }
                    }
                    catch (TableNotFoundException e) {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace((Object)("Skipping " + entries.size() + " entries because table " + tableName + " is dropped. Adding table to cache."));
                            for (WAL.Entry entry : entries) {
                                LOG.trace((Object)("Skipping : " + entry));
                            }
                        }
                        this.disabledAndDroppedTables.put(tableName, Boolean.TRUE);
                        this.sink.getSkippedEditsCounter().addAndGet(entries.size());
                        return;
                    }
                    primaryLocation = locations.getDefaultRegionLocation();
                    if (Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) break block24;
                    if (!useCache) break;
                    useCache = false;
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)("Skipping " + entries.size() + " entries in table " + tableName + " because located region " + primaryLocation.getRegionInfo().getEncodedName() + " is different than the original region " + Bytes.toStringBinary(encodedRegionName) + " from WALEdit"));
                    for (WAL.Entry entry : entries) {
                        LOG.trace((Object)("Skipping : " + entry));
                    }
                }
                this.sink.getSkippedEditsCounter().addAndGet(entries.size());
                return;
            }
            if (locations.size() == 1) {
                return;
            }
            ArrayList<Future<AdminProtos.ReplicateWALEntryResponse>> tasks = new ArrayList<Future<AdminProtos.ReplicateWALEntryResponse>>(locations.size() - 1);
            for (int replicaId = 0; replicaId < locations.size(); ++replicaId) {
                HRegionLocation location = locations.getRegionLocation(replicaId);
                if (RegionReplicaUtil.isDefaultReplica(replicaId)) continue;
                HRegionInfo regionInfo = location == null ? RegionReplicaUtil.getRegionInfoForReplica(locations.getDefaultRegionLocation().getRegionInfo(), replicaId) : location.getRegionInfo();
                RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(this.connection, this.rpcControllerFactory, tableName, location, regionInfo, row, entries, this.sink.getSkippedEditsCounter());
                Future<AdminProtos.ReplicateWALEntryResponse> task = this.pool.submit(new RetryingRpcCallable<AdminProtos.ReplicateWALEntryResponse>(this.rpcRetryingCallerFactory, callable, this.operationTimeout));
                tasks.add(task);
            }
            boolean tasksCancelled = false;
            for (int replicaId = 0; replicaId < tasks.size(); ++replicaId) {
                try {
                    ((Future)tasks.get(replicaId)).get();
                    continue;
                }
                catch (InterruptedException e) {
                    throw new InterruptedIOException(e.getMessage());
                }
                catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    boolean canBeSkipped = false;
                    if (cause instanceof IOException) {
                        HTableDescriptor tableDescriptor;
                        if (cause instanceof TableNotFoundException || this.connection.isTableDisabled(tableName)) {
                            this.disabledAndDroppedTables.put(tableName, Boolean.TRUE);
                            canBeSkipped = true;
                        } else if (this.tableDescriptors != null && (tableDescriptor = this.tableDescriptors.get(tableName)) != null && tableDescriptor.getRegionReplication() <= replicaId + 1) {
                            canBeSkipped = true;
                        }
                        if (canBeSkipped) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace((Object)("Skipping " + entries.size() + " entries in table " + tableName + " because received exception for dropped or disabled table"), cause);
                                for (WAL.Entry entry : entries) {
                                    LOG.trace((Object)("Skipping : " + entry));
                                }
                            }
                            if (tasksCancelled) continue;
                            this.sink.getSkippedEditsCounter().addAndGet(entries.size());
                            tasksCancelled = true;
                            continue;
                        }
                        throw (IOException)cause;
                    }
                    throw new IOException(cause);
                }
            }
        }
    }

    /**
     * Output sink that hands each region's buffered WAL entries to a
     * {@link RegionReplicaSinkWriter} for replay on the region's replicas.
     *
     * <p>Whether a table's edits need to be replicated is decided from its table descriptor
     * and cached for a configurable time in {@link #memstoreReplicationEnabled}.
     */
    static class RegionReplicaOutputSink
    extends WALSplitter.OutputSink {
        private final RegionReplicaSinkWriter sinkWriter;
        private final TableDescriptors tableDescriptors;
        private final Cache<TableName, Boolean> memstoreReplicationEnabled;

        /**
         * @param controller pipeline controller passed to the superclass
         * @param tableDescriptors table descriptors; when null every table is replicated
         * @param entryBuffers entry buffers passed to the superclass
         * @param connection cluster connection; also supplies the cache expiry setting
         * @param pool executor handed to the sink writer
         * @param numWriters number of writers passed to the superclass
         * @param operationTimeout timeout handed to the sink writer
         */
        public RegionReplicaOutputSink(WALSplitter.PipelineController controller, TableDescriptors tableDescriptors, WALSplitter.EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool, int numWriters, int operationTimeout) {
            super(controller, entryBuffers, numWriters);
            this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
            this.tableDescriptors = tableDescriptors;
            int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration().getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
            this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
                .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
                .initialCapacity(10)
                .maximumSize(1000L)
                .build();
        }

        /**
         * Forwards the buffer's entries to the sink writer, using the row of the first cell
         * of the first entry to locate the region. Does nothing when the buffer is empty, when
         * the first entry has no cells, or when the table does not require replication.
         */
        @Override
        public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
            List<WAL.Entry> entries = buffer.getEntryBuffer();
            if (entries.isEmpty()) {
                return;
            }
            if (entries.get(0).getEdit().getCells().isEmpty()) {
                return;
            }
            TableName table = buffer.getTableName();
            if (!this.requiresReplication(table, entries)) {
                return;
            }
            byte[] encodedRegionName = buffer.getEncodedRegionName();
            byte[] firstRow = entries.get(0).getEdit().getCells().get(0).getRow();
            this.sinkWriter.append(table, encodedRegionName, firstRow, entries);
        }

        /** Waits until the entry buffers are drained, then delegates to the superclass flush. */
        @Override
        public boolean flush() throws IOException {
            this.entryBuffers.waitUntilDrained();
            return super.flush();
        }

        /** Always returns {@code true}. */
        @Override
        public boolean keepRegionEvent(WAL.Entry entry) {
            return true;
        }

        /** Finishes writing via {@code finishWriting(true)}; this sink reports no paths and returns {@code null}. */
        @Override
        public List<Path> finishWritingAndClose() throws IOException {
            this.finishWriting(true);
            return null;
        }

        /** This sink reports no output counts; always returns {@code null}. */
        @Override
        public Map<byte[], Long> getOutputCounts() {
            return null;
        }

        /** Always returns 0. */
        @Override
        public int getNumberOfRecoveredRegions() {
            return 0;
        }

        /** Exposes the skipped-edits counter held by this sink. */
        AtomicLong getSkippedEditsCounter() {
            return this.skippedEdits;
        }

        /**
         * Decides whether the entries of {@code tableName} must be replicated.
         *
         * <p>When the table has memstore replication disabled, every entry that is not a meta
         * edit is removed from {@code entries} (and counted as skipped); the batch then still
         * requires replication only if at least one meta edit remains.
         *
         * @return {@code true} if the (possibly reduced) batch has to be replicated
         */
        private boolean requiresReplication(TableName tableName, List<WAL.Entry> entries) throws IOException {
            if (this.tableDescriptors == null) {
                return true;
            }

            Boolean enabled = this.memstoreReplicationEnabled.getIfPresent(tableName);
            if (enabled == null) {
                HTableDescriptor descriptor = this.tableDescriptors.get(tableName);
                // A missing descriptor is treated like a table with replication enabled.
                enabled = descriptor == null || descriptor.hasRegionMemstoreReplication();
                this.memstoreReplicationEnabled.put(tableName, enabled);
            }
            if (enabled.booleanValue()) {
                return true;
            }

            boolean sawMetaEdit = false;
            int dropped = 0;
            for (Iterator<WAL.Entry> cursor = entries.iterator(); cursor.hasNext(); ) {
                WAL.Entry candidate = cursor.next();
                if (candidate.getEdit().isMetaEdit()) {
                    sawMetaEdit = true;
                } else {
                    cursor.remove();
                    ++dropped;
                }
            }
            this.skippedEdits.addAndGet(dropped);
            return sawMetaEdit;
        }
    }

    /**
     * WAL entry filter that drops every entry whose key reports an original log sequence
     * number greater than zero and passes all other entries through unchanged.
     */
    private static class SkipReplayedEditsFilter
    extends BaseWALEntryFilter {
        private SkipReplayedEditsFilter() {
        }

        /**
         * @param entry entry to inspect
         * @return {@code null} when the entry's original log sequence number is positive,
         *         otherwise the entry itself
         */
        @Override
        public WAL.Entry filter(WAL.Entry entry) {
            boolean hasOriginalSeqNum = entry.getKey().getOrigLogSeqNum() > 0L;
            return hasOriginalSeqNum ? null : entry;
        }
    }
}

