/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class TableReplicationQueueStorage
implements ReplicationQueueStorage {
    public static final byte[] QUEUE_FAMILY = Bytes.toBytes("queue");
    public static final byte[] LAST_SEQUENCE_ID_FAMILY = Bytes.toBytes("sid");
    public static final byte[] HFILE_REF_FAMILY = Bytes.toBytes("hfileref");
    private final Connection conn;
    private final TableName tableName;

    /**
     * Creates a storage instance that keeps replication state in an HBase table.
     *
     * @param conn connection used to obtain table handles for every operation
     * @param tableName name of the table that holds the replication data
     */
    public TableReplicationQueueStorage(Connection conn, TableName tableName) {
        this.tableName = tableName;
        this.conn = conn;
    }

    /**
     * Adds to {@code builder} the conditions and the put needed to advance the last pushed
     * sequence ids of a peer.
     * <p>
     * The currently stored values are read first. For every region whose new sequence id is
     * greater than the stored one (or that has no stored value), a column is added to the put and
     * a condition is added that requires the stored value to still equal what was just read.
     * Regions whose new sequence id is not greater than the stored one are skipped. The put is
     * only added to the builder when it contains at least one column.
     *
     * @param builder request builder receiving the conditions and the put
     * @param peerId peer id, used as the row key
     * @param lastSeqIds encoded region name to new last sequence id
     * @param table table used to read the currently stored sequence ids
     * @throws IOException if reading the current values fails
     */
    private void addLastSeqIdsPut(MultiRowMutationProtos.MutateRowsRequest.Builder builder, String peerId, Map<String, Long> lastSeqIds, AsyncTable<?> table) throws IOException {
        byte[] row = Bytes.toBytes(peerId);

        // Read the stored value of every region we were asked to update.
        Get get = new Get(row);
        for (String regionName : lastSeqIds.keySet()) {
            get.addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(regionName));
        }
        Result current = FutureUtils.get(table.get(get));

        Put put = new Put(row);
        for (Map.Entry<String, Long> update : lastSeqIds.entrySet()) {
            long newSeqId = update.getValue();
            byte[] qualifier = Bytes.toBytes(update.getKey());
            // null when nothing is stored for this region yet
            byte[] storedBytes = current.getValue(LAST_SEQUENCE_ID_FAMILY, qualifier);
            if (storedBytes != null && newSeqId <= Bytes.toLong(storedBytes)) {
                // the stored value is already at least as large, nothing to write
                continue;
            }
            put.addColumn(LAST_SEQUENCE_ID_FAMILY, qualifier, Bytes.toBytes(newSeqId));
            // the mutation is conditional on the cell still holding what we just read
            builder.addCondition(ProtobufUtil.toCondition(row, LAST_SEQUENCE_ID_FAMILY, qualifier, CompareOperator.EQUAL, storedBytes, null));
        }
        if (!put.isEmpty()) {
            builder.addMutationRequest(ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put));
        }
    }

    /**
     * Stores the replication offset of a WAL group and, optionally, advances the last pushed
     * sequence ids of the queue's peer.
     * <p>
     * When {@code lastSeqIds} is empty, or none of the sequence ids needs to be written, the
     * offset is stored with a plain put. Otherwise the offset put and the sequence id put are sent
     * together through the multi row mutation service, guarded by the conditions built in
     * {@link #addLastSeqIdsPut}; if the server reports the request as not processed, the current
     * values are re-read and the request is retried.
     *
     * @param queueId id of the replication queue, used as the row key of the offset
     * @param walGroup WAL group whose offset is stored
     * @param offset the offset to store
     * @param lastSeqIds encoded region name to last pushed sequence id, may be empty
     * @throws ReplicationException if any of the table operations fails
     */
    @Override
    public void setOffset(ReplicationQueueId queueId, String walGroup, ReplicationGroupOffset offset, Map<String, Long> lastSeqIds) throws ReplicationException {
        Put put = new Put(Bytes.toBytes(queueId.toString())).addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
        AsyncTable<AdvancedScanResultConsumer> asyncTable = this.conn.toAsyncConnection().getTable(this.tableName);
        try {
            if (lastSeqIds.isEmpty()) {
                FutureUtils.get(asyncTable.put(put));
                return;
            }
            for (;;) {
                MultiRowMutationProtos.MutateRowsRequest.Builder builder = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
                this.addLastSeqIdsPut(builder, queueId.getPeerId(), lastSeqIds, asyncTable);
                if (builder.getMutationRequestCount() <= 0) {
                    // no sequence id has to be written, fall back to a single put
                    FutureUtils.get(asyncTable.put(put));
                    return;
                }
                // send the offset together with the sequence ids in one conditional request
                MultiRowMutationProtos.MutateRowsRequest request = builder.addMutationRequest(ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put)).build();
                MultiRowMutationProtos.MutateRowsResponse response = FutureUtils.get(asyncTable.<MultiRowMutationProtos.MultiRowMutationService.Interface, MultiRowMutationProtos.MutateRowsResponse>coprocessorService(MultiRowMutationProtos.MultiRowMutationService::newStub, (stub, controller, done) -> stub.mutateRows(controller, request, done), put.getRow()));
                if (response.getProcessed()) {
                    return;
                }
                // a condition did not hold; loop to re-read the stored values and try again
            }
        } catch (IOException e) {
            throw new ReplicationException("failed to setOffset, queueId=" + queueId + ", walGroup=" + walGroup + ", offset=" + offset + ", lastSeqIds=" + lastSeqIds, e);
        }
    }

    /**
     * Converts the queue family cells of a row into a WAL group to offset map.
     *
     * @param result a row read from the replication table
     * @return an immutable map keyed by WAL group (the column qualifier) whose values are the
     *         offsets parsed from the cell values; empty when the row has no queue family cells
     */
    private ImmutableMap<String, ReplicationGroupOffset> parseOffsets(Result result) {
        // Typed builder: the raw builder emitted by the decompiler made build() an unchecked call.
        ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builderWithExpectedSize(result.size());
        NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(QUEUE_FAMILY);
        if (familyMap != null) {
            for (Map.Entry<byte[], byte[]> cell : familyMap.entrySet()) {
                String walGroup = Bytes.toString(cell.getKey());
                ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(cell.getValue()));
                builder.put(walGroup, offset);
            }
        }
        return builder.build();
    }

    /**
     * Fetches the queue family of the row for {@code queueId} and parses it into offsets.
     *
     * @param table table to read from
     * @param queueId id of the queue, its string form is the row key
     * @return WAL group to offset map, as produced by {@link #parseOffsets(Result)}
     * @throws IOException if the get fails
     */
    private Map<String, ReplicationGroupOffset> getOffsets0(Table table, ReplicationQueueId queueId) throws IOException {
        Get queueRowGet = new Get(Bytes.toBytes(queueId.toString()));
        queueRowGet.addFamily(QUEUE_FAMILY);
        return this.parseOffsets(table.get(queueRowGet));
    }

    /**
     * Returns the offsets of all WAL groups stored for the given queue.
     *
     * @param queueId id of the queue to read
     * @return WAL group to offset map
     * @throws ReplicationException if reading the table fails
     */
    @Override
    public Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId) throws ReplicationException {
        try (Table queueTable = this.conn.getTable(this.tableName)) {
            return this.getOffsets0(queueTable, queueId);
        } catch (IOException e) {
            throw new ReplicationException("failed to getOffsets, queueId=" + queueId, e);
        }
    }

    /**
     * Runs {@code scan} and appends the queue id parsed from every returned row key.
     *
     * @param table table to scan
     * @param scan the scan to execute
     * @param queueIds list receiving the parsed queue ids
     * @throws IOException if scanning fails
     */
    private void listAllQueueIds(Table table, Scan scan, List<ReplicationQueueId> queueIds) throws IOException {
        try (ResultScanner rows = table.getScanner(scan)) {
            for (Result row = rows.next(); row != null; row = rows.next()) {
                String rowKey = Bytes.toString(row.getRow());
                queueIds.add(ReplicationQueueId.parse(rowKey));
            }
        }
    }

    /**
     * Appends the ids of all queues found under the scan prefix built from the given server and
     * peer.
     *
     * @param table table to scan
     * @param peerId peer id used to build the row prefix
     * @param serverName server name used to build the row prefix
     * @param queueIds list receiving the parsed queue ids
     * @throws IOException if scanning fails
     */
    private void listAllQueueIds(Table table, String peerId, ServerName serverName, List<ReplicationQueueId> queueIds) throws IOException {
        byte[] rowPrefix = ReplicationQueueId.getScanPrefix(serverName, peerId);
        // only row keys are needed, so values are stripped with a KeyOnlyFilter
        Scan prefixScan = new Scan().setStartStopRowForPrefixScan(rowPrefix).addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter());
        this.listAllQueueIds(table, prefixScan, queueIds);
    }

    /**
     * Lists the ids of all queues whose row key starts with the scan prefix of the given peer.
     *
     * @param peerId peer id used to build the row prefix
     * @return the queue ids found, possibly empty
     * @throws ReplicationException if scanning the table fails
     */
    @Override
    public List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException {
        byte[] rowPrefix = ReplicationQueueId.getScanPrefix(peerId);
        Scan prefixScan = new Scan().setStartStopRowForPrefixScan(rowPrefix).addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter());
        List<ReplicationQueueId> found = new ArrayList<>();
        try (Table queueTable = this.conn.getTable(this.tableName)) {
            this.listAllQueueIds(queueTable, prefixScan, found);
        } catch (IOException e) {
            throw new ReplicationException("failed to listAllQueueIds, peerId=" + peerId, e);
        }
        return found;
    }

    /**
     * Lists the ids of all queues of the given server, across every peer present in the table.
     * <p>
     * Peers are discovered one at a time: a one-row scan finds the first queue row at or after
     * the start row for the peer following the previously seen one, the peer id is extracted from
     * that row key, and the queues for that peer and server are then collected with a prefix
     * scan. The loop ends when the one-row scan returns nothing.
     *
     * @param serverName server whose queues are listed
     * @return the queue ids found, possibly empty
     * @throws ReplicationException if scanning the table fails
     */
    @Override
    public List<ReplicationQueueId> listAllQueueIds(ServerName serverName) throws ReplicationException {
        List<ReplicationQueueId> found = new ArrayList<>();
        try (Table queueTable = this.conn.getTable(this.tableName)) {
            KeyOnlyFilter keysOnly = new KeyOnlyFilter();
            String lastPeerId = null;
            for (;;) {
                Scan nextPeerScan = new Scan().addFamily(QUEUE_FAMILY).setOneRowLimit().setFilter(keysOnly);
                if (lastPeerId != null) {
                    // jump past every row of the peer handled in the previous iteration
                    nextPeerScan.withStartRow(ReplicationQueueId.getScanStartRowForNextPeerId(lastPeerId));
                }
                String currentPeerId;
                try (ResultScanner peerProbe = queueTable.getScanner(nextPeerScan)) {
                    Result firstRow = peerProbe.next();
                    if (firstRow == null) {
                        // no further peers
                        break;
                    }
                    currentPeerId = ReplicationQueueId.getPeerId(Bytes.toString(firstRow.getRow()));
                }
                this.listAllQueueIds(queueTable, currentPeerId, serverName, found);
                lastPeerId = currentPeerId;
            }
        } catch (IOException e) {
            throw new ReplicationException("failed to listAllQueueIds, serverName=" + serverName, e);
        }
        return found;
    }

    /**
     * Lists the ids of all queues stored for the given peer and server.
     *
     * @param peerId peer id used to build the row prefix
     * @param serverName server name used to build the row prefix
     * @return the queue ids found, possibly empty
     * @throws ReplicationException if scanning the table fails
     */
    @Override
    public List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName) throws ReplicationException {
        List<ReplicationQueueId> found = new ArrayList<>();
        try (Table queueTable = this.conn.getTable(this.tableName)) {
            this.listAllQueueIds(queueTable, peerId, serverName, found);
            return found;
        } catch (IOException e) {
            throw new ReplicationException("failed to listAllQueueIds, peerId=" + peerId + ", serverName=" + serverName, e);
        }
    }

    /**
     * Scans the queue family of the whole table and returns every queue with its offsets.
     *
     * @return one {@link ReplicationQueueData} per row returned by the scan
     * @throws ReplicationException if scanning the table fails
     */
    @Override
    public List<ReplicationQueueData> listAllQueues() throws ReplicationException {
        Scan fullScan = new Scan().addFamily(QUEUE_FAMILY).setReadType(Scan.ReadType.STREAM);
        List<ReplicationQueueData> allQueues = new ArrayList<>();
        try (Table queueTable = this.conn.getTable(this.tableName);
             ResultScanner rows = queueTable.getScanner(fullScan)) {
            for (Result row = rows.next(); row != null; row = rows.next()) {
                ReplicationQueueId id = ReplicationQueueId.parse(Bytes.toString(row.getRow()));
                allQueues.add(new ReplicationQueueData(id, this.parseOffsets(row)));
            }
        } catch (IOException e) {
            throw new ReplicationException("failed to listAllQueues", e);
        }
        return allQueues;
    }

    /**
     * Returns the distinct server names that appear in the ids of the stored queues.
     *
     * @return the server names, without duplicates, in no particular order
     * @throws ReplicationException if scanning the table fails
     */
    @Override
    public List<ServerName> listAllReplicators() throws ReplicationException {
        Scan keyScan = new Scan().addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter()).setReadType(Scan.ReadType.STREAM);
        // a set collapses the many queues that belong to the same server
        Set<ServerName> distinctServers = new HashSet<>();
        try (Table queueTable = this.conn.getTable(this.tableName);
             ResultScanner rows = queueTable.getScanner(keyScan)) {
            for (Result row = rows.next(); row != null; row = rows.next()) {
                String rowKey = Bytes.toString(row.getRow());
                distinctServers.add(ReplicationQueueId.parse(rowKey).getServerName());
            }
        } catch (IOException e) {
            throw new ReplicationException("failed to listAllReplicators", e);
        }
        return new ArrayList<>(distinctServers);
    }

    /**
     * Moves a replication queue to {@code targetServerName}.
     * <p>
     * The current offsets of the queue are read, then a single multi row mutation deletes the
     * queue family of the old row and writes the same offsets under the claimed queue id. The
     * mutation is conditional on the first offset cell still holding the value that was read. If
     * the server reports the request as not processed, the offsets are read again and the claim
     * is retried; the loop ends with an empty map once the old queue has no offsets left.
     *
     * @param queueId id of the queue to claim
     * @param targetServerName server that takes over the queue
     * @return the offsets that were moved, or an empty map if the queue had no offsets
     * @throws ReplicationException if any of the table operations fails
     */
    @Override
    public Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId, ServerName targetServerName) throws ReplicationException {
        ReplicationQueueId newQueueId = queueId.claim(targetServerName);
        // row passed to the coprocessor call: the scan prefix of the queue's peer
        byte[] coprocessorRow = ReplicationQueueId.getScanPrefix(queueId.getPeerId());
        AsyncTable<AdvancedScanResultConsumer> asyncTable = this.conn.toAsyncConnection().getTable(this.tableName);
        try (Table table = this.conn.getTable(this.tableName)) {
            for (;;) {
                Map<String, ReplicationGroupOffset> offsets = this.getOffsets0(table, queueId);
                if (offsets.isEmpty()) {
                    return Collections.emptyMap();
                }
                // guard the move with the value of the first offset cell we just read
                Map.Entry<String, ReplicationGroupOffset> entry = offsets.entrySet().iterator().next();
                ClientProtos.Condition condition = ProtobufUtil.toCondition(Bytes.toBytes(queueId.toString()), QUEUE_FAMILY, Bytes.toBytes(entry.getKey()), CompareOperator.EQUAL, Bytes.toBytes(entry.getValue().toString()), null);
                Delete delete = new Delete(Bytes.toBytes(queueId.toString())).addFamily(QUEUE_FAMILY);
                Put put = new Put(Bytes.toBytes(newQueueId.toString()));
                offsets.forEach((walGroup, offset) -> put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString())));
                MultiRowMutationProtos.MutateRowsRequest request = MultiRowMutationProtos.MutateRowsRequest.newBuilder().addCondition(condition).addMutationRequest(ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, delete)).addMutationRequest(ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put)).build();
                MultiRowMutationProtos.MutateRowsResponse resp = FutureUtils.get(asyncTable.<MultiRowMutationProtos.MultiRowMutationService.Interface, MultiRowMutationProtos.MutateRowsResponse>coprocessorService(MultiRowMutationProtos.MultiRowMutationService::newStub, (stub, controller, done) -> stub.mutateRows(controller, request, done), coprocessorRow));
                if (resp.getProcessed()) {
                    return offsets;
                }
                // Not processed: the condition did not hold (presumably the queue changed
                // concurrently), so re-read the offsets and try again.
            }
        } catch (IOException e) {
            throw new ReplicationException("failed to claimQueue, queueId=" + queueId + ", targetServerName=" + targetServerName, e);
        }
    }

    /**
     * Deletes the queue family of the row that belongs to the given queue.
     *
     * @param queueId id of the queue to remove
     * @throws ReplicationException if the delete fails
     */
    @Override
    public void removeQueue(ReplicationQueueId queueId) throws ReplicationException {
        try (Table queueTable = this.conn.getTable(this.tableName)) {
            Delete queueDelete = new Delete(Bytes.toBytes(queueId.toString()));
            queueDelete.addFamily(QUEUE_FAMILY);
            queueTable.delete(queueDelete);
        } catch (IOException e) {
            throw new ReplicationException("failed to removeQueue, queueId=" + queueId, e);
        }
    }

    /**
     * Deletes every queue row whose key starts with the scan prefix of the given peer.
     *
     * @param peerId peer whose queues are removed
     * @throws ReplicationException if scanning or deleting fails
     */
    @Override
    public void removeAllQueues(String peerId) throws ReplicationException {
        Scan scan = new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId)).addFamily(QUEUE_FAMILY).setFilter(new KeyOnlyFilter());
        try (Table table = this.conn.getTable(this.tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            Result result;
            while ((result = scanner.next()) != null) {
                // NOTE(review): this deletes the whole row, while removeQueue only deletes
                // QUEUE_FAMILY - confirm whether the difference is intended.
                table.delete(new Delete(result.getRow()));
            }
        } catch (IOException e) {
            // The message previously named listAllQueueIds, which misattributed the failure.
            throw new ReplicationException("failed to removeAllQueues, peerId=" + peerId, e);
        }
    }

    /**
     * Reads the last pushed sequence id stored for a region and peer.
     *
     * @param encodedRegionName encoded region name, used as the column qualifier
     * @param peerId peer id, used as the row key
     * @return the stored sequence id, or {@code -1} when no value is stored
     * @throws ReplicationException if reading the table fails
     */
    @Override
    public long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException {
        byte[] qualifier = Bytes.toBytes(encodedRegionName);
        try (Table seqIdTable = this.conn.getTable(this.tableName)) {
            Get seqIdGet = new Get(Bytes.toBytes(peerId)).addColumn(LAST_SEQUENCE_ID_FAMILY, qualifier);
            byte[] stored = seqIdTable.get(seqIdGet).getValue(LAST_SEQUENCE_ID_FAMILY, qualifier);
            if (stored == null) {
                return -1L;
            }
            return Bytes.toLong(stored);
        } catch (IOException e) {
            throw new ReplicationException("failed to getLastSequenceId, encodedRegionName=" + encodedRegionName + ", peerId=" + peerId, e);
        }
    }

    /**
     * Writes the given last pushed sequence ids for a peer with a single unconditional put.
     *
     * @param peerId peer id, used as the row key
     * @param lastSeqIds encoded region name to sequence id; each entry becomes one column
     * @throws ReplicationException if the put fails
     */
    @Override
    public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException {
        Put seqIdPut = new Put(Bytes.toBytes(peerId));
        for (Map.Entry<String, Long> region : lastSeqIds.entrySet()) {
            long seqId = region.getValue();
            seqIdPut.addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(region.getKey()), Bytes.toBytes(seqId));
        }
        try (Table seqIdTable = this.conn.getTable(this.tableName)) {
            seqIdTable.put(seqIdPut);
        } catch (IOException e) {
            throw new ReplicationException("failed to setLastSequenceIds, peerId=" + peerId + ", lastSeqIds=" + lastSeqIds, e);
        }
    }

    /**
     * Deletes the whole last sequence id family of the given peer's row.
     *
     * @param peerId peer id, used as the row key
     * @throws ReplicationException if the delete fails
     */
    @Override
    public void removeLastSequenceIds(String peerId) throws ReplicationException {
        Delete familyDelete = new Delete(Bytes.toBytes(peerId));
        familyDelete.addFamily(LAST_SEQUENCE_ID_FAMILY);
        try (Table seqIdTable = this.conn.getTable(this.tableName)) {
            seqIdTable.delete(familyDelete);
        } catch (IOException e) {
            throw new ReplicationException("failed to removeLastSequenceIds, peerId=" + peerId, e);
        }
    }

    /**
     * Deletes the last sequence id columns of the listed regions from the given peer's row.
     *
     * @param peerId peer id, used as the row key
     * @param encodedRegionNames encoded region names, each one a column qualifier to delete
     * @throws ReplicationException if the delete fails
     */
    @Override
    public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames) throws ReplicationException {
        Delete columnsDelete = new Delete(Bytes.toBytes(peerId));
        for (String regionName : encodedRegionNames) {
            columnsDelete.addColumns(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(regionName));
        }
        try (Table seqIdTable = this.conn.getTable(this.tableName)) {
            seqIdTable.delete(columnsDelete);
        } catch (IOException e) {
            throw new ReplicationException("failed to removeLastSequenceIds, peerId=" + peerId + ", encodedRegionNames=" + encodedRegionNames, e);
        }
    }

    /**
     * Deletes the whole hfile reference family of the given peer's row.
     *
     * @param peerId peer id, used as the row key
     * @throws ReplicationException if the delete fails
     */
    @Override
    public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
        try (Table hfileRefTable = this.conn.getTable(this.tableName)) {
            Delete familyDelete = new Delete(Bytes.toBytes(peerId));
            familyDelete.addFamily(HFILE_REF_FAMILY);
            hfileRefTable.delete(familyDelete);
        } catch (IOException e) {
            throw new ReplicationException("failed to removePeerFromHFileRefs, peerId=" + peerId, e);
        }
    }

    /**
     * Records hfile references for a peer.
     * <p>
     * For every pair, the file name of the second path becomes a column qualifier in the hfile
     * reference family; the cell value is empty.
     *
     * @param peerId peer id, used as the row key
     * @param pairs path pairs; only the name of the second path of each pair is stored
     * @throws ReplicationException if the put fails
     */
    @Override
    public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException {
        Put refsPut = new Put(Bytes.toBytes(peerId));
        for (Pair<Path, Path> pair : pairs) {
            String fileName = pair.getSecond().getName();
            refsPut.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(fileName), HConstants.EMPTY_BYTE_ARRAY);
        }
        try (Table hfileRefTable = this.conn.getTable(this.tableName)) {
            hfileRefTable.put(refsPut);
        } catch (IOException e) {
            throw new ReplicationException("failed to addHFileRefs, peerId=" + peerId + ", pairs=" + pairs, e);
        }
    }

    /**
     * Deletes the hfile reference columns of the listed files from the given peer's row.
     *
     * @param peerId peer id, used as the row key
     * @param files file names, each one a column qualifier to delete
     * @throws ReplicationException if the delete fails
     */
    @Override
    public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
        Delete refsDelete = new Delete(Bytes.toBytes(peerId));
        for (String fileName : files) {
            refsDelete.addColumns(HFILE_REF_FAMILY, Bytes.toBytes(fileName));
        }
        try (Table hfileRefTable = this.conn.getTable(this.tableName)) {
            hfileRefTable.delete(refsDelete);
        } catch (IOException e) {
            throw new ReplicationException("failed to removeHFileRefs, peerId=" + peerId + ", files=" + files, e);
        }
    }

    /**
     * Returns the row keys of all rows that have cells in the hfile reference family.
     *
     * @return the row keys as strings, in scan order
     * @throws ReplicationException if scanning the table fails
     */
    @Override
    public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
        Scan keyScan = new Scan().addFamily(HFILE_REF_FAMILY).setReadType(Scan.ReadType.STREAM).setFilter(new KeyOnlyFilter());
        List<String> rowKeys = new ArrayList<>();
        try (Table hfileRefTable = this.conn.getTable(this.tableName);
             ResultScanner rows = hfileRefTable.getScanner(keyScan)) {
            for (Result row = rows.next(); row != null; row = rows.next()) {
                rowKeys.add(Bytes.toString(row.getRow()));
            }
        } catch (IOException e) {
            throw new ReplicationException("failed to getAllPeersFromHFileRefsQueue", e);
        }
        return rowKeys;
    }

    /**
     * Runs {@code scan} and collects the column qualifier of every returned cell as a string.
     *
     * @param <T> the collection type to fill
     * @param scan the scan to execute
     * @param creator supplies the empty collection that receives the qualifiers
     * @return the collection obtained from {@code creator}, filled with the qualifiers
     * @throws IOException if scanning fails
     */
    private <T extends Collection<String>> T scanHFiles(Scan scan, Supplier<T> creator) throws IOException {
        // Keep the generic type: the raw Collection needed an unchecked cast on return.
        T files = creator.get();
        try (Table table = this.conn.getTable(this.tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            Result result;
            while ((result = scanner.next()) != null) {
                CellScanner cellScanner = result.cellScanner();
                while (cellScanner.advance()) {
                    Cell cell = cellScanner.current();
                    files.add(Bytes.toString(CellUtil.cloneQualifier(cell)));
                }
            }
        }
        return files;
    }

    /**
     * Lists the hfile reference qualifiers stored in rows whose key starts with the peer id.
     *
     * @param peerId peer id, used as the row prefix of the scan
     * @return the qualifiers as strings, in scan order
     * @throws ReplicationException if scanning the table fails
     */
    @Override
    public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
        byte[] rowPrefix = Bytes.toBytes(peerId);
        Scan refsScan = new Scan().addFamily(HFILE_REF_FAMILY).setStartStopRowForPrefixScan(rowPrefix).setAllowPartialResults(true);
        try {
            List<String> fileNames = this.scanHFiles(refsScan, ArrayList::new);
            return fileNames;
        } catch (IOException e) {
            throw new ReplicationException("failed to getReplicableHFiles, peerId=" + peerId, e);
        }
    }

    /**
     * Collects the distinct hfile reference qualifiers stored in the whole table.
     *
     * @return the qualifiers as strings, without duplicates
     * @throws ReplicationException if scanning the table fails
     */
    @Override
    public Set<String> getAllHFileRefs() throws ReplicationException {
        Scan refsScan = new Scan().addFamily(HFILE_REF_FAMILY).setReadType(Scan.ReadType.STREAM).setAllowPartialResults(true);
        try {
            Set<String> fileNames = this.scanHFiles(refsScan, HashSet::new);
            return fileNames;
        } catch (IOException e) {
            throw new ReplicationException("failed to getAllHFileRefs", e);
        }
    }

    /**
     * Reports whether the replication table exists.
     *
     * @return {@code true} if the admin reports that the table exists
     * @throws ReplicationException if the existence check fails
     */
    @Override
    public boolean hasData() throws ReplicationException {
        // Close the Admin after use; previously the instance was obtained and never released.
        try (Admin admin = this.conn.getAdmin()) {
            return admin.tableExists(this.tableName);
        } catch (IOException e) {
            throw new ReplicationException("failed to get replication queue table", e);
        }
    }

    /**
     * Writes the offsets of several queues in one batch of puts.
     * <p>
     * One put is built per queue, with one column per WAL group; queues without offsets are
     * skipped. {@code serverName} is not used by this implementation.
     *
     * @param serverName not read by this method
     * @param datas the queues and their offsets to store
     * @throws ReplicationException if the batch put fails
     */
    @Override
    public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas) throws ReplicationException {
        List<Put> pending = new ArrayList<>();
        for (ReplicationQueueData queue : datas) {
            if (!queue.getOffsets().isEmpty()) {
                Put queuePut = new Put(Bytes.toBytes(queue.getId().toString()));
                queue.getOffsets().forEach((group, groupOffset) -> queuePut.addColumn(QUEUE_FAMILY, Bytes.toBytes(group), Bytes.toBytes(groupOffset.toString())));
                pending.add(queuePut);
            }
        }
        try (Table queueTable = this.conn.getTable(this.tableName)) {
            queueTable.put(pending);
        } catch (IOException e) {
            throw new ReplicationException("failed to batch update queues", e);
        }
    }

    /**
     * Writes a batch of last pushed sequence ids, grouped into one put per peer.
     *
     * @param lastPushedSeqIds entries carrying peer id, encoded region name and sequence id
     * @throws ReplicationException if the batch put fails
     */
    @Override
    public void batchUpdateLastSequenceIds(List<ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId> lastPushedSeqIds) throws ReplicationException {
        // one put per peer row, created on first use
        Map<String, Put> putsByPeer = new HashMap<>();
        for (ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId seqIdEntry : lastPushedSeqIds) {
            Put peerPut = putsByPeer.computeIfAbsent(seqIdEntry.getPeerId(), id -> new Put(Bytes.toBytes(id)));
            peerPut.addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(seqIdEntry.getEncodedRegionName()), Bytes.toBytes(seqIdEntry.getLastPushedSeqId()));
        }
        List<Put> nonEmptyPuts = new ArrayList<>();
        for (Put candidate : putsByPeer.values()) {
            if (!candidate.isEmpty()) {
                nonEmptyPuts.add(candidate);
            }
        }
        try (Table seqIdTable = this.conn.getTable(this.tableName)) {
            seqIdTable.put(nonEmptyPuts);
        } catch (IOException e) {
            throw new ReplicationException("failed to batch update last pushed sequence ids", e);
        }
    }

    /**
     * Stores the given hfile references for a peer with a single put.
     * <p>
     * Every reference becomes a column qualifier with an empty value. Nothing is written when the
     * list is empty.
     *
     * @param peerId peer id, used as the row key
     * @param hfileRefs the references to store
     * @throws ReplicationException if the put fails
     */
    @Override
    public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException {
        if (!hfileRefs.isEmpty()) {
            Put refsPut = new Put(Bytes.toBytes(peerId));
            for (String hfileRef : hfileRefs) {
                refsPut.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(hfileRef), HConstants.EMPTY_BYTE_ARRAY);
            }
            try (Table hfileRefTable = this.conn.getTable(this.tableName)) {
                hfileRefTable.put(refsPut);
            } catch (IOException e) {
                throw new ReplicationException("failed to batch update hfile references", e);
            }
        }
    }

    /**
     * Issues, for every row that has last sequence id or hfile reference cells, a delete of both
     * families bounded by the timestamp {@code ts}.
     *
     * @param ts timestamp passed to both family deletes
     * @throws ReplicationException if scanning or deleting fails
     */
    @Override
    public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
        Scan keyScan = new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY).addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter());
        try (Table table = this.conn.getTable(this.tableName);
             ResultScanner rows = table.getScanner(keyScan)) {
            for (Result row = rows.next(); row != null; row = rows.next()) {
                Delete boundedDelete = new Delete(row.getRow());
                boundedDelete.addFamily(LAST_SEQUENCE_ID_FAMILY, ts);
                boundedDelete.addFamily(HFILE_REF_FAMILY, ts);
                table.delete(boundedDelete);
            }
        } catch (IOException e) {
            throw new ReplicationException("failed to remove last sequence ids and hfile references before timestamp " + ts, e);
        }
    }

    /**
     * Decompiler-emitted lambda body (marked synthetic): forwards a mutate rows request to the
     * given service stub.
     *
     * @param mutateRequest the request to send
     * @param serviceStub stub whose {@code mutateRows} is invoked
     * @param rpcController controller passed through to the stub
     * @param callback callback passed through to the stub
     */
    private static /* synthetic */ void lambda$setOffset$1(MultiRowMutationProtos.MutateRowsRequest mutateRequest, MultiRowMutationProtos.MultiRowMutationService.Interface serviceStub, RpcController rpcController, RpcCallback callback) {
        serviceStub.mutateRows(rpcController, mutateRequest, callback);
    }
}

