/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.regionreplication;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationFlushRequester;
import org.apache.hadoop.hbase.shaded.com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.hbase.shaded.org.agrona.collections.IntHashSet;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers WAL entries written to a primary region and ships them in batches to the region's
 * secondary replicas through {@link AsyncClusterConnection#replicate}.
 * <p>
 * All mutable state except {@link #pendingSize} is read and written while holding the monitor of
 * {@link #entries}. {@link #pendingSize} is additionally {@code volatile} so that
 * {@link #pendingSize()} can read it without taking the lock.
 */
@InterfaceAudience.Private
public class RegionReplicationSink {
    private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

    /** Config key: retry count passed to every replicate call. */
    public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
    public static final int RETRIES_NUMBER_DEFAULT = 3;

    /** Config key: rpc timeout (ms) for batches that contain no meta edit. */
    public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";
    public static final long RPC_TIMEOUT_MS_DEFAULT = 1000L;

    /** Config key: operation timeout (ms) for batches that contain no meta edit. */
    public static final String OPERATION_TIMEOUT_MS = "hbase.region.read-replica.sink.operation.timeout.ms";
    public static final long OPERATION_TIMEOUT_MS_DEFAULT = 5000L;

    /** Config key: rpc timeout (ms) for batches that contain at least one meta edit. */
    public static final String META_EDIT_RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms";
    public static final long META_EDIT_RPC_TIMEOUT_MS_DEFAULT = 15000L;

    /** Config key: operation timeout (ms) for batches that contain at least one meta edit. */
    public static final String META_EDIT_OPERATION_TIMEOUT_MS = "hbase.region.read-replica.sink.meta-edit.operation.timeout.ms";
    public static final long META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT = 60000L;

    /** Config key: accumulated entry size at which a batch is closed. */
    public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";
    public static final long BATCH_SIZE_CAPACITY_DEFAULT = 0x100000L;

    /** Config key: number of entries at which a batch is closed. */
    public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";
    public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;

    private final RegionInfo primary;
    private final TableDescriptor tableDesc;
    private final int regionReplication;
    private final RegionReplicationBufferManager manager;
    private final RegionReplicationFlushRequester flushRequester;
    private final AsyncClusterConnection conn;
    /** Replica ids we currently do not replicate to; cleared when a flush-all marker is seen. */
    private final IntHashSet failedReplicas;
    /** Pending entries; its monitor is also the lock guarding the mutable state of this sink. */
    private final Queue<SinkEntry> entries = new ArrayDeque<SinkEntry>();
    private final int retries;
    private final long rpcTimeoutNs;
    private final long operationTimeoutNs;
    private final long metaEditRpcTimeoutNs;
    private final long metaEditOperationTimeoutNs;
    private final long batchSizeCapacity;
    private final long batchCountCapacity;
    /** Total size of entries that are queued or in flight. */
    private volatile long pendingSize;
    private long lastFlushedSequenceId;
    /** True while a batch is in flight; only one batch is outstanding at a time. */
    private boolean sending;
    private boolean stopping;
    private boolean stopped;

    /**
     * Creates a sink for the given primary region.
     *
     * @param conf source of the retry, timeout and batch-capacity settings
     * @param primary region info of the primary; must be the default replica
     * @param td table descriptor; its region replication must be greater than 1
     * @param manager buffer manager that accounts for the size of pending entries
     * @param flushRequester callback handed to the {@link RegionReplicationFlushRequester}
     * @param conn connection used to send entries to the secondary replicas
     * @throws IllegalArgumentException if {@code primary} is not the default replica or the
     *         region replication is not greater than 1
     */
    public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td, RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) {
        Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary", (Object)primary);
        this.regionReplication = td.getRegionReplication();
        Preconditions.checkArgument(this.regionReplication > 1, "region replication should be greater than 1 but got %s", this.regionReplication);
        this.primary = primary;
        this.tableDesc = td;
        this.manager = manager;
        this.flushRequester = new RegionReplicationFlushRequester(conf, flushRequester);
        this.conn = conn;
        this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
        this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
        this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
        this.metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(META_EDIT_RPC_TIMEOUT_MS, META_EDIT_RPC_TIMEOUT_MS_DEFAULT));
        this.metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(META_EDIT_OPERATION_TIMEOUT_MS, META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT));
        this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
        this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
        this.failedReplicas = new IntHashSet(this.regionReplication - 1);
    }

    /**
     * Invoked once every replica targeted by a batch has answered. Releases the sent entries,
     * records the replicas that failed and, unless the sink is stopping, starts the next batch.
     *
     * @param sent the entries of the batch that just finished
     * @param replica2Error per replica id, a holder whose value is the failure or {@code null}
     *        on success
     */
    void onComplete(List<SinkEntry> sent, Map<Integer, MutableObject<Throwable>> replica2Error) {
        long maxSequenceId = Long.MIN_VALUE;
        long toReleaseSize = 0L;
        for (SinkEntry entry : sent) {
            maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
            entry.replicated();
            toReleaseSize += entry.size;
        }
        this.manager.decrease(toReleaseSize);
        synchronized (this.entries) {
            this.pendingSize -= toReleaseSize;
            boolean addFailedReplicas = false;
            for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
                Integer replicaId = entry.getKey();
                Throwable error = entry.getValue().getValue();
                if (error == null) {
                    continue;
                }
                if (maxSequenceId > this.lastFlushedSequenceId) {
                    // The batch held edits newer than the last flush we know of, so stop
                    // replicating to this replica and ask for a flush.
                    LOG.warn("Failed to replicate to secondary replica {} for {}, since the max sequence id of sunk entris is {}, which is greater than the last flush SN {}, we will stop replicating for a while and trigger a flush", replicaId, this.primary, maxSequenceId, this.lastFlushedSequenceId, error);
                    this.failedReplicas.add(replicaId);
                    addFailedReplicas = true;
                } else {
                    LOG.warn("Failed to replicate to secondary replica {} for {}, since the max sequence id of sunk entris is {}, which is less than or equal to the last flush SN {}, we will not stop replicating", replicaId, this.primary, maxSequenceId, this.lastFlushedSequenceId, error);
                }
            }
            if (addFailedReplicas) {
                this.flushRequester.requestFlush(maxSequenceId);
            }
            this.sending = false;
            if (this.stopping) {
                // stop() deferred the final transition to us because a batch was in flight.
                this.stopped = true;
                this.entries.notifyAll();
                return;
            }
            if (!this.entries.isEmpty()) {
                this.send();
            }
        }
    }

    /**
     * Dequeues the next batch, bounded by the batch count and size capacities, and sends it to
     * every replica that is not marked as failed. Both call sites in this class invoke it while
     * holding the monitor of {@link #entries}.
     */
    private void send() {
        // Check for live replicas before dequeuing anything: entries polled here are only
        // released in onComplete, so polling and then bailing out would leak their retained rpc
        // calls and their buffer quota. Left in the queue they are released by clearAllEntries().
        int toSendReplicaCount = this.regionReplication - 1 - this.failedReplicas.size();
        if (toSendReplicaCount <= 0) {
            return;
        }
        List<SinkEntry> toSend = new ArrayList<>();
        long totalSize = 0L;
        boolean hasMetaEdit = false;
        SinkEntry entry;
        while ((entry = this.entries.poll()) != null) {
            toSend.add(entry);
            totalSize += entry.size;
            hasMetaEdit |= entry.edit.isMetaEdit();
            // Close the batch as soon as either capacity is reached; the remaining entries are
            // sent by the next send() triggered from onComplete.
            if (toSend.size() >= this.batchCountCapacity || totalSize >= this.batchSizeCapacity) {
                break;
            }
        }
        long rpcTimeoutNsToUse;
        long operationTimeoutNsToUse;
        if (hasMetaEdit) {
            rpcTimeoutNsToUse = this.metaEditRpcTimeoutNs;
            operationTimeoutNsToUse = this.metaEditOperationTimeoutNs;
        } else {
            rpcTimeoutNsToUse = this.rpcTimeoutNs;
            operationTimeoutNsToUse = this.operationTimeoutNs;
        }
        this.sending = true;
        List<WAL.Entry> walEntries = toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
        AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
        Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
        for (int replicaId = 1; replicaId < this.regionReplication; ++replicaId) {
            if (this.failedReplicas.contains(replicaId)) {
                continue;
            }
            MutableObject<Throwable> error = new MutableObject<>();
            replica2Error.put(replicaId, error);
            RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(this.primary, replicaId);
            FutureUtils.addListener(this.conn.replicate(replica, walEntries, this.retries, rpcTimeoutNsToUse, operationTimeoutNsToUse), (r, e) -> {
                error.setValue(e);
                // The listener that brings the counter to zero completes the batch.
                if (remaining.decrementAndGet() == 0) {
                    this.onComplete(toSend, replica2Error);
                }
            });
        }
    }

    /**
     * Tells whether the flush descriptor is treated as a flush of all stores: either a
     * {@code CANNOT_FLUSH} marker, or a {@code START_FLUSH} whose flushed families are exactly
     * the column families of the table.
     */
    private boolean isStartFlushAllStores(WALProtos.FlushDescriptor flushDesc) {
        if (flushDesc.getAction() == WALProtos.FlushDescriptor.FlushAction.CANNOT_FLUSH) {
            return true;
        }
        if (flushDesc.getAction() != WALProtos.FlushDescriptor.FlushAction.START_FLUSH) {
            return false;
        }
        // byte[] has identity equals/hashCode, so collect into a set ordered by content.
        Set<byte[]> storesFlushed = flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray()).collect(Collectors.toCollection(() -> new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR)));
        if (storesFlushed.size() != this.tableDesc.getColumnFamilyCount()) {
            return false;
        }
        return storesFlushed.containsAll(this.tableDesc.getColumnFamilyNames());
    }

    /**
     * Extracts the flush descriptor from a meta cell when it qualifies as a flush of all stores.
     *
     * @param metaCell a cell of a meta edit
     * @return the descriptor, or empty if the cell is not in the meta family, cannot be parsed,
     *         carries no flush descriptor, or does not cover all stores
     */
    Optional<WALProtos.FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
        if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
            return Optional.empty();
        }
        WALProtos.FlushDescriptor flushDesc;
        try {
            flushDesc = WALEdit.getFlushDescriptor(metaCell);
        } catch (IOException e) {
            // Best effort: treat an unparsable cell as "not a flush marker", but keep the cause.
            LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
            return Optional.empty();
        }
        if (flushDesc != null && this.isStartFlushAllStores(flushDesc)) {
            return Optional.of(flushDesc);
        }
        return Optional.empty();
    }

    /**
     * Releases and drops every queued entry and gives their size back to the buffer manager.
     * Entries of an in-flight batch are not in the queue and are not affected.
     *
     * @return the total size of the dropped entries
     */
    private long clearAllEntries() {
        long toClearSize = 0L;
        for (SinkEntry entry : this.entries) {
            toClearSize += entry.size;
            entry.replicated();
        }
        this.entries.clear();
        this.pendingSize -= toClearSize;
        this.manager.decrease(toClearSize);
        return toClearSize;
    }

    /**
     * Queues an edit for replication and starts a send if none is in flight.
     * <p>
     * Non-meta edits are ignored when the table has memstore replication disabled. A meta edit
     * carrying a flush-all marker drops all queued entries and clears the failed replicas. If the
     * buffer manager rejects the new entry, the queue is dropped, every replica is marked as
     * failed and a flush is requested.
     *
     * @param key WAL key of the edit
     * @param edit the edit to replicate
     * @param rpcCall the rpc call backing the edit, may be {@code null}; it is retained until the
     *        entry has been replicated or dropped
     */
    public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
        if (!this.tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
            return;
        }
        synchronized (this.entries) {
            if (this.stopping) {
                return;
            }
            if (edit.isMetaEdit()) {
                for (Cell metaCell : edit.getCells()) {
                    this.getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
                        long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
                        this.lastFlushedSequenceId = flushSequenceNumber;
                        long clearedCount = this.entries.size();
                        long clearedSize = this.clearAllEntries();
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Got a flush all request with sequence id {}, clear {} pending entries with size {}, clear failed replicas {}", flushSequenceNumber, clearedCount, StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1), this.failedReplicas);
                        }
                        this.failedReplicas.clear();
                        this.flushRequester.recordFlush(flushSequenceNumber);
                    });
                }
            }
            if (this.failedReplicas.size() == this.regionReplication - 1) {
                // Every replica is marked as failed, there is nobody to send this edit to.
                return;
            }
            SinkEntry entry = new SinkEntry(key, edit, rpcCall);
            this.entries.add(entry);
            this.pendingSize += entry.size;
            if (this.manager.increase(entry.size)) {
                if (!this.sending) {
                    this.send();
                }
            } else {
                // The buffer manager refused the entry: drop what is queued, stop replicating to
                // all replicas and ask for a flush.
                this.clearAllEntries();
                for (int replicaId = 1; replicaId < this.regionReplication; ++replicaId) {
                    this.failedReplicas.add(replicaId);
                }
                this.flushRequester.requestFlush(entry.key.getSequenceId());
            }
        }
    }

    /** Returns the total size of the entries that are queued or in flight. */
    long pendingSize() {
        return this.pendingSize;
    }

    /**
     * Stops accepting edits and drops everything still queued. If no batch is in flight the sink
     * is marked stopped right away; otherwise {@link #onComplete} does so when the batch ends.
     */
    public void stop() {
        synchronized (this.entries) {
            this.stopping = true;
            this.clearAllEntries();
            if (!this.sending) {
                this.stopped = true;
                this.entries.notifyAll();
            }
        }
    }

    /**
     * Blocks until the sink has been marked stopped.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitUntilStopped() throws InterruptedException {
        synchronized (this.entries) {
            while (!this.stopped) {
                this.entries.wait();
            }
        }
    }

    /** Returns the live (not copied) set of failed replica ids. */
    @RestrictedApi(explanation="Should only be called in tests", link="", allowedOnPath=".*/src/test/.*")
    IntHashSet getFailedReplicas() {
        synchronized (this.entries) {
            return this.failedReplicas;
        }
    }

    /**
     * A queued WAL entry together with its estimated serialized size. A non-null rpc call is
     * retained on construction and released by {@link #replicated()}.
     */
    private static final class SinkEntry {
        final WALKeyImpl key;
        final WALEdit edit;
        final ServerCall<?> rpcCall;
        final long size;

        SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
            this.key = key;
            this.edit = edit;
            this.rpcCall = rpcCall;
            this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf();
            if (rpcCall != null) {
                rpcCall.retainByWAL();
            }
        }

        /** Releases the retained rpc call, if any; called once per entry when it leaves the sink. */
        void replicated() {
            if (this.rpcCall != null) {
                this.rpcCall.releaseByWAL();
            }
        }
    }
}

