/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationTableBase;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class TableBasedReplicationQueuesImpl
extends ReplicationTableBase
implements ReplicationQueues {
    private static final Log LOG = LogFactory.getLog(TableBasedReplicationQueuesImpl.class);
    private static final byte[] INITIAL_OFFSET_BYTES = Bytes.toBytes(0L);
    private static final byte[] EMPTY_STRING_BYTES = Bytes.toBytes("");
    private String serverName = null;
    private byte[] serverNameBytes = null;
    private ReplicationStateZKBase replicationState;

    public TableBasedReplicationQueuesImpl(ReplicationQueuesArguments args) throws IOException {
        this(args.getConf(), args.getAbortable(), args.getZk());
    }

    public TableBasedReplicationQueuesImpl(Configuration conf, Abortable abort, ZooKeeperWatcher zkw) throws IOException {
        super(conf, abort);
        this.replicationState = new ReplicationStateZKBase(zkw, conf, abort){};
    }

    @Override
    public void init(String serverName) throws ReplicationException {
        this.serverName = serverName;
        this.serverNameBytes = Bytes.toBytes(serverName);
    }

    @Override
    public List<String> getListOfReplicators() {
        return super.getListOfReplicators();
    }

    @Override
    public void removeQueue(String queueId) {
        try {
            byte[] rowKey = this.queueIdToRowKey(queueId);
            if (this.checkQueueExists(queueId)) {
                Delete deleteQueue = new Delete(rowKey);
                this.safeQueueUpdate(deleteQueue);
            } else {
                LOG.info((Object)("No logs were registered for queue id=" + queueId + " so no rows were removed from the replication table while removing the queue"));
            }
        }
        catch (IOException | ReplicationException e) {
            String errMsg = "Failed removing queue queueId=" + queueId;
            this.abortable.abort(errMsg, e);
        }
    }

    @Override
    public void addLog(String queueId, String filename) throws ReplicationException {
        try (Table replicationTable = this.getOrBlockOnReplicationTable();){
            if (!this.checkQueueExists(queueId)) {
                Put putNewQueue = new Put(Bytes.toBytes(this.buildQueueRowKey(queueId)));
                putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER, this.serverNameBytes);
                putNewQueue.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, EMPTY_STRING_BYTES);
                putNewQueue.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
                replicationTable.put(putNewQueue);
            } else {
                Put putNewLog = new Put(this.queueIdToRowKey(queueId));
                putNewLog.addColumn(CF_QUEUE, Bytes.toBytes(filename), INITIAL_OFFSET_BYTES);
                this.safeQueueUpdate(putNewLog);
            }
        }
        catch (IOException | ReplicationException e) {
            String errMsg = "Failed adding log queueId=" + queueId + " filename=" + filename;
            this.abortable.abort(errMsg, e);
        }
    }

    @Override
    public void removeLog(String queueId, String filename) {
        try {
            byte[] rowKey = this.queueIdToRowKey(queueId);
            Delete delete = new Delete(rowKey);
            delete.addColumns(CF_QUEUE, Bytes.toBytes(filename));
            this.safeQueueUpdate(delete);
        }
        catch (IOException | ReplicationException e) {
            String errMsg = "Failed removing log queueId=" + queueId + " filename=" + filename;
            this.abortable.abort(errMsg, e);
        }
    }

    @Override
    public void setLogPosition(String queueId, String filename, long position) {
        try (Table replicationTable = this.getOrBlockOnReplicationTable();){
            byte[] rowKey = this.queueIdToRowKey(queueId);
            Get checkLogExists = new Get(rowKey);
            checkLogExists.addColumn(CF_QUEUE, Bytes.toBytes(filename));
            if (!replicationTable.exists(checkLogExists)) {
                String errMsg = "Could not set position of non-existent log from queueId=" + queueId + ", filename=" + filename;
                this.abortable.abort(errMsg, new ReplicationException(errMsg));
                return;
            }
            Put walAndOffset = new Put(rowKey);
            walAndOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename), Bytes.toBytes(position));
            this.safeQueueUpdate(walAndOffset);
        }
        catch (IOException | ReplicationException e) {
            String errMsg = "Failed writing log position queueId=" + queueId + "filename=" + filename + " position=" + position;
            this.abortable.abort(errMsg, e);
        }
    }

    @Override
    public long getLogPosition(String queueId, String filename) throws ReplicationException {
        try {
            byte[] rowKey = this.queueIdToRowKey(queueId);
            Get getOffset = new Get(rowKey);
            getOffset.addColumn(CF_QUEUE, Bytes.toBytes(filename));
            Result result = this.getResultIfOwner(getOffset);
            if (result == null || !result.containsColumn(CF_QUEUE, Bytes.toBytes(filename))) {
                throw new ReplicationException("Could not read empty result while getting log position queueId=" + queueId + ", filename=" + filename);
            }
            return Bytes.toLong(result.getValue(CF_QUEUE, Bytes.toBytes(filename)));
        }
        catch (IOException e) {
            throw new ReplicationException("Could not get position in log for queueId=" + queueId + ", filename=" + filename);
        }
    }

    @Override
    public void removeAllQueues() {
        List<String> myQueueIds = this.getAllQueues();
        for (String queueId : myQueueIds) {
            this.removeQueue(queueId);
        }
    }

    @Override
    public List<String> getLogsInQueue(String queueId) {
        String errMsg = "Failed getting logs in queue queueId=" + queueId;
        byte[] rowKey = this.queueIdToRowKey(queueId);
        ArrayList<String> logs = new ArrayList<String>();
        try {
            Get getQueue = new Get(rowKey);
            Result queue = this.getResultIfOwner(getQueue);
            if (queue == null || queue.isEmpty()) {
                String errMsgLostOwnership = "Failed getting logs for queue queueId=" + Bytes.toString(rowKey) + " because the queue was missing or we lost ownership";
                this.abortable.abort(errMsg, new ReplicationException(errMsgLostOwnership));
                return null;
            }
            NavigableMap<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
            for (byte[] cQualifier : familyMap.keySet()) {
                if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier, COL_QUEUE_OWNER_HISTORY)) continue;
                logs.add(Bytes.toString(cQualifier));
            }
        }
        catch (IOException e) {
            this.abortable.abort(errMsg, e);
            return null;
        }
        return logs;
    }

    @Override
    public List<String> getAllQueues() {
        return this.getAllQueues(this.serverName);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public List<String> getUnClaimedQueueIds(String regionserver) {
        if (this.isThisOurRegionServer(regionserver)) {
            return null;
        }
        try (ResultScanner queuesToClaim = this.getQueuesBelongingToServer(regionserver);){
            ArrayList<String> res = new ArrayList<String>();
            for (Result queue : queuesToClaim) {
                String rowKey = Bytes.toString(queue.getRow());
                res.add(rowKey);
            }
            ArrayList<String> arrayList = res.isEmpty() ? null : res;
            return arrayList;
        }
        catch (IOException e) {
            String errMsg = "Failed getUnClaimedQueueIds";
            this.abortable.abort(errMsg, e);
            return null;
        }
    }

    @Override
    public void removeReplicatorIfQueueIsEmpty(String regionserver) {
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
        if (this.isThisOurRegionServer(regionserver)) {
            return null;
        }
        try (ResultScanner queuesToClaim = this.getQueuesBelongingToServer(regionserver);){
            Iterator<Result> iterator = queuesToClaim.iterator();
            while (iterator.hasNext()) {
                Result queue = iterator.next();
                String rowKey = Bytes.toString(queue.getRow());
                if (!rowKey.equals(queueId) || !this.attemptToClaimQueue(queue, regionserver)) continue;
                ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(rowKey);
                if (this.replicationState.peerExists(replicationQueueInfo.getPeerId())) {
                    TreeSet<String> sortedLogs = new TreeSet<String>();
                    List<String> logs = this.getLogsInQueue(queue.getRow());
                    Object object = logs.iterator();
                    while (true) {
                        if (!object.hasNext()) {
                            LOG.info((Object)(this.serverName + " has claimed queue " + rowKey + " from " + regionserver));
                            object = new Pair(rowKey, sortedLogs);
                            return object;
                        }
                        String log = object.next();
                        sortedLogs.add(log);
                    }
                }
                this.removeQueue(Bytes.toString(queue.getRow()));
                LOG.info((Object)(this.serverName + " has deleted abandoned queue " + queueId + " from " + regionserver));
            }
            return null;
        }
        catch (IOException | KeeperException e) {
            String errMsg = "Failed claiming queues for regionserver=" + regionserver;
            this.abortable.abort(errMsg, e);
        }
        return null;
    }

    @Override
    public boolean isThisOurRegionServer(String regionserver) {
        return this.serverName.equals(regionserver);
    }

    @Override
    public void addPeerToHFileRefs(String peerId) throws ReplicationException {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void removePeerFromHFileRefs(String peerId) {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException {
        throw new NotImplementedException("Not implemented");
    }

    @Override
    public void removeHFileRefs(String peerId, List<String> files) {
        throw new NotImplementedException("Not implemented");
    }

    private String buildQueueRowKey(String queueId) {
        return this.buildQueueRowKey(this.serverName, queueId);
    }

    private byte[] queueIdToRowKey(String queueId) {
        return this.queueIdToRowKey(this.serverName, queueId);
    }

    private void safeQueueUpdate(Put put) throws ReplicationException, IOException {
        RowMutations mutations = new RowMutations(put.getRow());
        mutations.add(put);
        this.safeQueueUpdate(mutations);
    }

    private void safeQueueUpdate(Delete delete) throws ReplicationException, IOException {
        RowMutations mutations = new RowMutations(delete.getRow());
        mutations.add(delete);
        this.safeQueueUpdate(mutations);
    }

    private void safeQueueUpdate(RowMutations mutate) throws ReplicationException, IOException {
        try (Table replicationTable = this.getOrBlockOnReplicationTable();){
            boolean updateSuccess = replicationTable.checkAndMutate(mutate.getRow(), CF_QUEUE, COL_QUEUE_OWNER, CompareOperator.EQUAL, this.serverNameBytes, mutate);
            if (!updateSuccess) {
                throw new ReplicationException("Failed to update Replication Table because we lost queue  ownership");
            }
        }
    }

    private boolean checkQueueExists(String queueId) throws IOException {
        try (Table replicationTable = this.getOrBlockOnReplicationTable();){
            byte[] rowKey = this.queueIdToRowKey(queueId);
            boolean bl = replicationTable.exists(new Get(rowKey));
            return bl;
        }
    }

    private boolean attemptToClaimQueue(Result queue, String originalServer) throws IOException {
        Put putQueueNameAndHistory = new Put(queue.getRow());
        putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER, Bytes.toBytes(this.serverName));
        String newOwnerHistory = this.buildClaimedQueueHistory(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)), originalServer);
        putQueueNameAndHistory.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY, Bytes.toBytes(newOwnerHistory));
        RowMutations claimAndRenameQueue = new RowMutations(queue.getRow());
        claimAndRenameQueue.add(putQueueNameAndHistory);
        try (Table replicationTable = this.getOrBlockOnReplicationTable();){
            boolean success;
            boolean bl = success = replicationTable.checkAndMutate(queue.getRow(), CF_QUEUE, COL_QUEUE_OWNER, CompareOperator.EQUAL, Bytes.toBytes(originalServer), claimAndRenameQueue);
            return bl;
        }
    }

    /*
     * Loose catch block
     */
    private Result getResultIfOwner(Get get) throws IOException {
        Scan scan = new Scan(get);
        if (scan.getFamilyMap().size() > 0) {
            scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
        }
        scan.setMaxResultSize(1L);
        SingleColumnValueFilter checkOwner = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER, CompareOperator.EQUAL, this.serverNameBytes);
        scan.setFilter(checkOwner);
        try (ResultScanner scanner = null;){
            try (Table replicationTable = this.getOrBlockOnReplicationTable();){
                scanner = replicationTable.getScanner(scan);
                Result result = scanner.next();
                Result result2 = result == null || result.isEmpty() ? null : result;
                return result2;
            }
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
    }
}

