/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.block;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.block.DatanodeDeletedBlockTransactions;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeletedBlockLogImpl
implements DeletedBlockLog,
EventHandler<CommandStatusReportHandler.DeleteBlockStatus> {
    /** Logger used by this class. */
    public static final Logger LOG = LoggerFactory.getLogger(DeletedBlockLogImpl.class);
    /**
     * Builder pre-set with container ID 1 and count 1. getLargestRecordedTXID() and
     * addTransactions() call setTxID on it to build the record written under key 0 of the
     * deleted-blocks table; that record carries the largest transaction ID.
     * NOTE(review): the builder is static and mutated on every use, so it is shared by all
     * instances of this class - confirm that only one instance uses it at a time.
     */
    private static final StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.Builder DUMMY_TXN_BUILDER = StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.newBuilder().setContainerID(1L).setCount(1);
    /**
     * Retry ceiling, read from "ozone.scm.block.deletion.max.retry" (default 4096). When an
     * incremented retry count would exceed it, incrementCount() stores count -1 for the transaction.
     */
    private final int maxRetry;
    /** Source of container info and container replicas. */
    private final ContainerManager containerManager;
    /** Store that provides the deleted-blocks transaction table and batch operations. */
    private final SCMMetadataStore scmMetadataStore;
    /**
     * Lock taken by getFailedTransactions, incrementCount, commitTransactions,
     * getNumOfValidTransactions, addTransactions and getTransactions.
     */
    private final Lock lock;
    /**
     * Transaction ID to the UUIDs of datanodes that acknowledged it. An entry is created in
     * getTransactions() and removed when the transaction is committed or purged.
     */
    private Map<Long, Set<UUID>> transactionToDNsCommitMap;
    /**
     * Transaction ID to its in-memory retry count. incrementCount() writes the count to the table
     * only on every 100th increment or when the retry ceiling is exceeded.
     */
    private Map<Long, Integer> transactionRetryCountMap;
    /** Largest transaction ID handed out so far; seeded from getLargestRecordedTXID(). */
    private final AtomicLong largestTxnId;
    /**
     * Key 0. The methods in this class read and write the largest-transaction-ID record under
     * the literal key 0L; presumably this constant names that key - TODO confirm.
     */
    private final long largestTxnIdHolderKey = 0L;

    public DeletedBlockLogImpl(ConfigurationSource conf, ContainerManager containerManager, SCMMetadataStore scmMetadataStore) throws IOException {
        this.maxRetry = conf.getInt("ozone.scm.block.deletion.max.retry", 4096);
        this.containerManager = containerManager;
        this.scmMetadataStore = scmMetadataStore;
        this.lock = new ReentrantLock();
        this.transactionToDNsCommitMap = new ConcurrentHashMap<Long, Set<UUID>>();
        this.transactionRetryCountMap = new ConcurrentHashMap<Long, Integer>();
        this.largestTxnId = new AtomicLong(this.getLargestRecordedTXID());
    }

    /**
     * Atomically advances the transaction ID counter by one.
     *
     * @return the newly allocated transaction ID
     */
    public Long getNextDeleteBlockTXID() {
        long allocated = this.largestTxnId.incrementAndGet();
        return allocated;
    }

    /**
     * Reads the transaction ID counter without changing it.
     *
     * @return the largest transaction ID allocated so far
     */
    public Long getCurrentTXID() {
        long current = this.largestTxnId.get();
        return current;
    }

    /**
     * Determines the largest transaction ID recorded in the deleted-blocks table.
     *
     * <p>If the record under key 0 exists, its txID is returned. Otherwise the table is scanned
     * to its last key; when that key is greater than 0 it is also written back under key 0 so the
     * next lookup finds it directly.
     *
     * @return the largest recorded transaction ID, or 0 when none is found
     * @throws IOException if the table cannot be read or written
     */
    private long getLargestRecordedTXID() throws IOException {
        StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction holder =
                this.scmMetadataStore.getDeletedBlocksTXTable().get(this.largestTxnIdHolderKey);
        if (holder != null) {
            return holder.getTxID();
        }
        // No holder record yet: fall back to the last key present in the table.
        try (TableIterator<Long, ? extends Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> cursor = this.getIterator()) {
            cursor.seekToLast();
            Long lastKey = cursor.key();
            long recovered = lastKey != null ? lastKey : 0L;
            if (recovered > 0L) {
                this.scmMetadataStore.getDeletedBlocksTXTable().put(this.largestTxnIdHolderKey, DUMMY_TXN_BUILDER.setTxID(recovered).build());
            }
            return recovered;
        }
    }

    /**
     * Collects every transaction in the table whose count is exactly -1.
     *
     * <p>The whole scan runs while holding the lock.
     *
     * @return the transactions with count -1, in iteration order; empty if there are none
     * @throws IOException if the table cannot be iterated
     */
    @Override
    public List<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> getFailedTransactions() throws IOException {
        this.lock.lock();
        try {
            List<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> failed = new ArrayList<>();
            try (TableIterator<Long, ? extends Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> cursor = this.getIterator()) {
                while (cursor.hasNext()) {
                    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction candidate = cursor.next().getValue();
                    if (candidate.getCount() == -1) {
                        failed.add(candidate);
                    }
                }
            }
            return failed;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Bumps the retry count of each listed transaction by one.
     *
     * <p>The lock is taken and released once per transaction ID. The current count comes from the
     * in-memory retry map, falling back to the count stored in the table. Transactions whose
     * current count is -1 or lower are left untouched. When the new count exceeds the retry
     * ceiling, -1 is written to the table and the in-memory entry is dropped. Otherwise the new
     * count is kept in memory and written to the table only when it is a multiple of 100.
     *
     * <p>An {@link IOException} raised for one ID is logged and does not stop the remaining IDs.
     *
     * @param txIDs IDs of the transactions to update
     * @throws IOException declared by the interface; I/O failures are caught and logged here
     */
    @Override
    public void incrementCount(List<Long> txIDs) throws IOException {
        for (Long txID : txIDs) {
            this.lock.lock();
            try {
                StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction stored = this.scmMetadataStore.getDeletedBlocksTXTable().get(txID);
                if (stored == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Deleted TXID {} not found.", txID);
                    }
                } else {
                    int previous = this.transactionRetryCountMap.getOrDefault(txID, stored.getCount());
                    if (previous > -1) {
                        int updated = previous + 1;
                        if (updated > this.maxRetry) {
                            // Retry budget exhausted: persist the -1 marker and forget the in-memory count.
                            this.scmMetadataStore.getDeletedBlocksTXTable().put(txID, stored.toBuilder().setCount(-1).build());
                            this.transactionRetryCountMap.remove(txID);
                        } else {
                            if (updated % 100 == 0) {
                                // Only every 100th increment is written through to the table.
                                this.scmMetadataStore.getDeletedBlocksTXTable().put(txID, stored.toBuilder().setCount(updated).build());
                            }
                            this.transactionRetryCountMap.put(txID, updated);
                        }
                    }
                }
            } catch (IOException ex) {
                LOG.warn("Cannot increase count for txID " + txID, ex);
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Builds a new transaction message with count 0.
     *
     * @param txID        transaction ID to set
     * @param containerID container ID to set
     * @param blocks      local block IDs added to the transaction
     * @return the built transaction
     */
    private StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction constructNewTransaction(long txID, long containerID, List<Long> blocks) {
        StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.Builder draft =
                StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.newBuilder();
        draft.setTxID(txID);
        draft.setContainerID(containerID);
        draft.addAllLocalID(blocks);
        draft.setCount(0);
        return draft.build();
    }

    /**
     * Records a datanode's acknowledgements and purges transactions that all replicas have acked.
     *
     * <p>Results reported as failed are skipped. An {@link IOException} while handling one result
     * is logged and the remaining results are still processed. The whole call runs under the lock.
     *
     * @param transactionResults per-transaction results reported by the datanode
     * @param dnID               UUID of the reporting datanode
     */
    @Override
    public void commitTransactions(List<StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult> transactionResults, UUID dnID) {
        this.lock.lock();
        try {
            for (StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult result : transactionResults) {
                if (this.isTransactionFailed(result)) {
                    continue;
                }
                try {
                    this.commitSingleTransaction(result, dnID);
                } catch (IOException e) {
                    LOG.warn("Could not commit delete block transaction: " + result.getTxID(), e);
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Applies one successful acknowledgement; called from commitTransactions with the lock held.
     *
     * <p>The datanode is added to the transaction's ack set. The transaction is removed from both
     * in-memory maps and from the table once the smaller of (replica count, ack count) reaches
     * the container's replication factor and every current replica's datanode is in the ack set.
     */
    private void commitSingleTransaction(StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult result, UUID dnID) throws IOException {
        long txID = result.getTxID();
        Set<UUID> ackedDatanodes = this.transactionToDNsCommitMap.get(txID);
        ContainerID containerId = ContainerID.valueof(result.getContainerID());
        if (ackedDatanodes == null) {
            // No ack set is tracked for this transaction, so there is nothing to update.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Transaction txId={} commit by dnId={} for containerID={} failed. Corresponding entry not found.", txID, dnID, containerId);
            }
            return;
        }
        ackedDatanodes.add(dnID);
        ContainerInfo container = this.containerManager.getContainer(containerId);
        Set<ContainerReplica> replicas = this.containerManager.getContainerReplicas(containerId);
        int ackedOrPresent = Math.min(replicas.size(), ackedDatanodes.size());
        if (ackedOrPresent >= container.getReplicationFactor().getNumber()) {
            List<UUID> replicaDatanodes = replicas.stream()
                    .map(ContainerReplica::getDatanodeDetails)
                    .map(DatanodeDetails::getUuid)
                    .collect(Collectors.toList());
            if (ackedDatanodes.containsAll(replicaDatanodes)) {
                this.transactionToDNsCommitMap.remove(txID);
                this.transactionRetryCountMap.remove(txID);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Purging txId={} from block deletion log", txID);
                }
                this.scmMetadataStore.getDeletedBlocksTXTable().delete(txID);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Datanode txId={} containerId={} committed by dnId={}", txID, containerId, dnID);
        }
    }

    /**
     * Tells whether a datanode reported the given transaction result as unsuccessful.
     *
     * <p>Every result is logged at debug level; an unsuccessful one is also logged as a warning.
     *
     * @param result the per-transaction result from a deletion ACK
     * @return {@code true} if the result's success flag is false
     */
    private boolean isTransactionFailed(StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult result) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got block deletion ACK from datanode, TXIDs={}, success={}", result.getTxID(), result.getSuccess());
        }
        if (result.getSuccess()) {
            return false;
        }
        LOG.warn("Got failed ACK for TXID={}, prepare to resend the TX in next interval", result.getTxID());
        return true;
    }

    /**
     * Adds a single deletion transaction for one container by delegating to
     * {@link #addTransactions(Map)} with a one-entry map.
     *
     * @param containerID ID of the container the blocks belong to
     * @param blocks      local IDs of the blocks to delete
     * @throws IOException if the transaction cannot be written
     */
    @Override
    public void addTransaction(long containerID, List<Long> blocks) throws IOException {
        this.addTransactions(Collections.singletonMap(containerID, blocks));
    }

    /**
     * Counts the transactions in the table whose count is greater than -1.
     *
     * <p>The whole scan runs while holding the lock.
     *
     * @return number of transactions with count above -1
     * @throws IOException if the table cannot be iterated
     */
    @Override
    public int getNumOfValidTransactions() throws IOException {
        this.lock.lock();
        try {
            int valid = 0;
            try (TableIterator<Long, ? extends Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> cursor = this.getIterator()) {
                while (cursor.hasNext()) {
                    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction candidate = cursor.next().getValue();
                    if (candidate.getCount() > -1) {
                        valid++;
                    }
                }
            }
            return valid;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Writes one new transaction per map entry in a single batch.
     *
     * <p>Each entry gets a freshly allocated transaction ID. The same batch also rewrites the
     * record under key 0 with the current (largest) transaction ID. The batch is committed once,
     * after all entries are queued, and the whole call runs under the lock.
     *
     * @param containerBlocksMap container ID to the local block IDs to delete in that container
     * @throws IOException if the batch cannot be created, filled or committed
     */
    @Override
    public void addTransactions(Map<Long, List<Long>> containerBlocksMap) throws IOException {
        this.lock.lock();
        try {
            try (BatchOperation batch = this.scmMetadataStore.getStore().initBatchOperation()) {
                for (Map.Entry<Long, List<Long>> containerBlocks : containerBlocksMap.entrySet()) {
                    long txID = this.getNextDeleteBlockTXID();
                    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction txn =
                            this.constructNewTransaction(txID, containerBlocks.getKey(), containerBlocks.getValue());
                    this.scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch, txID, txn);
                }
                // Keep the holder record under key 0 in step with the counter, in the same batch.
                StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction holder =
                        DUMMY_TXN_BUILDER.setTxID(this.getCurrentTXID().longValue()).build();
                this.scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch, 0L, holder);
                this.scmMetadataStore.getStore().commitBatchOperation(batch);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Does nothing; this class performs no work on close.
     *
     * @throws IOException declared by the interface; never thrown here
     */
    @Override
    public void close() throws IOException {
        // Intentionally empty.
    }

    /**
     * Queues a transaction for every replica datanode that has not yet acknowledged it.
     *
     * <p>An {@link IOException} while looking up the container's replicas is logged and the
     * transaction is not queued for any datanode.
     *
     * @param tx           the transaction to distribute
     * @param transactions per-datanode collection the transaction is added to
     */
    private void getTransaction(StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction tx, DatanodeDeletedBlockTransactions transactions) {
        try {
            ContainerID containerId = ContainerID.valueof(tx.getContainerID());
            Set<ContainerReplica> replicas = this.containerManager.getContainerReplicas(containerId);
            // The ack set does not change while iterating, so look it up once.
            Set<UUID> ackedDatanodes = this.transactionToDNsCommitMap.get(tx.getTxID());
            for (ContainerReplica replica : replicas) {
                UUID datanodeId = replica.getDatanodeDetails().getUuid();
                boolean alreadyAcked = ackedDatanodes != null && ackedDatanodes.contains(datanodeId);
                if (!alreadyAcked) {
                    transactions.addTransactionToDN(datanodeId, tx);
                }
            }
        } catch (IOException e) {
            LOG.warn("Got container info error.", e);
        }
    }

    /**
     * Gathers pending transactions, grouped per datanode, until the block limit is reached.
     *
     * <p>A transaction is picked up when its count is between 0 and the retry ceiling (inclusive)
     * and its container is not open. Picked transactions get an ack set in the in-memory commit
     * map if they do not have one yet. Transactions whose container is not found are purged from
     * the table and from both in-memory maps. The limit is checked before each transaction, so
     * the last one taken may push the total past {@code blockDeletionLimit}.
     *
     * @param blockDeletionLimit stop picking transactions once this many blocks have been queued
     * @return the picked transactions grouped by datanode
     * @throws IOException if the table cannot be iterated or the purge fails
     */
    @Override
    public DatanodeDeletedBlockTransactions getTransactions(int blockDeletionLimit) throws IOException {
        this.lock.lock();
        try {
            DatanodeDeletedBlockTransactions perDatanode = new DatanodeDeletedBlockTransactions();
            try (TableIterator<Long, ? extends Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> cursor = this.getIterator()) {
                int blocksQueued = 0;
                List<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> orphaned = new ArrayList<>();
                while (cursor.hasNext() && blocksQueued < blockDeletionLimit) {
                    StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction txn = cursor.next().getValue();
                    ContainerID containerId = ContainerID.valueof(txn.getContainerID());
                    try {
                        // The container is only looked up when the count is within the retry range.
                        if (txn.getCount() > -1 && txn.getCount() <= this.maxRetry && !this.containerManager.getContainer(containerId).isOpen()) {
                            blocksQueued += txn.getLocalIDCount();
                            this.getTransaction(txn, perDatanode);
                            this.transactionToDNsCommitMap.putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
                        }
                    } catch (ContainerNotFoundException ex) {
                        LOG.warn("Container: " + containerId + " was not found for the transaction: " + txn);
                        orphaned.add(txn);
                    }
                }
                this.purgeTransactions(orphaned);
                for (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction purged : orphaned) {
                    this.transactionRetryCountMap.remove(purged.getTxID());
                    this.transactionToDNsCommitMap.remove(purged.getTxID());
                }
            }
            return perDatanode;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Deletes the given transactions from the deleted-blocks table in a single batch.
     *
     * <p>An empty list returns immediately without opening or committing a batch. This method does
     * not take the lock itself and does not touch the in-memory maps; getTransactions() calls it
     * with the lock held and cleans the maps afterwards.
     *
     * @param txnsToBePurged transactions to delete; must not be {@code null}
     * @throws IOException if the batch cannot be created, filled or committed
     */
    public void purgeTransactions(List<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction> txnsToBePurged) throws IOException {
        if (txnsToBePurged.isEmpty()) {
            // Nothing to delete: avoid creating and committing an empty batch.
            return;
        }
        try (BatchOperation batch = this.scmMetadataStore.getBatchHandler().initBatchOperation()) {
            for (StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction txn : txnsToBePurged) {
                this.scmMetadataStore.getDeletedBlocksTXTable().deleteWithBatch(batch, txn.getTxID());
            }
            this.scmMetadataStore.getBatchHandler().commitBatchOperation(batch);
        }
    }

    /**
     * Opens an iterator over the deleted-blocks table, positioned by seeking to key 1 so that
     * iteration starts after the record stored under key 0.
     *
     * <p>The caller is responsible for closing the returned iterator.
     *
     * @return an iterator positioned by {@code seek(1)}
     * @throws IOException if the iterator cannot be created or positioned
     */
    TableIterator<Long, ? extends Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> getIterator() throws IOException {
        TableIterator<Long, ? extends Table.KeyValue<Long, StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>> cursor =
                this.scmMetadataStore.getDeletedBlocksTXTable().iterator();
        cursor.seek(1L);
        return cursor;
    }

    /**
     * Handles a delete-block status event by committing the results carried in its deletion ACK
     * on behalf of the datanode named in that ACK.
     *
     * @param deleteBlockStatus the status event carrying the block deletion ACK
     * @param publisher         event publisher; not used by this handler
     */
    public void onMessage(CommandStatusReportHandler.DeleteBlockStatus deleteBlockStatus, EventPublisher publisher) {
        StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto ack = deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
        List<StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult> results = ack.getResultsList();
        UUID datanodeId = UUID.fromString(ack.getDnId());
        this.commitTransactions(results, datanodeId);
    }
}

